`
maosheng
  • 浏览: 568130 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

MQ消息持久化和确认机制

MQ 
阅读更多
问题分析

一般MQ中间件为了提高系统的吞吐量会把消息保存在内存中,如果不作其他处理,MQ服务器一旦宕机,消息将全部丢失。这个是业务不允许的,造成很大的影响。我知道一个方法就是把消息持久化,RabbitMQ中发消息的时候会有个durable参数可以设置,设置为true,就会持久化。

这样的话MQ服务器即使宕机,重启后磁盘文件中有消息的存储,这样就不会丢失了吧。是的这样就一定概率的保障了消息不丢失。但还会有个场景,就是消息刚刚保存到MQ内存中,但还没有来得及更新到磁盘文件中,突然宕机了。这个场景在持续的大量消息投递的过程中,会很常见。

上面问题出现在,没有人告诉我们持久化是否成功。好在很多MQ有回调通知的特性,RabbitMQ就有confirm机制来通知我们是否持久化成功。

confirm机制的原理:

消息生产者把消息发送给MQ,如果接收成功,MQ会返回一个ack消息给生产者 2、如果消息接收不成功,MQ会返回一个nack消息给生产者

这样是不是就可以保障100%消息不丢失了呢?

我们看一下confirm的机制,试想一下,如果我们生产者每发一条消息,都要MQ持久化到磁盘中,然后再发起ack或nack的回调。这样的话是不是我们MQ的吞吐量很不高,因为每次都要把消息持久化到磁盘中。写入磁盘这个动作是很慢的。这个在高并发场景下是不能够接受的,吞吐量太低了。

所以MQ持久化磁盘真实的实现,是通过异步调用处理的,他是有一定的机制,如:等到有几千条消息的时候,会一次性的刷盘到磁盘上面。而不是每来一条消息,就刷盘一次。

所以comfirm机制其实是一个异步监听的机制,是为了保证系统的高吞吐量,这样就导致了还是不能够100%保障消息不丢失,因为即使加上了confirm机制,消息在MQ内存中还没有刷盘到磁盘就宕机了,还是没法处理。

说了这么多,还是没法确保,那怎么办呢???

消息提前持久化 + 定时任务重发

其实本质的原因是无法确定是否持久化?那我们是不是可以自己让消息持久化呢?答案是可以的,我们的方案再一步的演化。

1、服务生产者在投递消息之前,先把消息持久化到Redis或DB中,建议redis,高性能。消息的状态为发送中。
2、confirm机制监听消息是否发送成功?如ack成功消息,删除redis中此消息。
3、如果nack不成功的消息,这个可以根据自身的业务选择是否重发此消息。也可以删除此消息,由自己的业务决定。
4、生产端加了个定时任务,来拉取隔一定时间了,消息状态还是为发送中的,这个状态就表明,服务生产者是没有收到ack成功消息。
5、定时任务会作补偿性的投递消息。这个时候如果MQ回调ack成功接收了,再把redis中此消息删除。

这样的机制其实就是一个补偿机制,我不管MQ有没有真正的接收到,只要我的redis中的消息状态也是为【发送中】,就表示此消息没有正确成功投递。再启动定时任务去监控,发起补偿投递。

当然定时任务那边我们还可以加上一个补偿的次数,如果大于5次,还是没有收到ack消息,那就直接把消息的状态设置为【失败】,由人工去排查到底是为什么?

这样的话方案就比较完美了,保障了100%的消息不丢失(当然不包含磁盘也坏了,可以做主从方案)。

不过这样的方案,就会有可能发送多次相同的消息,很有可能MQ已经收到了消息,就是ack消息回调时出现网络故障,没有让生产者收到。

那就要要求消费者一定在消费的时候保障幂等性。至于什么是幂等性,如何设计幂等?

幂等含义:
我们先了解一下什么叫幂等?在分布式应用中,幂等是非常重要的,也就是相同条件下对一个业务的操作,不管操作多少次,结果都是一样。

由来背景:
为什么要有幂等这种场景?因为在大的系统中,都是分布式部署,如:订单业务 和 库存业务 有可能都是独立部署的,都是单独的服务。用户下订单,会调用到订单服务和库存服务。


因为分布式部署,很有可能在调用库存服务时,因为网络等原因,订单服务调用失败,但其实库存服务已经处理完成,只是返回给订单服务处理结果时出现了异常。这个时候一般系统会作补偿方案,也就是订单服务再此放起库存服务的调用,库存减1:

update t_goods set count = count -1 where good_id=2

这样就出现了问题,其实上一次调用已经减了1,只是订单服务没有收到处理结果。现在又调用一次,又要减1,这样就不符合业务了,多扣了。

幂等这个概念就是,不管库存服务在相同条件下调用几次,处理结果都一样。这样才能保证补偿方案的可行性。

消息消费幂等去重方案:

1)乐观锁方案:
借鉴数据库的乐观锁机制,如:

update t_goods set count = count -1 , version = version + 1 where good_id=2 and version = 1

根据version版本,也就是在操作库存前先获取当前商品的version版本号,然后操作的时候带上此version号。我们梳理下,我们第一次操作库存时,得到version为1,调用库存服务version变成了2;但返回给订单服务出现了问题,订单服务又一次发起调用库存服务,当订单服务传如的version还是1,再执行上面的sql语句时,就不会执行;因为version已经变为2了,where条件就不成立。这样就保证了不管调用几次,只会真正的处理一次。

2)唯一ID + 指纹码方案:
原理就是利用数据库主键去重,业务完成后插入主键标识

select count(1) from t_check where ID=唯一ID + 指纹码
1、唯一ID就是业务表的唯一的主键,如商品ID 2、指纹码就是为了区别每次正常操作的码,每次操作时生成指纹码;可以用时间戳+业务编号的方式。

上面的sql语句:

1、返回如果为0 表示没有操作过,那业务操作后就可以insert into t_check(唯一ID+指纹码) 2、返回如果大于0 表示操作过,就直接返回

好处:实现简单

坏处:高并发下数据库瓶颈

解决方案:根据ID进行分库分表进行算法路由

3)redis原子操作方案:
利用Redis的原子操作,做个操作完成的标记。这个性能就比较好。但会遇到一些问题。

第一:我们是否需要把业务结果进行数据落库,如果落库,关键解决的问题时数据库和redis操作如何做到原子性?

这个意思就是库存减1了,但redis进行操作完成标记时,失败了怎么办?也就是一定要保证落库和redis 要么一起成功,要么一起失败。

第二:如果不进行落库,那么都存储到缓存中,如何设置定时同步策略?

这个意思就是库存减1,不落库,直接先操作redis操作完成标记,然后由另外的同步服务进行库存落库,这个就是增加了系统复杂性,而且同步策略如何设置

以上就是,如何保障消息成功投递给MQ中间件,和如何设计幂等相关的解决方案。

RabbitMQ确认机制

消息丢失分为发送丢失和消费者处理丢失,相应的也有两种确认机制。

一:确认种类
RabbitMQ的消息确认有两种。

一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。

第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。

二:消息发送确认
(1)ConfirmCallback

通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调。

使用该功能需要开启确认,spring-boot中配置如下:

spring.rabbitmq.publisher-confirms = true

(2)ReturnCallback

通过实现ReturnCallback接口,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发)

使用该功能需要开启确认,spring-boot中配置如下:

spring.rabbitmq.publisher-returns = true

三:消息接收确认

确认模式:

AcknowledgeMode.NONE:不确认
AcknowledgeMode.AUTO:自动确认
AcknowledgeMode.MANUAL:手动确认








分享到:
评论

相关推荐

    rabbitmq + spring boot demo 消息确认、持久化、备用交换机、死信交换机等代码

    以上就是使用RabbitMQ与Spring Boot结合实现消息确认、消息持久化、备用交换机和死信交换机的详细步骤。在实际开发中,可以根据项目需求调整配置,确保系统的稳定性和可靠性。通过song-mq和song-mq-client这两个...

    [重要]基于Websphere MQ非持久化消息实现异步转同步

    基于Websphere MQ的非持久化消息,我们可以构建高效的异步转同步通信模型,优化系统性能的同时保证了消息的快速处理和反馈。通过理解和运用WMQ的特性,开发者可以更好地设计和实现分布式系统的通信架构,提升系统的...

    MQ 消息推送

    RabbitMQ是一个开源的消息代理和队列服务器,它基于AMQP(Advanced Message Queuing Protocol)协议,提供了高可用性、可扩展性和消息持久化的能力。 首先,我们要理解MQ(Message Queue)的核心概念。消息队列是一...

    Java 消息中间件MQ面试题

    解决消息队列的延时和过期失效问题的方法包括使用消息队列的持久化机制、使用消息队列的集群机制、使用消息队列的负载均衡机制等。 7. 设计MQ思路为什么使用MQ? 设计MQ思路的原因包括异步处理、应用解耦、流量削...

    MQ消息队列

    尽管MQ有持久化机制,但在特定情况下仍可能出现消息丢失,需要设计合理的消息确认策略。 6.2 消息积压 当消费者处理速度慢于生产者时,可能会导致消息积压,需要优化消费者性能或调整队列策略。 6.3 性能监控与...

    MQ客户端消息发送接收程序

    综上所述,"MQ客户端消息发送接收程序"涉及了消息队列的核心概念和使用技巧,包括客户端接口、消息模型、确认机制、事务处理、系统设计以及编程实践。这个程序的实现对于学习和理解MQ通信机制具有很高的价值。

    持久化性能测试1

    测试涵盖了不同参数的组合,如生产者和消费者的数量、MQ节点数、确认机制(Ack)、消息持久化、磁盘类型以及消息包的大小。 在标签中,“rabbitmq”和“性能测试”表明这是关于RabbitMQ的性能测试,主要关注其处理...

    MQ,websphere mq

    2. **WebSphere MQ架构**:掌握WebSphere MQ的组件和工作原理,包括队列管理器、通道、队列和消息。 3. **编程接口**:了解如何使用各种编程语言(如Java的JMS API,C的API等)与WebSphere MQ交互。 4. **管理和...

    信也科技MQ消息系统.zip

    2.3 消息持久化:为防止数据丢失,MQ系统通常会将消息存储到磁盘,即使在服务宕机后,也能恢复未处理的消息。 2.4 服务质量(QoS)保证:MQ系统通过设置消息确认机制,确保消息至少被消费一次(At-Least-Once)或...

    RabbitMQ_Mirror机制分析

    RabbitMQ Mirror机制是RabbitMQ中的一种高可用性机制,旨在提供消息队列的高可用性和持久化。Mirror机制的核心是镜像队列(Mirror Queue),它是一个特殊的Backing Queue,内部包裹了一个普通的Backing Queue,用于...

    activityMQ

    在使用非持久化消息时,尤其需要注意消息的及时处理,避免内存溢出导致的写文件操作,影响系统的性能和消息的发送速度。 持久化消息虽然可靠性高,但在某些情况下可能会导致性能瓶颈,特别是消息的存储和检索速度...

    MQ最佳实践_MQ_

    - **容错机制**: MQ提供消息持久化和重试策略,确保消息不丢失。 - **事务支持**: 支持事务型消息,保证消息的原子性和一致性。 3. **MQ的设计原则** - **幂等性**: 消费者处理同一消息多次应产生相同结果。 - ...

    MQ技术文档学习

    - **持久化**:在系统故障后能够恢复未处理的消息,避免数据丢失。 - **负载均衡**:在多消费者环境下,能够均匀分配消息,避免某一个消费者过载。 - **高可用性**:通过集群和复制等手段,确保MQ服务的不间断运行。...

    15-消息中间件MQ面试题.docx

    RabbitMQ可以通过设置消息的确认机制和幂等性来保证消息的可靠传输。 为什么不应该对所有的message都使用持久化机制? RabbitMQ可以根据需要选择是否使用持久化机制,以避免影响系统性能。 如何保证高可用的? ...

    MQ 相关介绍PPT

    消息可以大致分成两部分:应用数据体和消息数据头。消息数据头是对消息属性的描述,这段信息往往被队列管理器用来确定对消息的处理。消息可以分成持久(Persistent)消息和非持久(Non-Persistent)消息。 概念:...

    JAVA发送MQ信息

    此外,还要注意MQ的其他特性,如交换器类型(Direct、Fanout、Topic、Header等)、路由键、消息确认机制(publisher confirms或consumer acknowledgments)以及队列的持久化策略,这些都是设计高效、可靠的MQ系统时...

    c#MQ开发和所需DLL文件

    - **持久化**:为确保消息在服务器重启后仍然可用,可以配置队列和消息的持久化选项。 - **并发控制**:在多线程环境中,合理管理和控制并发以防止数据竞争。 - **负载均衡**:如果有多台MQ服务器,可以通过负载均衡...

    MQ.rar_MQ_mq文件

    9. **故障恢复与高可用**:MQ的高可用设计包括主备切换、集群部署、消息持久化等,以保证服务在节点故障时仍能正常运行,且不丢失已提交的消息。 10. **最佳实践**:在实际应用中,遵循最佳实践可以避免常见问题,...

    MQ-A级面试题.pdf

    3. 设置消息持久化,通过将队列和消息都设置为持久化,确保即使在RabbitMQ宕机的情况下,消息也不会丢失。但需要注意的是,持久化消息在写入磁盘前如果RabbitMQ突然宕机,仍然可能会丢失。 4. 消费者方面,可以通过...

Global site tag (gtag.js) - Google Analytics