一、RabbitMQ特性
持久化:消息持久化到磁盘,需要满足三个条件:
- 投递模式选项设置为2(持久)
- 发送到持久化的交换器
- 到达持久化的队列
事物:RabbitMQ支持事物,但是事物是同步的,性能太差。RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback(), txSelect用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务,在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。
try {
channel.txSelect();
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
channel.txCommit();
} catch (Exception e) {
e.printStackTrace();
channel.txRollback();
}
确认机制(Comfirm):分消费端comfirm和生产端comfirm。事物和确认机制都是为了保证消息不丢失。确认机制是异步的,比事物性能高。
- basic.ack:确认收到消息
- basic.nack:拒绝消息,支持requeue,支持批量nack(reject的加强版)
- basic.reject:拒绝消息,支持requeue,不支持批量
broker是轮流给Consumer发送消息的,如果requeue的消息broker就会轮流发到下一个Consumer。如果没有Ack也没有NoAck,broker会挂起这个消息(至到消费该消息的Consumer连接断掉才会requeue该消息),没有NoAck的Consumer还是会收到消息。
二、Consumer Comfirm
订阅消息时将autoAck设置为true的话,那么你处理完消息之后就无须再发送确认消息回服务器。这样就能极大的加快消费者消费的速度。否则需要代码手动ack。
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
this.getChannel().basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, false, consumer); //第二个参数autoAck为false
三、Producer Comfirm
生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。
在channel 被设置成 confirm 模式之后,所有被 publish 的后续消息都将被 confirm(即 ack) 或者被nack一次。但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm又被nack 。
已经在transaction事务模式的channel是不能再设置成confirm模式的,即这两种模式是不能共存的。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.confirmSelect(); //Producer端开启comfirm模式
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
分享到:
相关推荐
"RabbitMQ Mirror机制分析" RabbitMQ Mirror机制是RabbitMQ中的一种高可用性机制,旨在提供消息队列的高可用性和持久化。Mirror机制的核心是镜像队列(Mirror Queue),它是一个特殊的Backing Queue,内部包裹了一...
SpringBoot + RabbitMQ 实现消息确认机制的踩坑经验 SpringBoot 和 RabbitMQ 是当前流行的微服务架构中常用的技术栈,然而在实际开发中,消息确认机制的实现却是一个坑爹的点。今天,我将与大家分享小编在实际开发...
RabbitMQ 的 Confirm 机制是生产者与 RabbitMQ 之间的确认机制,确保消息确实投递到了 MQ。在许多对可靠性要求比较高的应用场景下都需要使用该机制确保消息不丢。Confirm 机制的性能对应用的影响很大,测试显示,在...
SpringBoot整合RabbitMQ 实现消息发送确认与消息接收确认机制 源码及教材 可以参考博客: https://blog.csdn.net/qq_29914837/article/details/93376741
这些技术包括持久性机制、投递确认、发布者证实和高可用性机制。 RabbitMQ支持多种消息协议的消息传递,包括AMQP协议、STOMP协议、MQTT协议等。RabbitMQ还提供了多种交换机类型,包括direct exchange、fanout ...
理解Confirm消息确认机制 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心...
RabbitMQ支持消息确认机制,允许服务器确认消息是否已被消费者成功处理,从而保证消息的可靠性。 8. **持久化** 通过配置,可以将消息和队列设置为持久化,即使服务器重启,数据也不会丢失。 9. **集群和高可用...
- "针对rabbimq_镜像和确认机制的小测试.docx" 和 "RabbitMQ_的Confirm机制优化.docx":这两份文档深入讲解了RabbitMQ的确认机制,包括消息的发送确认和消费确认,以及如何利用这一机制实现数据的可靠传输,特别是...
手动确认模式(Confirm Mode)是RabbitMQ提供的一种机制,用于增强消息的可靠传输。在该模式下,消费者需显式地发送一个确认信号给RabbitMQ,表示消息已被成功处理,此时RabbitMQ才会真正删除消息。如果消费者没有...
8. **异常处理与确认**:为了保证消息的可靠性,可以启用消费者确认机制。当消息被正确处理后,消费者需要发送一个确认信号给RabbitMQ,否则消息将被重新投递。同时,应妥善处理异常,避免因为错误导致消息丢失。 9...
通过测试,我们可以确定RabbitMQ在不同场景下的吞吐量、延迟和资源消耗情况,从而为其在实际生产环境中的部署提供参考。 2 **测试环境** 2.1 **测试规格** 测试是在RabbitMQ 3.6.2版本上进行的,采用了两种不同的...
源码中会揭示它们如何建立连接、发送和接收消息,以及如何处理消费确认机制。 8. **客户端API** 提供的`rabbitmq-java-client-2.5.1`是RabbitMQ的Java客户端库,用于与RabbitMQ服务器交互。它提供了创建连接、通道...
* 消息确认:RabbitMQ 提供了消息确认机制,确保消息的可靠传递。 * 消息持久化:RabbitMQ 提供了消息持久化机制,确保消息的安全。 demo 下面是一些使用 RabbitMQ 的 demo 例子: demo(1):简单的消息队列 * ...
RabbitMQ是一个基于Erlang语言开发的消息中间件,它遵循AMQP(Advanced Message ...在实际开发中,你可以根据项目需求调整这些示例,例如增加错误处理、消息确认机制、使用工作队列模型等,以实现更复杂的业务逻辑。
2. **消息确认**:消费者可以通过确认机制通知RabbitMQ是否成功处理了消息,确保消息不丢失。 3. **死信队列**:当消息无法路由或消费者处理失败时,可以设置死信队列,将这些消息隔离处理。 4. **延迟队列**:...
RabbitMQ支持消息确认机制,消费者在处理完消息后需要发送一个确认信号。Spring Boot中可以通过配置`simple.concurrent-consumers`和`simple.auto-ack`来控制并发消费者数量以及是否自动确认。 综上所述,"java ...
7. **消息确认**:RabbitMQ支持消息确认机制,确保消息已被正确处理。`amqp_set_delivery_tag`和`amqp.basic.ack`方法用于此目的。C++库可能会提供自动确认或手动确认的选项。 8. **错误处理**:在C++接口库中,...
- 消息确认:生产者和消费者都可以开启消息确认机制,确保消息的可靠传输。 - 消息持久化:通过设置,可以使消息在服务器重启后仍能恢复,保证高可用性。 - DLX(Dead Letter Exchange):死信队列,当消息无法...