一、ConnectionFactory
ConnectionFactory factory = new ConnectionFactory();
开启一个连接,默认localhost和默认端口
factory.newConnection(addressAry);
二、newConnection
完整的方法签名:
public Connection newConnection(ExecutorService executor, Address[] addrs) throws IOException, TimeoutException
作用:
创建一个broker连接。这里的地址连接信息是个数组,在处理时会使用第一个连上的地址。
创建连接时,可以选择automatic connection recovery。如果这个属性设为true,那么当连接断开时,框架会自动连上,此时选择的地址就是随机的。
执行步骤:
1、创建FrameHandlerFactory,之后会通过socket发送header消息。
2、创建ConnectionParams,封装一些参数为实体。
3、是否设置isAutomaticRecoveryEnabled
4、如果设置了,则返回AutorecoveringConnection
5、如果没有设置,则返回AMQConnection
三、返回AMQConnection步骤
1、FrameHandler
FrameHandler handler = fhFactory.create(addr);
1、通过SocketFactory初始化一个socket连接,并封装此socket的input和output流到SocketFrameHandler(extends FrameHandler)。
2、设置此socket不延时属性。这是一个算法,算法名称:Nagle's algorithm。
// disable Nagle's algorithm, for more consistently low latency
socket.setTcpNoDelay(true);
2、AMQConnetion
AMQConnection conn = new AMQConnection(params, handler);
conn.start();
1、initializeConsumerWorkService
this._workService = new ConsumerWorkService(executor, threadFactory, shutdownTimeout);
参数定义:
public static final int HANDSHAKE_TIMEOUT = 10000;
private final ExecutorService executor;
private Thread mainLoopThread;
private ThreadFactory threadFactory = Executors.defaultThreadFactory();
构造方法:
public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int shutdownTimeout) {
this.privateExecutor = (executor == null); //是否是自定义的线程池,如果executor为空,则框架会默认初始化
// DEFAULT_NUM_THREADS默认为操作系统处理器个数的2倍
this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory) : executor;
this.workPool = new WorkPool<Channel, Runnable>();
this.shutdownTimeout = shutdownTimeout;
}
2、initializeHeartbeatSender
3、this._running = true; //处于运行中状态
4、_frameHandler.setTimeout(HANDSHAKE_TIMEOUT);
_frameHandler.sendHeader();
通过socket发送0-9-1 AMQP版本header信息
5、启动mainloop MainLoop loop = new MainLoop();
final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();
mainLoopThread = Environment.newThread(threadFactory, loop, name);
mainLoopThread.start();
作用:
只要_running=true,则一直通过frameHandler循环读取frame。
相关推荐
- **rabbitmq_connection_init**: 初始化连接结构体,准备建立到RabbitMQ服务器的连接。 - **rabbitmq_channel_open**: 打开一个新的通道,通道是RabbitMQ中进行操作的基本单位。 - **rabbitmq_exchange_declare*...
完成操作后,记得调用`rabbitmq_connection_close()`和`rabbitmq_free_connection()`关闭并释放资源。 2. 发布与消费:`rabbitmq_basic_publish()`用于发布消息,而`rabbitmq_basic_consume()`用于设置消费。注意,...
RabbitMQ-C库提供了诸如`rabbitmq_connection_init`、`rabbitmq_channel_open`等函数来实现这些操作。此外,发布消息和接收消息也有对应的API,例如`rabbitmq_basic_publish`和`rabbitmq_basic_consume`。 在VS2019...
1. **连接(Connection)**:客户端与RabbitMQ服务器之间的网络连接。 2. **通道(Channel)**:在连接上建立的逻辑会话,用于执行具体任务,如发布消息或声明队列。 3. **队列(Queue)**:消息的容器,消费者从中...
1. **连接管理**:`rmq_connection_init()`初始化连接,`rmq_connection_destroy()`关闭连接。 2. **通道操作**:`rmq_channel_open()`打开通道,`rmq_channel_close()`关闭通道。 3. **消息发送**:`rmq_basic_...
- **Connection**:表示到RabbitMQ服务器的网络连接,可以通过它创建Channel。 - **Channel**:实际执行操作(如声明交换机、队列、绑定,发送和接收消息)的通道,是线程安全的。 - **BasicPublish**:用于生产者...
关键函数包括`amqp_new_connection`(创建连接)、`amqp_socket_open`(打开连接)、`amqp_channel_open`(打开通道)、`amqp_queue_declare`(声明队列)等。 3. **C++接口库**:`rabbitmq-c++`通常是对`rabbitmq-...
1. **连接与认证**:在使用`rabbitmq-client-1.3.0.jar`时,首先要创建一个`ConnectionFactory`对象,配置服务器地址、端口、用户名和密码等信息,然后通过`createConnection()`方法建立与RabbitMQ服务器的连接。...
1. **连接(Connection)**:客户端与RabbitMQ服务器之间的网络连接。开发者需要创建一个Connection对象,然后通过这个连接发送和接收消息。 2. **通道(Channel)**:在RabbitMQ中,所有的操作都是通过通道进行的...
- `Connection`:表示与RabbitMQ服务器的连接。 - `Channel`:实际进行消息操作的通道对象。 - `QueueingConsumer`:处理接收到的消息的消费者对象。 - `BasicProperties`:用来设置消息的属性,如内容类型、优先级...
- `rabbitmq_connection_open`:建立到RabbitMQ服务器的连接。 - `rabbitmq_channel_open`:在连接上打开一个新的通道。 - `rabbitmq_exchange_declare`:声明交换机,设置其类型(如direct、topic等)。 - `...
`ConnectionFactory`类是创建连接的主要入口点,开发者可以通过设置其属性如`Host`、`Port`、`UserName`、`Password`来指定连接参数,然后使用`CreateConnection()`方法建立连接。 2. 通道操作:在RabbitMQ中,通道...
1. **连接(Connection)**:在Java客户端中,首先需要建立一个到RabbitMQ服务器的连接。这通常通过`ConnectionFactory`类来实现,它创建了一个`Connection`对象,使得客户端可以与服务器通信。 2. **通道(Channel...
1. **连接RabbitMQ服务器**:使用`ConnectionFactory`创建连接,并使用`CreateConnection()`方法建立到RabbitMQ服务器的连接。 2. **创建通道**:通过`IConnection`对象的`CreateModel()`方法创建一个新的通道,...
《RabbitMQ实战Java版——基于rabbitMQ-demo.zip的详解》 在当今的分布式系统中,消息队列作为异步处理、解耦组件的关键技术,得到了广泛应用。RabbitMQ作为一款开源的消息代理和队列服务器,以其稳定性和易用性...
1. **创建连接和通道(Connection and Channel)**:RabbitMQ 的通信通过 TCP 连接进行,应用会创建一个连接对象,然后在该连接上创建一个或多个通道。通道是执行 AMQP 命令的逻辑信道。 2. **声明交换器...
### RabbitMQ:安装、配置与使用初探 #### 一、下载及安装 RabbitMQ 是一款基于 AMQP(Advanced Message Queuing Protocol)协议的消息中间件。本文将介绍如何在 CentOS 上安装并配置 RabbitMQ。 ##### 1.1 安装 ...
- 连接:通过`ConnectionFactory`创建连接,然后使用`Connection.createChannel()`方法创建通道。 - 定义交换机:使用`channel.exchangeDeclare()`方法声明交换机。 - 创建队列:使用`channel.queueDeclare()`...
RabbitMQ是一款开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中的异步任务处理、解耦组件以及负载均衡。在这个“rabbitmq-server-windows-3.8.9”压缩包中,...