从前面几小节的学习,我们可能知道在发送和接收消息重要的类ConnectionFactory, Connection,Channel和QueueingConsumer。
ConntectionFactory类是方便创建与AMQP代理相关联的Connection;下面来看看ConntectionFactory是如何创建一个Contention.
首先通过new ConnectionFactory()创建一个ConnectionFactory;并设置此连接工厂的主机设置为localhost。通过ConnectionFactory的newConnection()方法 创建一个Connection; newConnection方法通过得到当前连接的地址及端口号来获得一个Address,通过createFrameHandler的方法 来得到FrameHandler;再通过new AMQConnection(this, frameHandler)来得到Connection并启动。如代码清单7-1所示。
代码清单7-1 创建Connection的源码(ConnectionFactory.java中)
- 1. protected FrameHandler createFrameHandler(Address addr)
- 2. throws IOException {
- 3.
- 4. String hostName = addr.getHost();
- 5. int portNumber = portOrDefault(addr.getPort());//得到端口号
- 6. Socket socket = factory.createSocket();
- 7. configureSocket(socket);//这里Socket通过TCP无迟延的协议
- 8. socket.connect(new InetSocketAddress(hostName, portNumber), connectionTimeout);
- 9. return createFrameHandler(socket);
- 10. }
- 11.
- 12. protected FrameHandler createFrameHandler(Socket sock)
- 13. throws IOException
- 14. {
- 15. return new SocketFrameHandler(sock);
- 16. }
- 17.
- 18. /**
- 19. * Provides a hook to insert custom configuration of the sockets
- 20. * used to connect to an AMQP server before they connect.
- 21. *
- 22. * The default behaviour of this method is to disable Nagle's
- 23. * algorithm to get more consistently low latency. However it
- 24. * may be overridden freely and there is no requirement to retain
- 25. * this behaviour.
- 26. *
- 27. * @param socket The socket that is to be used for the Connection
- 28. */
- 29. protected void configureSocket(Socket socket) throws IOException{
- 30. // disable Nagle's algorithm, for more consistently low latency
- 31. socket.setTcpNoDelay(true);
- 32. }
- 33.
- 34. /**
- 35. * Create a new broker connection
- 36. * @param addrs an array of known broker addresses (hostname/port pairs) to try in order
- 37. * @return an interface to the connection
- 38. * @throws IOException if it encounters a problem
- 39. */
- 40. public Connection newConnection(Address[] addrs)
- 41. throws IOException
- 42. {
- 43. IOException lastException = null;
- 44. for (Address addr : addrs) {
- 45. try {
- 46. FrameHandler frameHandler = createFrameHandler(addr);
- 47. AMQConnection conn = new AMQConnection(this,
- 48. frameHandler);
- 49. conn.start();
- 50. return conn;
- 51. } catch (IOException e) {
- 52. lastException = e;
- 53. }
- 54. }
- 55.
- 56. if (lastException == null) {
- 57. throw new IOException("failed to connect");
- 58. } else {
- 59. throw lastException;
- 60. }
- 61. }
- 62.
- 63. /**
- 64. * Create a new broker connection
- 65. * @return an interface to the connection
- 66. * @throws IOException if it encounters a problem
- 67. */
- 68. public Connection newConnection() throws IOException {
- 69. return newConnection(new Address[] {
- 70. new Address(getHost(), getPort())});
- 71. }
代码清单7-2 连接启动
- /**
- * Start up the connection, including the MainLoop thread.
- * Sends the protocol
- * version negotiation header, and runs through
- * Connection.Start/.StartOk, Connection.Tune/.TuneOk, and then
- * calls Connection.Open and waits for the OpenOk. Sets heartbeat
- * and frame max values after tuning has taken place.
- * @throws java.io.IOException if an error is encountered; IOException
- * subtypes {@link ProtocolVersionMismatchException} and
- * {@link PossibleAuthenticationFailureException} will be thrown in the
- * corresponding circumstances.
- */
- public void start()
- throws IOException
- {
- // Make sure that the first thing we do is to send the header,
- // which should cause any socket errors to show up for us, rather
- // than risking them pop out in the MainLoop
- AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =
- new AMQChannel.SimpleBlockingRpcContinuation();
- // We enqueue an RPC continuation here without sending an RPC
- // request, since the protocol specifies that after sending
- // the version negotiation header, the client (connection
- // initiator) is to wait for a connection.start method to
- // arrive.
- _channel0.enqueueRpc(connStartBlocker);
- // The following two lines are akin to AMQChannel's
- // transmit() method for this pseudo-RPC.
- _frameHandler.setTimeout(HANDSHAKE_TIMEOUT);
- _frameHandler.sendHeader();
- // start the main loop going
- Thread ml = new MainLoop();
- ml.setName("AMQP Connection " + getHostAddress() + ":" + getPort());
- ml.start();
- AMQP.Connection.Start connStart = null;
- try {
- connStart =
- (AMQP.Connection.Start) connStartBlocker.getReply().getMethod();
- _serverProperties = connStart.getServerProperties();
- Version serverVersion =
- new Version(connStart.getVersionMajor(),
- connStart.getVersionMinor());
- if (!Version.checkVersion(clientVersion, serverVersion)) {
- _frameHandler.close(); //this will cause mainLoop to terminate
- throw new ProtocolVersionMismatchException(clientVersion,
- serverVersion);
- }
- } catch (ShutdownSignalException sse) {
- throw AMQChannel.wrap(sse);
- }
- String[] mechanisms = connStart.getMechanisms().toString().split(" ");
- SaslMechanism sm = _factory.getSaslConfig().getSaslMechanism(mechanisms);
- if (sm == null) {
- throw new IOException("No compatible authentication mechanism found - " +
- "server offered [" + connStart.getMechanisms() + "]");
- }
- LongString challenge = null;
- LongString response = sm.handleChallenge(null, _factory);
- AMQP.Connection.Tune connTune = null;
- do {
- Method method = (challenge == null)
- ? new AMQImpl.Connection.StartOk(_clientProperties,
- sm.getName(),
- response, "en_US")
- : new AMQImpl.Connection.SecureOk(response);
- try {
- Method serverResponse = _channel0.rpc(method).getMethod();
- if (serverResponse instanceof AMQP.Connection.Tune) {
- connTune = (AMQP.Connection.Tune) serverResponse;
- } else {
- challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
- response = sm.handleChallenge(challenge, _factory);
- }
- } catch (ShutdownSignalException e) {
- throw new PossibleAuthenticationFailureException(e);
- }
- } while (connTune == null);
- int channelMax =
- negotiatedMaxValue(_factory.getRequestedChannelMax(),
- connTune.getChannelMax());
- _channelManager = new ChannelManager(channelMax);
- int frameMax =
- negotiatedMaxValue(_factory.getRequestedFrameMax(),
- connTune.getFrameMax());
- setFrameMax(frameMax);
- int heartbeat =
- negotiatedMaxValue(_factory.getRequestedHeartbeat(),
- connTune.getHeartbeat());
- setHeartbeat(heartbeat);
- _channel0.transmit(new AMQImpl.Connection.TuneOk(channelMax,
- frameMax,
- heartbeat));
- // 0.9.1: insist [on not being redirected] is deprecated, but
- // still in generated code; just pass a dummy value here
- _channel0.exnWrappingRpc(new AMQImpl.Connection.Open(_virtualHost,
- "",
- false)).getMethod();
- return;
- }
相关推荐
消息队列:RabbitMQ:RabbitMQ高级特性:Routing键与绑定.docx
RabbitMQ学习笔记:Connections、Channels、Queues之state状态连接、信道、队列状态如下:GitHub地址:https://gi
rabbitmq-3.10.6:management
消息队列:RabbitMQ:RabbitMQ高级特性:Exchange类型.docx
消息队列:RabbitMQ:RabbitMQ高级特性:死信队列技术教程.docx
标题中的“rabbitmq学习10:使用spring-amqp发送消息及异步接收消息”表明了本次讨论的主题,即如何在Spring框架中利用Spring AMQP组件与RabbitMQ进行交互,实现消息的发送和异步接收。RabbitMQ是一个开源的消息代理...
在本主题"rabbitmq学习11:基于rabbitmq和spring-amqp的远程接口调用"中,我们将深入探讨如何使用RabbitMQ作为消息中间件,结合Spring-AMQP库实现RPC模式。 RabbitMQ是一个开源的消息代理和队列服务器,它基于AMQP...
RabbitMQ分发方式:主题模式思维脑图
适用于RabbitMQ的Docker 描述 该存储库使使用docker构建RabbitMQ变得简单。 先决条件 快速开始 docker-compose up -d 设置 步骤1:将节点添加到您docker-compose.yml version : ' 3.3 ' services : rabbitmq : ...
rabbitmq面试:RabbitMQ相关的面试题和问题解析 rabbitmq面试:RabbitMQ相关的面试题和问题解析 rabbitmq面试:RabbitMQ相关的面试题和问题解析 rabbitmq面试:RabbitMQ相关的面试题和问题解析 rabbitmq面试:...
The default behaviour for RabbitMQ when a maximum queue length or size is set an
RabbitMQ
- 插件版本需要与RabbitMQ服务器版本兼容,确保稳定运行。 - 延迟插件仅影响消息的投递时间,并不保证消息在特定时间到达,因为这取决于RabbitMQ服务器的调度策略和网络状况。 - 长时间的延迟可能导致内存或磁盘...
RabbitMQ示例3:发布与订阅【fanout转换器】 RabbitMQ示例4:路由【直接交换机】 RabbitMQ示例5:主题【topic切换】 RabbitMQ示例6:远程过程调用RPC Pom.xml <? xml version = " 1.0 " encoding = " UTF-8 ...
**RabbitMQ实战:高效部署分布式消息队列** ...通过本教程的学习,读者将能够熟练掌握RabbitMQ的核心概念、部署策略以及在实际项目中的应用,从而实现高效的消息队列管理,提升系统性能和稳定性。
当我们把RabbitMQ与Kubernetes结合时,可以构建出一个高可用、可伸缩的RabbitMQ集群。本文将深入探讨如何在Kubernetes环境中部署和管理RabbitMQ集群。 首先,"kubernetes-rabbitmq-cluster"项目是专为Kubernetes...
RabbitMQ学习实践二:MQ的安装
【课程目录】:---第一章:RabbitMQ介绍----1-什么是消息中间件.mp4----2-RabbitMQ消息队列安装:window环境.mp4----3-RabbitMQ消息队列安装 :Linux环境.mp4----4-Rabbitmq入口示例:server.mp4----5-rabbitmq入口...