大白话讲解RabbitMQ消息可靠传输保障 消息确认机制 死信队列
深入分析RabbitMQ消息异常处理,及延迟队列在缓存架构中的应用
延迟队列
顾名思义,延迟队列是指被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。
RabbitMQ延迟队列,主要是借助消息的TTL(Time to Live)和死信exchange(Dead Letter Exchanges:DLX)来实现。
应用场景分析
1)延迟消费
在订单系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,那么这个订单将进行异常处理(如:关闭订单),这时就可以使用延迟队列来处理这些订单
还有其它一些常用的应用场景,如:物联网中的延时处理、新注册用户的活跃度调查……
2)延迟重试
主要是处理一些异常信息,如:发送失败、消费失败,可能当时由于网络抖动的问题,暂时无法访问,需要延迟再进行重试(重新发送,重新消费)
如果不使用延迟队列,那么我们只能采用定时任务,轮训数据库,方法简单好用,但性能底下,在高并发情况下容易弄死数据库,间隔时间不好设置,时间过大,影响精度,过小影响性能,而且做不到按超时的时间顺序处理。
既然延迟队列这么好,我们怎样来实现延迟队列呢,本文将深入讲解延迟队列的实现原理,并实战演练延迟队列的应用
核心概念
1)死信队列DLX
DLX,Dead Letter Exchange 的缩写,又死信邮箱、死信交换机。DLX就是一个普通的交换机,和一般的交换机没有任何区别。
【当消息在一个队列中变成死信(dead message)时,通过这个交换机将死信发送到死信队列中】(指定好相关参数,rabbitmq会自动发送)。
什么是死信呢?什么样的消息会变成死信呢?
消息被拒绝(basic.reject或basic.nack)并且requeue=false.
消息TTL过期
队列达到最大长度(队列满了,无法再添加数据到mq中)
应用场景分析:
在定义业务队列的时候,可以考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便我们查看消息失败的原因了
如何使用死信交换机呢?
定义业务(普通)队列的时候指定参数:
x-dead-letter-exchange: 用来设置死信后发送的交换机
x-dead-letter-routing-key:用来设置死信的routingKey
2)消息TTL/队列TTL(Time To Live 过期时间)
概述
消息TTL/队列TTL(Time To Live 过期时间)
队列TTL:通过队列的属性来设置TTL,队列中的所有消息都有相同的过期时间
消息TTL:对消息进行单独设置,每条消息的TTL可以设置不同的过期时间
两者同时使用:以较小的TTL为准
消息在队列中的生存时间一旦超过设置的TTL,就会变成"死信"(Dead Message),如果绑定了死信队列,这些死信变会进入死信队列
消息在队列的生存时间一旦超过设置的TTL值,就称为dead message, 消费者将无法再收到该消息。
队列TTL与消息TTL的区别:
队列TTL:
一旦消息过期,就会立即从队列中抹去(因为过期的消息肯定处于队列的头部)
消息TTL:
即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的
为什么两者得处理方法不一致?
因为第一种方法里,队列中已过期的消息肯定在队列头部,RabbitMQ只要定期从队头开始扫描是否有过期消息即可,而第二种方法里,每条消息的过期时间不同,如果要删除所有过期消息,势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期,再进行删除。
Queue TTL
queue.declare 命令中的 x-expires 参数控制 queue 被自动删除前可以处于未使用状态的时间。未使用的意思是 queue 上没有任何 consumer ,queue 没有被重新声明,并且在过期时间段内未调用过 basic.get 命令。该方式可用于,例如,RPC-style 的回复 queue, 其中许多 queue 会被创建出来,但是却从未被使用。
服务器会确保在过期时间到达后 queue 被删除,但是不保证删除的动作有多么的及时。在服务器重启后,持久化的 queue 的超时时间将重新计算。
用于表示超期时间的 x-expires 参数值以毫秒为单位,并且服从和 x-message-ttl 一样的约束条件,且不能设置为 0 。所以,如果该参数设置为 1000 ,则表示该 queue 如果在 1s之内未被使用则会被删除。
下面的 Java 示例创建了一个 queue ,其会在 30 分钟不使用的情况下判定为超时。
Map<String, Object> args = new HashMap<String, Object>(); args.put("x-expires", 1800000); channel.queueDeclare("myqueue", false, false, false, args);
总结:
死信队列 + 消息TTL = 延迟队列
1)延迟消费
生产者发送消息到Queue_10s
设置Queue_10s的队列TTL为10s
当消息过期后,经过DLX,进入Dead_Queue
消费者消费Dead_Queue中的消息
2)延迟重试
生产者发送消息到普通队列,消费者正常消费队列
生产者发送失败,发送消息到Queue_20s,消息过期后,经过DLX,进入Dead_Queue,最为数据流重新发送
消费者消费失败,发送消息到Queue_20s,消息过期后,经过DLX,进入Queue,消费者正常消费
相关推荐
该示例通过 rabbitmq_delayed_message_exchange 插件实现自定义延时时间的延时队列。 示例是纯净的,只引入了需要的架包 启动示例时,请确保MQ已经安装了延时插件(附件里带有插件及安装说明)以及示例的MQ相关的配置...
如果失败,可设置重试机制。 4. **异常处理与补偿机制**:当库存服务长时间无法正常工作时,可以设定补偿机制,如回滚未完成的订单或者手动介入解决。 5. **监控与报警**:设置监控系统,实时关注订单和库存服务的...
本教程将详细介绍如何使用SpringBoot集成RabbitMQ来实现一个延时队列,并探讨消息发送与消费确认机制以及消费者端的策略模式应用。 首先,SpringBoot是Java开发者广泛使用的快速开发框架,它简化了Spring的配置和...
- **延时队列模拟**:RabbitMQ不直接支持延时队列,但可以通过设置消息TTL和DLX来模拟。消费者应监听与DLX绑定的队列,而非原始消息队列。 4. **死信队列使用流程** - **创建DLX和关联队列**:首先,创建一个死信...
1. **RabbitMQ延时队列插件**:这个插件提供了新的交换机类型——`x-delayed-message`,它可以将消息发送到队列时附带一个延迟时间。当这个时间到达时,消息才会出现在队列中,可供消费者消费。 2. **SpringBoot...
4. **故障恢复**:在系统出现故障时,可以利用延时队列重新尝试执行失败的操作,避免立即重试导致的雪崩效应。 总之,RabbitMQ的延时队列插件为开发者提供了一种简单且灵活的方式来处理需要延时处理的任务,使得...
为了实现延时重试,我们可以使用RabbitMQ的`x-delayed-message`插件,它可以创建延迟队列。这样,当我们把消息放到重试队列时,可以附加一个延迟时间戳,确保消息在指定时间后才被消费。 ```php // 在发布到重试...
2.延迟重试。比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。这种方案既不优雅,也不方便做成统一的服务便于
由于延时队列在某些应用场景下的特殊需求,传统的消息中间件如RabbitMQ和Kafka虽然功能强大但操作复杂,这时Redis这种轻量级的存储系统成为了一种快速实现延时队列的选择。 Redis提供了几种数据结构可以用来实现...
在本文中,我们将介绍 RabbitMQ 双活架构设计的实现,包括消息 Publish 可靠性、消息 Consume 可靠性、消息延迟重试机制和消息延时消费等。 消息 Publish 可靠性 在 RabbitMQ 中,消息 Publish 可靠性是通过确认...
3. **消息重试**:多次尝试发送,如果失败则等待一段时间后再试。 4. **定时任务**:比如定期清理日志、执行报表生成等。 需要注意的是,延时插件并不保证消息的精确延迟,因为RabbitMQ本身并不提供精确的定时器...
4. **消息重试**:在消息处理失败后,延迟一段时间后再重试,降低系统压力。 **注意事项** 1. 消息延迟功能依赖于RabbitMQ的内部实现,可能存在延迟精度问题,不适用于需要精确计时的场景。 2. 延迟消息可能导致...
9. **死信队列**:掌握死信队列的概念,当消息无法正常路由或达到最大重试次数时,会被送到死信队列,用于排查问题和保证数据安全。 10. **监控与管理**:通过RabbitMQ的Web管理界面,学习如何监控队列状态、查看...
在某些场景下,我们需要实现延迟任务,比如订单超时处理、任务重试等,RabbitMQ提供了相应的功能来支持这些需求。本文将深入探讨如何利用RabbitMQ的TTL(Time To Live)和Dead Letter Exchange特性来实现延迟任务。 ...
3. **重试机制**:对于可能失败的操作,可以先延迟投递,然后在一定时间后重试,避免频繁失败对系统的影响。 **监控与管理** 为了确保延迟消息功能的正常运行,我们需要监控RabbitMQ的性能和队列状态。可以使用...
在本文中,我们将深入探讨如何使用Java编程语言与RabbitMQ进行交互,特别是实现一个发布订阅模式下的延时重试队列。RabbitMQ是一个开源的消息代理和队列服务器,它使得应用程序之间能够异步传递消息,提高了系统的可...
- **接口重试:** 接口调用失败后,可设定间隔时间重试直至达到阈值。 **四、延迟任务的技术实现方案** - **1. 使用JDK自带的`DelayQueue`:** - **原理:** `DelayQueue` 是一个支持延时获取元素的阻塞队列,内部...
这时,需要设计合理的错误处理机制,如设置重试次数、设置重试间隔、记录错误日志等。对于无法立即解决的错误,任务可以被标记为失败并存储相关信息,待后续人工介入或系统恢复后再处理。 7. **资源释放与任务关闭*...