`

mq如何处理消息丢失

    博客分类:
  • MQ
 
阅读更多

一、RabbitMQ

1)生产者弄丢了数据

  生产者将数据发送到rabbitmq的时候,可能因为网络问题导致数据就在半路给搞丢了。

 

1.可以选择用rabbitmq提供的事务功能,在生产者发送数据之前开启rabbitmq事务(channel.txSelect),然后发送消息,如果消息没有成功被rabbitmq接收到,那么生产者会收到异常报错,此时就可以回滚事务(channel.txRollback),然后重试发送消息;如果收到了消息,那么可以提交事务(channel.txCommit)。但是问题是,开始rabbitmq事务机制,基本上吞吐量会下来,因为太耗性能。

2.(推荐)可以开启confirm模式,在生产者那里设置开启confirm模式之后,你每次写的消息都会分配一个唯一的id,然后如果写入了rabbitmq中,rabbitmq会给你回传一个ack消息,告诉你说这个消息ok了。如果rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息id的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。

  

  事务机制和cnofirm机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是confirm机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息rabbitmq接收了之后会异步回调你一个接口通知你这个消息接收到了。

所以一般在生产者这块避免数据丢失,都是用confirm机制的。

 

2)rabbitmq弄丢了数据

  为了防止rabbitmq自己弄丢了数据,这个你必须开启rabbitmq的持久化,就是消息写入之后会持久化到磁盘,哪怕是rabbitmq自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,rabbitmq还没持久化,自己就挂了,可能导致少量数据会丢失的,但是这个概率较小。

  设置持久化有两个步骤,第一个是创建queue的时候将其设置为持久化的,这样就可以保证rabbitmq持久化queue的元数据,但是不会持久化queue里的数据;第二个是发送消息的时候将消息的deliveryMode设置为2,就是将消息设置为持久化的,此时rabbitmq就会将消息持久化到磁盘上去。必须要同时设置这两个持久化才行,rabbitmq哪怕是挂了,再次重启,也会从磁盘上重启恢复queue,恢复这个queue里的数据。

  而且持久化可以跟生产者那边的confirm机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者ack了,所以哪怕是在持久化到磁盘之前,rabbitmq挂了,数据丢了,生产者收不到ack,你也是可以自己重发的。

  若生产者那边的confirm机制未开启的情况下,哪怕是你给rabbitmq开启了持久化机制,也有一种可能,就是这个消息写到了rabbitmq中,但是还没来得及持久化到磁盘上,结果不巧,此时rabbitmq挂了,就会导致内存里的一点点数据会丢失。

3)消费端弄丢了数据

  主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,rabbitmq认为你都消费了,这数据就丢了。

  这个时候得用rabbitmq提供的ack机制,简单来说,就是你关闭rabbitmq自动ack,可以通过一个api来调用就行,然后每次你自己代码里确保处理完的时候,再程序里ack一把。这样的话,如果你还没处理完,不就没有ack?那rabbitmq就认为你还没处理完,这个时候rabbitmq会把这个消费分配给别的consumer去处理,消息是不会丢的。

 

二、Kafka

1)消费者弄丢了数据

  唯一可能导致消费者弄丢数据的情况,就是说,你那个消费到了这个消息,然后消费者那边自动提交了offset,让kafka以为你已经消费好了这个消息,其实你刚准备处理这个消息,你还没处理,你自己就挂了,此时这条消息就丢咯。

  大家都知道kafka会自动提交offset,那么只要关闭自动提交offset,在处理完之后自己手动提交offset,就可以保证数据不会丢。但是此时确实还是会重复消费,比如你刚处理完,还没提交offset,结果自己挂了,此时肯定会重复消费一次,自己保证幂等性就好了。

 

2)Kafka弄丢了数据

  比较常见的一个场景,就是kafka某个broker宕机,然后重新选举partiton的leader时。大家想想,要是此时其他的follower刚好还有些数据没有同步,结果此时leader挂了,然后选举某个follower成leader之后,他不就少了一些数据?这就丢了一些数据啊。

所以此时一般是要求起码设置如下4个参数:

1.给这个topic设置replication.factor参数:这个值必须大于1,要求每个partition必须有至少2个副本

2.在kafka服务端设置min.insync.replicas参数:这个值必须大于1,这个是要求一个leader至少感知到有至少一个follower还跟自己保持联系,没掉队,这样才能确保leader挂了还有一个follower吧

3.在producer端设置acks=all:这个是要求每条数据,必须是写入所有replica之后,才能认为是写成功了

4.在producer端设置retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了

  生产环境就按照上述要求配置的,这样配置之后,至少在kafka broker端就可以保证在leader所在broker发生故障,进行leader切换时,数据不会丢失

3)生产者弄丢了数据

 

  按照上述的思路设置了ack=all,一定不会丢,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。

分享到:
评论

相关推荐

    mq 消息丢失机制解读与处理

    mq 保证消息不丢失 的场景和处理

    IBM WebSphere MQ消息持久化

    IBM WebSphere MQ是IBM公司开发的消息中间件产品,它能够保证...在一些对性能要求特别高的系统中,可能会选择使用非持久消息以提高效率,但必须设计相应的错误处理和业务补偿机制来处理非持久消息可能出现的丢失问题。

    MQ发送消息源代码

    MQ提供了一种可靠的传输机制,确保即使在生产者或消费者出现问题时,消息也不会丢失。 在Java中,我们可以使用IBM提供的JMS(Java Message Service)API来与IBMMQ交互。JMS是Java平台上的一个标准接口,用于访问...

    MQ客户端消息发送接收程序

    - 确认(ACK):确保消息已被正确处理,防止丢失或重复。 - 模式如:自动ACK、手动ACK等。 5. **事务支持**: - 支持事务性消息,保证消息发送与业务操作的原子性。 6. **高可用和高可靠性**: - 集群部署:多...

    1 消息队列MQ+多线程任务+业务处理

    在描述中提到的“监听优化消息”,可能指的是MQ客户端会持续监控队列中的新消息,一旦有新的消息到来,就会触发相应的处理逻辑。这有助于及时响应系统中的变化,例如数据库更新、用户操作等。同时,通过“根据电脑...

    mq demo 消息 自写demo

    - **容错**:如果消费者故障,消息不会丢失,它们会在队列中等待恢复后的消费者处理。 - **扩展性**:添加更多消费者可以轻松地处理更大负载,提高系统扩展性。 4. **运行和调试Demo**: - 确保你已安装了必要的...

    MQ 消息推送

    4. **消息持久化**:为了防止服务重启导致消息丢失,我们可以选择将消息和队列设置为持久化的,即使MQ服务器重启,消息也不会丢失。 5. **错误处理**:可能包含异常捕获、重试机制以及死信队列的使用,以处理消息...

    XXL-MQ是一款轻量级分布式消息队列支持串行并行和广播等多种消息模型

    3. **高可用**:XXL-MQ采用主从复制的模式,确保即使主节点故障,消息也能被备份节点接管,避免数据丢失。 4. **消息重试与死信队列**:当消息消费失败时,XXL-MQ支持自动重试机制,并且提供死信队列,将无法处理的...

    mq消息中间件.rar

    - **HA模式**:启用HA插件,实现队列的自动复制,保证在节点故障时不会丢失消息。 5. **SET化架构设计** - **Service(服务)**:模块化业务逻辑,每个服务独立部署和扩展。 - **Event(事件)**:使用消息作为...

    MQ消息序号Message Sequence详解

    在MQ中,消息序号起着至关重要的作用,它不仅帮助系统追踪消息的状态,还确保了消息处理的顺序性。 #### 消息序号的生成与作用 在MQ中,每个发送到队列的消息都会被分配一个唯一的序列号——Message Sequence ...

    阿里云 专有云企业版 V3.8.1 消息队列 MQ 产品简介 20190916

    * 消息处理:消息队列 MQ 产品可以处理大量的消息数据,提供高效的消息处理能力。 * 分布式系统:消息队列 MQ 产品可以用于分布式系统中,提供高效的消息队列服务。 * 云计算:消息队列 MQ 产品可以用于云计算平台中...

    [重要]基于Websphere MQ持久化消息实现异步转同步—方案二

    标题中的“基于Websphere MQ持久化消息实现异步转同步—方案二”是指在分布式系统中,通过使用Websphere MQ(WebSphere Message Broker,一种消息中间件)来处理异步通信,并通过消息的持久化特性,确保消息在异常...

    IBMMQ消息序号-顺序

    MQ中的消息序号(Message Sequence Number)是一项重要的机制,用于确保消息在传输过程中不会丢失也不会重复发送。具体来说: - **定义**:消息序号是由发送通道为每条消息分配的一个序列号,这个号码会随着每条消息...

    mq、jms消息处理jar包

    错误处理也是关键,例如处理连接中断、消息丢失等情况。博客中展示的相关错误信息可以作为问题排查的参考,帮助其他开发者识别并解决类似问题。 总之,理解IBM MQ和JMS的工作原理,以及如何正确配置和使用它们的...

    MQ消息队列

    通过消息持久化,即使服务发生故障,消息也不会丢失,系统可以恢复后继续处理。 3.4 解耦合 MQ减少了组件之间的直接依赖,系统可以独立开发和升级,降低维护复杂性。 四、MQ消息队列应用 4.1 任务调度 在大数据...

    IBM MQ 应答队列

    COA通常表示消息已经被接收并正常处理,而COD则可能在消息无法送达或者接收方出现问题时被发送,表明消息可能已经丢失。 报告类型(Report Type)是IBM MQ中设置消息属性的一部分,用于指定希望收到何种类型的回执...

    MQ单项接发消息

    5. **确认与应答**:消费者处理完消息后,通常会向MQ服务器发送确认信息,表明消息已被成功处理,服务器则可以安全地删除该消息。 三、MQ的主要优点 1. **解耦**:生产者和消费者无需直接交互,降低了系统的耦合度...

    MQ 消息机制

    1. **生产者与消费者模型**:在ActiveMQ中,生产者创建并发送消息到消息队列,而消费者则从队列中接收并处理这些消息。生产者和消费者可以是完全独立的,它们之间的交互通过消息进行,降低了两者之间的耦合度。 2. ...

    MQ,websphere mq

    6. **交易支持**:WebSphere MQ支持X/Open分布式事务处理(DTP),确保消息在事务中的正确处理。 **MQ电子书的学习价值** 学习MQ和WebSphere MQ的相关书籍可以帮助我们深入理解以下关键概念: 1. **消息队列模型*...

Global site tag (gtag.js) - Google Analytics