`

rabbitmq-connection

 
阅读更多

一、ConnectionFactory

ConnectionFactory factory = new ConnectionFactory();

 

开启一个连接,默认localhost和默认端口 

factory.newConnection(addressAry);

 

二、newConnection

 

完整的方法签名:

public Connection newConnection(ExecutorService executor, Address[] addrs) throws IOException, TimeoutException

 

作用:

创建一个broker连接。这里的地址连接信息是个数组,在处理时会使用第一个连上的地址。

创建连接时,可以选择automatic connection recovery。如果这个属性设为true,那么当连接断开时,框架会自动连上,此时选择的地址就是随机的。

 

执行步骤:

1、创建FrameHandlerFactory,之后会通过socket发送header消息。

2、创建ConnectionParams,封装一些参数为实体。

3、是否设置isAutomaticRecoveryEnabled

4、如果设置了,则返回AutorecoveringConnection

5、如果没有设置,则返回AMQConnection

 

三、返回AMQConnection步骤

1、FrameHandler

 FrameHandler handler = fhFactory.create(addr);

        1、通过SocketFactory初始化一个socket连接,并封装此socket的input和output流到SocketFrameHandler(extends FrameHandler)。

        2、设置此socket不延时属性。这是一个算法,算法名称:Nagle's algorithm。

             // disable Nagle's algorithm, for more consistently low latency
             socket.setTcpNoDelay(true);

 

2、AMQConnetion

AMQConnection conn = new AMQConnection(params, handler);

conn.start();

      1、initializeConsumerWorkService

           this._workService  = new ConsumerWorkService(executor, threadFactory, shutdownTimeout);

           参数定义:

           public static final int HANDSHAKE_TIMEOUT = 10000;
           private final ExecutorService executor;
           private Thread mainLoopThread;
           private ThreadFactory threadFactory = Executors.defaultThreadFactory();

           构造方法:

           public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int shutdownTimeout) {
                this.privateExecutor = (executor == null);     //是否是自定义的线程池,如果executor为空,则框架会默认初始化

               // DEFAULT_NUM_THREADS默认为操作系统处理器个数的2倍
                this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory) : executor;
               this.workPool = new WorkPool<Channel, Runnable>();
               this.shutdownTimeout = shutdownTimeout;
          }

 

     2、initializeHeartbeatSender

         

     3、this._running = true; //处于运行中状态

 

     4、_frameHandler.setTimeout(HANDSHAKE_TIMEOUT);
          _frameHandler.sendHeader();

           通过socket发送0-9-1 AMQP版本header信息

 

    5、启动mainloop MainLoop loop = new MainLoop();
        final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();
        mainLoopThread = Environment.newThread(threadFactory, loop, name);
        mainLoopThread.start();

        作用:

        只要_running=true,则一直通过frameHandler循环读取frame。

 

 

 

 

 

 

分享到:
评论

相关推荐

    RabbitMQ-c源码

    - **rabbitmq_connection_init**: 初始化连接结构体,准备建立到RabbitMQ服务器的连接。 - **rabbitmq_channel_open**: 打开一个新的通道,通道是RabbitMQ中进行操作的基本单位。 - **rabbitmq_exchange_declare*...

    rabbitmq-c vs2019工程

    RabbitMQ-C库提供了诸如`rabbitmq_connection_init`、`rabbitmq_channel_open`等函数来实现这些操作。此外,发布消息和接收消息也有对应的API,例如`rabbitmq_basic_publish`和`rabbitmq_basic_consume`。 在VS2019...

    rabbitmq-c-master.zip

    完成操作后,记得调用`rabbitmq_connection_close()`和`rabbitmq_free_connection()`关闭并释放资源。 2. 发布与消费:`rabbitmq_basic_publish()`用于发布消息,而`rabbitmq_basic_consume()`用于设置消费。注意,...

    rabbitmq-c & rabbitmq-codegen.tar.gz

    1. **连接(Connection)**:客户端与RabbitMQ服务器之间的网络连接。 2. **通道(Channel)**:在连接上建立的逻辑会话,用于执行具体任务,如发布消息或声明队列。 3. **队列(Queue)**:消息的容器,消费者从中...

    rabbitmq-c.rar

    1. **连接管理**:`rmq_connection_init()`初始化连接,`rmq_connection_destroy()`关闭连接。 2. **通道操作**:`rmq_channel_open()`打开通道,`rmq_channel_close()`关闭通道。 3. **消息发送**:`rmq_basic_...

    rabbitmq-java-client-bin-3.3.4.zip

    - **Connection**:表示到RabbitMQ服务器的网络连接,可以通过它创建Channel。 - **Channel**:实际执行操作(如声明交换机、队列、绑定,发送和接收消息)的通道,是线程安全的。 - **BasicPublish**:用于生产者...

    rabbitmq-c rabbitmq amqp c++接口库

    关键函数包括`amqp_new_connection`(创建连接)、`amqp_socket_open`(打开连接)、`amqp_channel_open`(打开通道)、`amqp_queue_declare`(声明队列)等。 3. **C++接口库**:`rabbitmq-c++`通常是对`rabbitmq-...

    rabbitmq-client-1.3.0.jar

    1. **连接与认证**:在使用`rabbitmq-client-1.3.0.jar`时,首先要创建一个`ConnectionFactory`对象,配置服务器地址、端口、用户名和密码等信息,然后通过`createConnection()`方法建立与RabbitMQ服务器的连接。...

    rabbitmq-java-client-bin-3.3.4

    1. **连接(Connection)**:客户端与RabbitMQ服务器之间的网络连接。开发者需要创建一个Connection对象,然后通过这个连接发送和接收消息。 2. **通道(Channel)**:在RabbitMQ中,所有的操作都是通过通道进行的...

    rabbitmq-java-client-3.4.1.zip

    - `Connection`:表示与RabbitMQ服务器的连接。 - `Channel`:实际进行消息操作的通道对象。 - `QueueingConsumer`:处理接收到的消息的消费者对象。 - `BasicProperties`:用来设置消息的属性,如内容类型、优先级...

    rabbitmq-dotnet-client-3.5.0

    `ConnectionFactory`类是创建连接的主要入口点,开发者可以通过设置其属性如`Host`、`Port`、`UserName`、`Password`来指定连接参数,然后使用`CreateConnection()`方法建立连接。 2. 通道操作:在RabbitMQ中,通道...

    rabbitmq-c.tar.gz

    - `rabbitmq_connection_open`:建立到RabbitMQ服务器的连接。 - `rabbitmq_channel_open`:在连接上打开一个新的通道。 - `rabbitmq_exchange_declare`:声明交换机,设置其类型(如direct、topic等)。 - `...

    rabbitmq-java-client-bin-2.7.0.zip

    1. **连接(Connection)**:在Java客户端中,首先需要建立一个到RabbitMQ服务器的连接。这通常通过`ConnectionFactory`类来实现,它创建了一个`Connection`对象,使得客户端可以与服务器通信。 2. **通道(Channel...

    rabbitmq-dotnet-client-3.6.4-dotnet-4.6.1.rar

    1. **连接RabbitMQ服务器**:使用`ConnectionFactory`创建连接,并使用`CreateConnection()`方法建立到RabbitMQ服务器的连接。 2. **创建通道**:通过`IConnection`对象的`CreateModel()`方法创建一个新的通道,...

    rabbitMQ实战java版-rabbitMQ-demo.zip

    《RabbitMQ实战Java版——基于rabbitMQ-demo.zip的详解》 在当今的分布式系统中,消息队列作为异步处理、解耦组件的关键技术,得到了广泛应用。RabbitMQ作为一款开源的消息代理和队列服务器,以其稳定性和易用性...

    rabbitMQ-demo.zip_DEMO_piguhw_rabbitMQ-demo_rabbitmq .idx

    1. **创建连接和通道(Connection and Channel)**:RabbitMQ 的通信通过 TCP 连接进行,应用会创建一个连接对象,然后在该连接上创建一个或多个通道。通道是执行 AMQP 命令的逻辑信道。 2. **声明交换器...

    RabbitMQ:安装、配置与使用初探

    ### RabbitMQ:安装、配置与使用初探 #### 一、下载及安装 RabbitMQ 是一款基于 AMQP(Advanced Message Queuing Protocol)协议的消息中间件。本文将介绍如何在 CentOS 上安装并配置 RabbitMQ。 ##### 1.1 安装 ...

    rabbitmq-jar包

    - 连接:通过`ConnectionFactory`创建连接,然后使用`Connection.createChannel()`方法创建通道。 - 定义交换机:使用`channel.exchangeDeclare()`方法声明交换机。 - 创建队列:使用`channel.queueDeclare()`...

    rabbitmq-server-windows-3.8.9,内含延迟插件(rabbitmq_delayed_message_exchange-3.8.0.ez)

    RabbitMQ是一款开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中的异步任务处理、解耦组件以及负载均衡。在这个“rabbitmq-server-windows-3.8.9”压缩包中,...

Global site tag (gtag.js) - Google Analytics