我们经常使用消息队列进行系统之间的解耦,日志记录等等。但是有时候我们在使用 RabbitMQ时,由于exchange、bindKey、routingKey没有设置正确,导致我们发送给交换器(exchange)的消息,由于没有正确的RoutingKey可能会存在一个消息丢失的情况,如果我们希望知道那些消息经过exchange之后,没有被正确的存入消息队列,那么应该如何进行处理。
方案一:使用 mandatory 参数配合 ReturnListener 来进行解决
方案二:使用备份交换器 (alternate exchange) 来进行解决
方案一介绍:
mandatory参数的含义:
true:表示当交换器无法根据自身的类型和路由键找到一个符合条件的队列时,那么RabbitMQ会调用 Basic.Return 命令将消息返回给生产者。生产者使用ReturnListener 来监听没有被正确路由到消息队列中的消息。
false:表示当交换器无法根据自身的类型和路由键找到一个服务条件的队列时,那么RabbitMQ会丢弃这个消息。
注意事项:
1、有时候发现即使 mandatory参数设置成 true,也没有进入 ReturnListener,那么这个可能是什么原因呢?其实这个可能是受RabbitMQ配置的内存和磁盘告警限制。(http://www.rabbitmq.com/alarms.html)
2、这是一个RabbitMQ配置的磁盘告警导致没有进入ReturnListener的例子。(http://rabbitmq.1065348.n5.nabble.com/ReturnListener-is-not-invoked-td24549.html)
示例代码:
/** * RabbitMQ 生产者 * <pre> * 1、ReturnListener 的使用。 * >> mandatory: 参数需要设置成 true , ReturnListener 才会生效。 * >> 用于获取到没有路由到消息队列中的消息。 * 2、ReturnListener 的注意事项 http://www.rabbitmq.com/alarms.html * >> 受到内存和磁盘的限制 * >> http://rabbitmq.1065348.n5.nabble.com/ReturnListener-is-not-invoked-td24549.html(一个RabbitMQ disk_free_limit 参数导致ReturnListener没有进入的例子) * * </pre> * * @author huan.fu * @date 2018/8/21 - 15:23 */ public class RabbitProducer { private static final String EXCHANGE_NAME = "exchange_demo"; private static final String ROUTING_KEY = "missing_routing_key"; private static final String BINDING_KEY = "bingkey_demo"; private static final String QUEUE_NAME = "queue_demo"; private static final String IP_ADDRESS = "140.143.237.224"; private static final int PORT = 5672; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(IP_ADDRESS); connectionFactory.setPort(PORT); connectionFactory.setUsername("root"); connectionFactory.setPassword("root"); try ( // 创建一个连接 Connection connection = connectionFactory.newConnection(); // 创建信道 Channel channel = connection.createChannel() ) { // 创建一个 type="direct"持久化、非自动删除的交换器 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null); // 创建一个 持久化、非排他的、非自动删除的交换器 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 将交换器与队列通过路由键绑定 使用 bindingKey channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, BINDING_KEY); // 发送一条持久化消息 String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 没有被正确路由到消息队列的消息.mandatory参数设置成true"; try { // 使用 routingKey channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); System.err.println("消息发送完成......"); } catch (IOException e) { e.printStackTrace(); } /** * 处理生产者没有正确路由到消息队列的消息 * 这个可能不会生效:受到 rabbitmq 配置的内存和磁盘的限制 {@link http://www.rabbitmq.com/alarms.html} */ channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> { System.out.println("replyCode:" + replyCode); System.out.println("replyText:" + replyText); System.out.println("exchange:" + exchange); System.out.println("routingKey:" + routingKey); System.out.println("properties:" + properties); System.out.println("body:" + new String(body, StandardCharsets.UTF_8)); }); } } }
方案二介绍:
使用方案一,我们需要自己写ReturnListener,这样业务代码就变的复杂了,那么有没有一种简单的方法呢?那就是使用 备份交换器(Alternate Exchange)
声明交换器可以在channel.exchangeDeclare的时候 添加 alternate-exchange 参数来实现,交换器的类型建议声明成 fanout 类型,因为消息被重新发送到备份交换器时的路由键和从生产者出发的路由键是一致的。
示例代码:
public class RabbitProducer { private static final String EXCHANGE_NAME = "exchange_demo"; private static final String BINDING_KEY = "bingkey_demo"; private static final String QUEUE_NAME = "queue_demo"; private static final String IP_ADDRESS = "140.143.237.224"; private static final int PORT = 5672; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(IP_ADDRESS); connectionFactory.setPort(PORT); connectionFactory.setUsername("root"); connectionFactory.setPassword("root"); try ( // 创建一个连接 Connection connection = connectionFactory.newConnection(); // 创建信道 Channel channel = connection.createChannel() ) { Map<String, Object> arguments = new HashMap<>(16); arguments.put("alternate-exchange", "backup-exchange"); channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, arguments); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, BINDING_KEY); // 声明一个 fanout 类型的交换器,建议此处使用 fanout 类型的交换器 channel.exchangeDeclare("backup-exchange", "fanout", true, false, null); // 消息没有被路由的之后存入的队列 channel.queueDeclare("unRoutingQueue", true, false, false, null); channel.queueBind("unRoutingQueue", "backup-exchange", ""); // 发送一条持久化消息 String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 没有被正确的路由到消息队列,此时此消息会进入 unRoutingQueue"; try { // 使用 routingKey channel.basicPublish(EXCHANGE_NAME, "not-exists-routing-key", true, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); System.err.println("消息发送完成......"); } catch (IOException e) { e.printStackTrace(); } } } }
上例图解:
相关推荐
本实例将深入探讨基于RabbitMQ的消息路由分发功能,帮助你更好地理解和应用这一关键特性。 首先,理解RabbitMQ的基本概念是至关重要的。RabbitMQ是一个实现了Advanced Message Queuing Protocol(AMQP)的开源消息...
5. **RabbitMQ工作流程**:生产者创建一个信道,发布消息到交换器,交换器根据绑定规则将消息路由到一个或多个队列,消费者可以从队列中接收消息并处理。 6. **高可用性**:RabbitMQ支持集群模式,允许多个节点组成...
- **自动确认(Auto Acknowledge)**:这是默认行为,一旦消息被发送到消费者,无论是否处理完成,RabbitMQ都会立即将其从队列中移除。这种方式可能导致数据丢失。 - **手动确认(Manual Acknowledge)**:消费者可以...
1. **消息确认**:在RabbitMQ中,消息确认(Message Acknowledgement)是一种确保消息被正确处理的机制。当消费者接收到消息后,它需要发送一个确认信号给RabbitMQ,表明消息已被处理。如果RabbitMQ没有收到确认,它...
《RabbitMQ实战:高效部署分布式消息队列》是一本深度解析RabbitMQ技术的书籍,旨在帮助读者理解和掌握如何在实际项目中高效地运用这一强大的消息中间件。书中不仅涵盖了RabbitMQ的基础知识,还深入探讨了其在分布式...
- **Exchange**: 交换机是RabbitMQ的核心组件,根据预设的路由规则将消息分发到不同的队列。 - **Queue**: 队列是消息的临时存储区域,消费者可以从队列中获取消息,但不会永久保存。 - **Binding**: 绑定是...
RabbitMQ支持消息确认机制,确保消息被正确处理。生产者可以设置等待消费者确认,如果消费者没有确认,RabbitMQ可能会重新投递消息。 **8. 持久化** 为了防止消息丢失,RabbitMQ提供了消息和Queue的持久化选项。...
你可以通过运行这个示例,观察消息是否正确地被接收方处理。 总的来说,RabbitMQ的点对点通信模式(direct exchange)为服务间的通信提供了可靠的解决方案。它允许我们灵活地定义路由规则,实现高效、可靠的消息...
- **消息确认**:启用消费者确认机制,确保消息被正确处理。 - **死信队列**:配置死信队列处理未被消费的消息。 - **TTL与消息过期**:设置消息存活时间,自动清理过期消息。 - **监控与日志**:使用RabbitMQ...
RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)协议实现的开源消息队列系统,广泛应用于微服务架构、解耦系统组件以及异步任务处理等场景。 首先,我们需要理解消息队列的基本概念。消息队列是一种在...
为了确保消息被正确处理,RabbitMQ支持消息确认机制。当消费者接收到消息后,可以发送一个ack(确认)信号给服务器,表示消息已被处理。如果服务器没有收到ack,会重新发送消息,防止数据丢失。 为了实现消息补偿...
RabbitMQ,作为一款开源的消息代理和队列服务器,被广泛应用于分布式系统中,以实现异步处理、解耦组件以及提升系统性能。本篇文章将深入探讨RabbitMQ的核心概念、安装与配置、工作模式,以及如何高效地部署分布式...
在传统的直接发送模式下,如果接收方服务未启动,发送方发送的消息可能会丢失,因为它们没有被正确地路由和存储。为了解决这个问题,RabbitMQ引入了“点对点”(Point-to-Point)模型,也称为队列模型,它基于发布/...
Spring AMQP是Spring框架的一个模块,它提供了与RabbitMQ消息中间件集成的能力,使得在Java应用中处理AMQP(Advanced Message Queuing Protocol)变得更加便捷。 首先,让我们了解RabbitMQ的基本概念。RabbitMQ是一...
RabbitMQ是一款非常流行的开源消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议,用于在分布式系统中高效地路由和传递消息。在实际应用中,有时我们需要发送延迟消息,即消息不是立即被消费者...
RabbitMQ作为一个开源的消息代理和队列服务器,是实现异步任务处理、微服务通信以及构建高可用系统的关键组件。通过学习本书,读者将能够构建一个全天候运行的24×7×365无间断工作环境。 首先,我们需要理解...
- 支持消息的顺序性,可通过控制保证消息被消费且仅被消费一次。 - 优秀的第三方管理工具Kafka-Manager。 - 在大数据领域的实时计算和日志采集中有着广泛的应用。 - **缺点**: - 当队列数量过多时,性能会出现...
- **发布/确认模式**:确保消息被正确路由和存储,防止数据丢失。 - **死信队列**:处理无法路由或被拒绝的消息,提供错误处理机制。 - **消息持久化**:即使RabbitMQ重启,也能保证消息不丢失。 - **集群**:多...