`
yimeng528
  • 浏览: 188716 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

rabbitmq学习7:ConntectionFactory与Conntection的认知

阅读更多

从前面几小节的学习,我们可能知道在发送和接收消息重要的类ConnectionFactory, Connection,Channel和QueueingConsumer。

  ConntectionFactory类是方便创建与AMQP代理相关联的Connection;下面来看看ConntectionFactory是如何创建一个Contention.

  首先通过new ConnectionFactory()创建一个ConnectionFactory;并设置此连接工厂的主机设置为localhost。通过ConnectionFactory的newConnection()方法 创建一个Connection; newConnection方法通过得到当前连接的地址及端口号来获得一个Address,通过createFrameHandler的方法 来得到FrameHandler;再通过new AMQConnection(this, frameHandler)来得到Connection并启动。如代码清单7-1所示。

    代码清单7-1 创建Connection的源码(ConnectionFactory.java中)

 

Java代码  收藏代码
  1.  1protected FrameHandler createFrameHandler(Address addr)    
  2.  2.         throws IOException {    
  3.  3.     
  4.  4.         String hostName = addr.getHost();    
  5.  5.         int portNumber = portOrDefault(addr.getPort());//得到端口号    
  6.  6.         Socket socket = factory.createSocket();    
  7.  7.         configureSocket(socket);//这里Socket通过TCP无迟延的协议    
  8.  8.         socket.connect(new InetSocketAddress(hostName, portNumber), connectionTimeout);    
  9.  9.         return createFrameHandler(socket);    
  10. 10.     }    
  11. 11.     
  12. 12.     protected FrameHandler createFrameHandler(Socket sock)    
  13. 13.         throws IOException    
  14. 14.     {    
  15. 15.         return new SocketFrameHandler(sock);    
  16. 16.     }    
  17. 17.     
  18. 18.     /**  
  19. 19.      *  Provides a hook to insert custom configuration of the sockets  
  20. 20.      *  used to connect to an AMQP server before they connect.  
  21. 21.      *  
  22. 22.      *  The default behaviour of this method is to disable Nagle's  
  23. 23.      *  algorithm to get more consistently low latency.  However it  
  24. 24.      *  may be overridden freely and there is no requirement to retain  
  25. 25.      *  this behaviour.  
  26. 26.      *  
  27. 27.      *  @param socket The socket that is to be used for the Connection  
  28. 28.      */    
  29. 29.     protected void configureSocket(Socket socket) throws IOException{    
  30. 30.         // disable Nagle's algorithm, for more consistently low latency    
  31. 31.         socket.setTcpNoDelay(true);    
  32. 32.     }    
  33. 33.     
  34. 34.     /**  
  35. 35.      * Create a new broker connection  
  36. 36.      * @param addrs an array of known broker addresses (hostname/port pairs) to try in order  
  37. 37.      * @return an interface to the connection  
  38. 38.      * @throws IOException if it encounters a problem  
  39. 39.      */    
  40. 40.     public Connection newConnection(Address[] addrs)    
  41. 41.         throws IOException    
  42. 42.     {    
  43. 43.         IOException lastException = null;    
  44. 44.         for (Address addr : addrs) {    
  45. 45.             try {    
  46. 46.                 FrameHandler frameHandler = createFrameHandler(addr);    
  47. 47.                 AMQConnection conn = new AMQConnection(this,    
  48. 48.                                                        frameHandler);    
  49. 49.                 conn.start();    
  50. 50.                 return conn;    
  51. 51.             } catch (IOException e) {    
  52. 52.                 lastException = e;    
  53. 53.             }    
  54. 54.         }    
  55. 55.     
  56. 56.         if (lastException == null) {    
  57. 57.             throw new IOException("failed to connect");    
  58. 58.         } else {    
  59. 59.             throw lastException;    
  60. 60.         }    
  61. 61.     }    
  62. 62.     
  63. 63.     /**  
  64. 64.      * Create a new broker connection  
  65. 65.      * @return an interface to the connection  
  66. 66.      * @throws IOException if it encounters a problem  
  67. 67.      */    
  68. 68.     public Connection newConnection() throws IOException {    
  69. 69.         return newConnection(new Address[] {    
  70. 70.                                  new Address(getHost(), getPort())});    
  71. 71.     }    

 

代码清单7-2 连接启动

 

Java代码  收藏代码
  1. /** 
  2.      * Start up the connection, including the MainLoop thread. 
  3.      * Sends the protocol 
  4.      * version negotiation header, and runs through 
  5.      * Connection.Start/.StartOk, Connection.Tune/.TuneOk, and then 
  6.      * calls Connection.Open and waits for the OpenOk. Sets heartbeat 
  7.      * and frame max values after tuning has taken place. 
  8.      * @throws java.io.IOException if an error is encountered; IOException 
  9.      * subtypes {@link ProtocolVersionMismatchException} and 
  10.      * {@link PossibleAuthenticationFailureException} will be thrown in the 
  11.      * corresponding circumstances. 
  12.      */  
  13.     public void start()  
  14.         throws IOException  
  15.     {  
  16.         // Make sure that the first thing we do is to send the header,  
  17.         // which should cause any socket errors to show up for us, rather  
  18.         // than risking them pop out in the MainLoop  
  19.         AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =  
  20.             new AMQChannel.SimpleBlockingRpcContinuation();  
  21.         // We enqueue an RPC continuation here without sending an RPC  
  22.         // request, since the protocol specifies that after sending  
  23.         // the version negotiation header, the client (connection  
  24.         // initiator) is to wait for a connection.start method to  
  25.         // arrive.  
  26.         _channel0.enqueueRpc(connStartBlocker);  
  27.         // The following two lines are akin to AMQChannel's  
  28.         // transmit() method for this pseudo-RPC.  
  29.         _frameHandler.setTimeout(HANDSHAKE_TIMEOUT);  
  30.         _frameHandler.sendHeader();  
  31.   
  32.         // start the main loop going  
  33.         Thread ml = new MainLoop();  
  34.         ml.setName("AMQP Connection " + getHostAddress() + ":" + getPort());  
  35.         ml.start();  
  36.   
  37.         AMQP.Connection.Start connStart = null;  
  38.         try {  
  39.             connStart =  
  40.                 (AMQP.Connection.Start) connStartBlocker.getReply().getMethod();  
  41.   
  42.             _serverProperties = connStart.getServerProperties();  
  43.   
  44.             Version serverVersion =  
  45.                 new Version(connStart.getVersionMajor(),  
  46.                             connStart.getVersionMinor());  
  47.   
  48.             if (!Version.checkVersion(clientVersion, serverVersion)) {  
  49.                 _frameHandler.close(); //this will cause mainLoop to terminate  
  50.                 throw new ProtocolVersionMismatchException(clientVersion,  
  51.                                                            serverVersion);  
  52.             }  
  53.         } catch (ShutdownSignalException sse) {  
  54.             throw AMQChannel.wrap(sse);  
  55.         }  
  56.   
  57.         String[] mechanisms = connStart.getMechanisms().toString().split(" ");  
  58.         SaslMechanism sm = _factory.getSaslConfig().getSaslMechanism(mechanisms);  
  59.         if (sm == null) {  
  60.             throw new IOException("No compatible authentication mechanism found - " +  
  61.                     "server offered [" + connStart.getMechanisms() + "]");  
  62.         }  
  63.   
  64.         LongString challenge = null;  
  65.         LongString response = sm.handleChallenge(null, _factory);  
  66.   
  67.         AMQP.Connection.Tune connTune = null;  
  68.         do {  
  69.             Method method = (challenge == null)  
  70.                 ? new AMQImpl.Connection.StartOk(_clientProperties,  
  71.                                                  sm.getName(),  
  72.                                                  response, "en_US")  
  73.                 : new AMQImpl.Connection.SecureOk(response);  
  74.   
  75.             try {  
  76.                 Method serverResponse = _channel0.rpc(method).getMethod();  
  77.                 if (serverResponse instanceof AMQP.Connection.Tune) {  
  78.                     connTune = (AMQP.Connection.Tune) serverResponse;  
  79.                 } else {  
  80.                     challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();  
  81.                     response = sm.handleChallenge(challenge, _factory);  
  82.                 }  
  83.             } catch (ShutdownSignalException e) {  
  84.                 throw new PossibleAuthenticationFailureException(e);  
  85.             }  
  86.         } while (connTune == null);  
  87.   
  88.         int channelMax =  
  89.             negotiatedMaxValue(_factory.getRequestedChannelMax(),  
  90.                                connTune.getChannelMax());  
  91.         _channelManager = new ChannelManager(channelMax);  
  92.   
  93.         int frameMax =  
  94.             negotiatedMaxValue(_factory.getRequestedFrameMax(),  
  95.                                connTune.getFrameMax());  
  96.         setFrameMax(frameMax);  
  97.   
  98.         int heartbeat =  
  99.             negotiatedMaxValue(_factory.getRequestedHeartbeat(),  
  100.                                connTune.getHeartbeat());  
  101.         setHeartbeat(heartbeat);  
  102.   
  103.         _channel0.transmit(new AMQImpl.Connection.TuneOk(channelMax,  
  104.                                                          frameMax,  
  105.                                                          heartbeat));  
  106.         // 0.9.1: insist [on not being redirected] is deprecated, but  
  107.         // still in generated code; just pass a dummy value here  
  108.         _channel0.exnWrappingRpc(new AMQImpl.Connection.Open(_virtualHost,  
  109.                                                             "",  
  110.                                                             false)).getMethod();  
  111.         return;  
  112.     }  

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics