`
uule
  • 浏览: 6358457 次
  • 性别: Icon_minigender_1
  • 来自: 一片神奇的土地
社区版块
存档分类
最新评论

RabbitMQ消息确认机制

 
阅读更多

应用RabbitMQ可靠性传输机制实现Redis缓存的实时更新

 

 

消息中间件集群崩溃,如何保证百万生产数据不丢失?

RabbitMQ暂时放在了自己的内存中,还没来得及投递给下游的仓储服务呢,此时RabbitMQ突然宕机了,会怎么样?

答案其实很简单,默认情况下,按照我们目前的代码和配置,这个数据就会丢失了。

 

持久化

队列持久化:

//queuechannel.queue_declare(queue='hello2',durable=True)

channel.queueDeclare(
 "warehouse_schedule_delivery",
 true, 
 false,
 false,
 null);

 

核心在于第二个参数,第二个参数是true。意思就是说,这个创建的queue是durable的,也就是支持持久化的。

这样,RabbitMQ会把这queue的相关信息持久化的存储到磁盘上去,即使RabbitMQ宕机后重启,也会恢复之前创建好的这个queue。

 

消息持久化:

 

现在你的queue的信息可以持久化了,RabbitMQ宕机重启后会自动恢复queue。但是,你的queue里的message数据呢?

queue里都是订单服务发送过去的订单消息数据,如果RabbitMQ还没来得及投递queue里的订单消息到仓储服务,结果RabbitMQ就宕机了。

 

那此时RabbitMQ重启之后,他可以恢复queue的信息,但是queue的message数据是没法恢复了。

 

所以就需要在你的订单服务发送消息到RabbitMQ的时候,【定义这条消息也是durable】,即持久化的。

#channel.basic_publish(exchange='',routing_key='hello2',body='Hello World!',properties=pika.BasicProperties(delivery_mode=2))


/* 参数:

 * 向server发布一条消息
 * 参数1:exchange名字,若为空则使用默认的exchange
 * 参数2:routing key
 * 参数3:其他的属性
 * 参数4:消息体

 * 【RabbitMQ默认有一个exchange,叫default exchange,它用一个空字符串表示,它是direct exchange类型,

 * 任何发往这个exchange的消息都会被路由到routing key的名字对应的队列上,如果没有对应的队列,则消息会被丢弃】
 */ 

   

channel.basicPublish("", "warehouse_schedule_delivery",MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

 通过上面的方式来发送消息,就可以让发送出去的消息是持久化的。

 

一旦标记了消息是持久化之后,就会让RabbitMQ把消息持久化写入到磁盘上去,此时如果RabbitMQ还没投递数据到仓储服务,结果就突然宕机了。那么再次重启的时候,就会把磁盘上持久化的消息给加载出来。

 

但是这里要注意一点,RabbitMQ的消息持久化,是不承诺100%的消息不丢失的。

 

因为有可能【RabbitMQ接收到了消息,但是还没来得及持久化到磁盘,他自己就宕机了】,【这个时候消息还是会丢失的】。

 

如果要完全100%保证写入RabbitMQ的数据必须落地磁盘,不会丢失,需要依靠其他的机制。

 

 

 

RabbitMQ消息确认机制

 

Message acknowledgment

消费者消息确认机制

 

为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消费者消息确认机制(message acknowledgement)。采用消息确认机制之后,消费者就有足够的时间来处理消息,不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直等待并持有消息,直到消费者确认了该消息。

 

当有Consumer需要大量的运算时,RabbitMQ Server需要一定的分发机制来balance每个Consumer的load。

 

【默认情况下,RabbitMQ会顺序的分发每个Message。当每个【收到ack后,会将该Message删除】,然后将下一个Message分发到下一个Consumer】这种分发方式叫做round-robin。这种分发还有问题。

每个Consumer可能需要一段时间才能处理完收到的数据。如果在这个过程中,Consumer出错了,异常退出了,而数据还没有处理完成,那么非常不幸,这段数据就丢失了。因为我们采用no-ack的方式进行确认,也就是说,【每次Consumer接到数据后,而不管是否处理完成,RabbitMQ Server会立即把这个Message标记为完成,然后从queue中删除了】。

 

如果一个Consumer异常退出了,它处理的数据能够被另外的Consumer处理,这样数据在这种情况下就不会丢失了。

为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。【为了保证数据能被正确处理而不仅仅是被Consumer收到,那么我们不能采用no-ack】。而应该是在【处理完数据后发送ack】。(在处理数据后发送的ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以去安全的删除它了,【如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer】。这样就保证了在Consumer异常退出的情况下数据也不会丢失)

 

在实际应用中,可能会发生消费者收到Queue中的消息,但没有处理完成就宕机(或出现其他意外)的情况,这种情况下就可能会导致消息丢失。为了避免这种情况发生,我们可以要求消费者在消费完消息后发送一个回执给RabbitMQ,RabbitMQ收到消息回执(Message acknowledgment)后才将该消息从Queue中移除;【如果RabbitMQ没有收到回执并检测到消费者的RabbitMQ连接断开,则RabbitMQ会将该消息发送给其他消费者(如果存在多个消费者)进行处理】。【这里不存在timeout概念】,【一个消费者处理消息时间再长也不会导致该消息被发送给其他消费者,除非它的RabbitMQ连接断开】。

这里会产生另外一个问题,如果我们的开发人员在处理完业务逻辑后,忘记发送回执给RabbitMQ,这将会导致严重的bug——Queue中堆积的消息会越来越多;消费者重启后会重复消费这些消息并重复执行业务逻辑。

 

RPC

MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。 

但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。

 

【RabbitMQ开启手动ack机制保证消费端数据不丢失的时候,prefetch机制对消费者的吞吐量以及内存消耗的影响。

通过分析,我们知道了prefetch过大容易导致内存溢出,prefetch过小又会导致消费吞吐量过低,所以在实际项目中需要慎重测试和设置。

------------------------------------------------------------------------------------------------------

 

生产者消息确认机制

 

当消息发送出去之后,我们如何知道消息有没有正确到达exchange呢?如果在这个过程中,消息丢失了,我们根本不知道发生了什么,也不知道是什么原因导致消息发送失败了

 

为解决这个问题,主要有如下两种方案:

 

【通过事务】机制实现

【通过生产者消息确认机制】(publisher confirm)实现

 

但是使用【事务机制实现会严重降低RabbitMQ的消息吞吐量】,我们采用一种轻量级的方案——生产者消息确认机制

  

什么是生产者消息确认机制?

 

简而言之,就是:生产者发送的消息一旦被投递到所有匹配的队列之后,就会发送一个确认消息给生产者,这就使得生产者知晓消息已经正确到达了目的地。

 

如果消息和队列是持久化存储的,那么确认消息会在消息写入磁盘之后发出。

 

再补充一个Mandatory参数:【当Mandatory参数设为true时,如果目的不可达,会发送消息给生产者,生产者通过一个回调函数来获取该信息。】

 

 

分享到:
评论

相关推荐

    springboot + rabbitmq 如何实现消息确认机制(踩坑经验)

    SpringBoot + RabbitMQ 实现消息确认机制的踩坑经验 SpringBoot 和 RabbitMQ 是当前流行的微服务架构中常用的技术栈,然而在实际开发中,消息确认机制的实现却是一个坑爹的点。今天,我将与大家分享小编在实际开发...

    rabbitmq 实现消息插队

    - 为了保证消息不丢失,RabbitMQ提供了消息确认机制。消费者在接收到消息后需要发送一个确认信号,只有在收到确认后,RabbitMQ才会认为消息已被成功处理,并从队列中删除。 - 如果消费者在处理消息过程中发生异常...

    RabbitMQ消息模式之Confirm确认消息

    理解Confirm消息确认机制 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心...

    RabbitMQ消息中间件技术精讲

    1. **可靠性**:通过持久化消息、确认机制等方式确保消息不会丢失。 2. **灵活性**:支持多种消息传递模式,可以根据实际需求灵活选择。 3. **高性能**:采用高效的并发模型,能够处理大量的消息吞吐量。 4. **可...

    RabbitMQ_Mirror机制分析

    RabbitMQ Mirror机制是RabbitMQ中的一种高可用性机制,旨在提供消息队列的高可用性和持久化。Mirror机制的核心是镜像队列(Mirror Queue),它是一个特殊的Backing Queue,内部包裹了一个普通的Backing Queue,用于...

    RabbitMQ消息确认(ACK)机制实战

    对RabbitMQ消息确认(ACK)原理的理解及实践

    RabbitMQ消息中间件技术精讲.txt

    4. **确认机制**:为了确保消息被成功处理,消费者在处理完消息后会向RabbitMQ发送确认信号,只有当所有消息都被确认后,这些消息才会从队列中移除。 #### 四、RabbitMQ的特点 - **可靠性**:RabbitMQ提供了多种...

    RabbitMQ消息中间件视频教程

    3. **消息确认机制**:为了确保消息不丢失,RabbitMQ提供了消息确认机制,包括手动确认(Manual Acknowledgement)和自动确认(Automatic Acknowledgement)两种方式。 4. **集群部署**:通过集群部署可以实现...

    springboot整合rabbitmq,开启手工确认。保证消息100%投递

    下面将详细解释如何在Spring Boot中配置RabbitMQ,并启用手动确认机制来增强消息的可靠性。 首先,我们需要在Spring Boot项目中引入RabbitMQ的相关依赖。这通常通过在`pom.xml`或`build.gradle`文件中添加Spring ...

    SpringBoot整合RabbitMQ 实现消息发送确认与消息接收确认机制 源码及教材

    SpringBoot整合RabbitMQ 实现消息发送确认与消息接收确认机制 源码及教材 可以参考博客: https://blog.csdn.net/qq_29914837/article/details/93376741

    Springboot+RabbitMQ 消息中间件

    本文将深入探讨SpringBoot与RabbitMQ的集成,以及如何实现消息的发送、接收、确认(ack)机制以及基于Redis的消息补偿机制。 首先,我们需要理解SpringBoot的核心特性,它是一个基于Spring框架的快速开发工具,简化...

    RabbitMQ_的Confirm机制优化1

    RabbitMQ 的 Confirm 机制是生产者与 RabbitMQ 之间的确认机制,确保消息确实投递到了 MQ。在许多对可靠性要求比较高的应用场景下都需要使用该机制确保消息不丢。Confirm 机制的性能对应用的影响很大,测试显示,在...

    rabbitmq发送&接收消息

    RabbitMQ支持消息确认机制,确保消息被正确处理。生产者可以设置等待消费者确认,如果消费者没有确认,RabbitMQ可能会重新投递消息。 **8. 持久化** 为了防止消息丢失,RabbitMQ提供了消息和Queue的持久化选项。...

    rabbitmq + spring boot demo 消息确认、持久化、备用交换机、死信交换机等代码

    1. **消息确认**:在RabbitMQ中,消息确认(Message Acknowledgement)是一种确保消息被正确处理的机制。当消费者接收到消息后,它需要发送一个确认信号给RabbitMQ,表明消息已被处理。如果RabbitMQ没有收到确认,它...

    RabbitMQ消息列队

    5. **确认机制**: 消费者可以开启消息确认,告知RabbitMQ消息是否已被正确处理,确保消息不丢失。 ### 三、RabbitMQ的使用场景 1. **解耦**: 当系统各部分之间通过RabbitMQ通信,可以降低耦合度,增加系统的可扩展...

    RabbitMq+springboot

    总结来说,"RabbitMq+springboot"的示例项目涵盖了RabbitMQ的三种主要工作模式,消息确认机制,消息重发机制,以及如何在SpringBoot应用中使用这些特性。对于开发者而言,这个项目提供了一个实用的学习资源,帮助...

    基于Springboot 版本 2.3.2.RELEASE版本开发的Example

    Springboot 2.3.2.RELEASE的 Example共计有9个,可...RabbitMQ的五种消息发送模式,RabbitMQ消息确认机制,RabbitMQ死信队列的定义,RabbitMQ消息100%可靠性方案实践,定时任务3种方式:@Scheduled,Quartz,XXL-JOB。

    RabbitMq消息队列指南.docx

    6. 消费者接收消息后,通过ack(确认)机制通知RabbitMQ消息已被处理。 RabbitMQ还支持多种消息模型,如基本模型、Direct、Fanout、Topic和Header等,以适应不同场景的需求。例如,基本模型是最简单的,生产者直接...

Global site tag (gtag.js) - Google Analytics