RabbitMQ中监听器有ReturnListener、ConfirmListener、ShutdownListener,本练习中使用ReturnListener,在发布消息时设置mandatory等于true,监听消息是否有相匹配的队列,没有时ReturnListener将执行handleReturn方法,消息将返给发送者
设置mandatory=true,当路由不到队列时返回给消息发送者,在return监听器中接收
设置immediate=true,当路由不到消费者时返回,3.0以后版本已废弃,会影响镜像队列性能,建议采用消息TTL和DLX
发送消息时mandatory设置为true:
//设置mandatory=true,当路由不到队列时返回给消息发送者,在return监听器中接收
channel.basicPublish("header_exchange", "" ,true,false,properties.build(), SerializationUtils.serialize(object));
创建Return监听器:
//增加return监听器,当发布消息且无匹配的队列时消息被返回给接收者
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, BasicProperties properties, byte[] body)
throws IOException {
System.out.println(SerializationUtils.deserialize(body));
}
});
除了Return监听器,还有ConfirmListener、ShutdownListener监听器
package com.demo.mq.rabbitmq.example09;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.SerializationUtils;
import com.demo.mq.rabbitmq.MqManager;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ReturnListener;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
/**
* mandatory、监听器使用练习
* @author sheungxin
*
*/
public class SendListener {
/**
* 1、设置mandatory=true,当路由不到队列时返回给消息发送者,在return监听器中接收
* 2、immediate,当路由不到消费者时返回,3.0以后版本已废弃,会影响镜像队列性能,建议采用消息TTL和DLX
* 3、监听器:ReturnListener:mandatory=true时,无匹配queue时接收返回消息
* ConfirmListener:Ack、Nack,confirm模式,服务端监听
* ShutdownListener:监听关闭
* @param object 消息主体
* @throws IOException
*/
public static void sendAToB(Serializable object) throws Exception{
Connection conn=MqManager.newConnection();
Channel channel=conn.createChannel();
//声明headers转发器
channel.exchangeDeclare("header_exchange", BuiltinExchangeType.HEADERS);
//定义headers存储的键值对
Map<String, Object> headers=new HashMap<String, Object>();
headers.put("key", "123456");
headers.put("token", "654321");
//把键值对放在properties
Builder properties=new BasicProperties.Builder();
properties.headers(headers);
properties.deliveryMode(2);//持久化
//指定消息过期时间为12秒,队列上也可以指定消息的过期时间,两者以较小时间为准
// properties.expiration("12000");//延时30秒,不会及时删除(在consuemr消费时判定是否过期,因为每条消息的过期时间不一致,删除过期消息就需要扫描整个队列)
//增加return监听器,当发布消息且无匹配的队列时消息被返回给接收者
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, BasicProperties properties, byte[] body)
throws IOException {
System.out.println(SerializationUtils.deserialize(body));
}
});
//设置mandatory=true,当路由不到队列时返回给消息发送者,在return监听器中接收
channel.basicPublish("header_exchange", "" ,true,false,properties.build(), SerializationUtils.serialize(object));
System.out.println("Send '"+object+"'");
}
public static void main(String[] args) throws Exception {
sendAToB("Hello World !");
}
}
分享到:
相关推荐
4. 定义消息监听器(MessageListener)或使用@RabbitListener注解监听队列。 5. 设计消息实体类和消息转换策略。 6. 实现业务逻辑,如发送消息到队列或处理接收到的消息。 通过以上步骤,我们可以将RabbitMQ高效地...
7. **启动监听**:确保你的`RabbitListenerContainerFactory`已正确配置,这样Spring才能自动启动监听器容器。通常,这会自动完成,但如果需要自定义,可以在配置类中创建一个工厂: ```java @Bean public ...
通过声明监听器容器(`SimpleMessageListenerContainer`)和消息监听器接口(`MessageListener`)实现消息的异步消费。 在这个练习项目中,你可能会看到以下代码结构: - 配置类(`RabbitConfig.java`):设置...
3. **监听器配置**:确保`XSApplyIdListener`作为消息监听器的配置是正确的,例如使用`<rabbit:listener-container>`或`@RabbitListener`。 4. **依赖注入**:检查`@Autowired`注解的使用,确保bean能被正确注入。...
6. **Message Listener**:为了接收消息,你需要定义一个监听器,通常是实现了`RabbitListener`注解的类或方法。当RabbitMQ接收到消息时,它会自动调用这些监听器。 7. **配置**:项目中会有相应的配置文件(如`...
基于Java的MQListener消息监听器管理平台 项目简介 MQListener是一个用于解决消息消费过程中常见问题的解决方案。它支持RabbitMQ消息队列,提供了消息消费者的统一配置与管理功能,包括服务列表、listener列表、...
6. 接收消息:创建一个监听器接口,使用`@RabbitListener`注解定义队列监听器。 接下来,Kafka是一个分布式流处理平台,常用于构建实时数据管道和流应用。在Spring Cloud中集成Kafka,你需要: 1. 添加依赖:在`...
安装完成后,我们可以在JMeter的“监听器”部分看到新的“RabbitMQ Publisher”和“RabbitMQ Listener”组件。这两个组件分别用于发送消息到RabbitMQ服务器和接收来自服务器的消息,是测试RabbitMQ性能的核心工具。 ...
你可以通过实现`MessageListener`接口或使用`@RabbitListener`注解来创建消息监听器,当RabbitMQ接收到消息时,监听器会被调用。 **4. 配置RabbitMQ连接** 在Spring配置文件中,你需要配置`...
在上面的代码中,我们创建了一个RabbitMQ监听器,它会监听一个名为`mqtt-incoming`的队列,并将接收到的消息转发给`MqttMessageListener`处理。 最后,为了测试我们的配置,可以在`mq-consumer-demo`项目中启动一个...
- `listeners`: 配置监听端口和协议,如`tcp_listener`用于TCP连接。 - `default_user`和`default_password`: 设置默认的管理用户和密码。 五、RabbitMQ管理界面 RabbitMQ提供了一个Web管理界面,便于监控和管理...
4. **接收消息**:通过实现RabbitListener接口或声明消息监听器容器(MessageListenerContainer),可以创建消费者来监听Queue并处理接收到的消息。 5. **消息确认**:Spring支持自动或手动的消息确认机制,确保...
接收消息则可以通过MessageListener接口或者简化后的SimpleMessageListenerContainer实现,后者可以管理线程池和消费者,并在接收到消息时自动调用监听器的方法。 Spring还提供了对消息确认(acknowledgement)的...
它包括了模板(Template)用于发送消息,以及监听器容器(Listener Container)来处理接收的消息。此外,Spring-rabbit还支持声明Exchange、Queue和Binding,以及事务和确认模式,增强了消息的可靠传输。 **...
2. **消费者**:通过实现`MessageListener`接口或使用`@RabbitListener`注解创建监听器,处理接收到的消息。 3. **死信队列**:RabbitMQ支持死信队列,当消息无法路由或消费者处理失败时,可以将这些消息发送到特定...
创建一个监听器,使用 `@RabbitListener` 注解监听特定队列的消息。例如: ```java @RabbitListener(queues = "my_queue") public void handleMessage(Message message) { System.out.println("Received ...
4. 定义消息监听器:在Controller或单独的服务类中,创建一个实现了`MessageListener`接口的类,用于接收消息。通常,我们使用`SimpleMessageListenerContainer`来管理和调度监听器。 5. 发送消息:在需要发送消息...
3. **add_ip_to_listener**:可能涉及监听器配置的更新,添加新的IP地址或者更改监听端口以适应网络环境变化。 4. **internal_exchanges**:内部交换机的升级,确保内部消息路由机制的正确性。 5. **user_to_...
- **消息监听器**:创建一个实现了`MessageListener`接口的类,用于接收来自RabbitMQ的消息。 - **消息发送**:在MySQL数据发生变化时,如增删改操作,发送消息到RabbitMQ队列。 3. **MySQL数据库同步** - **...
- 消息监听器:实现`MessageListener`接口,处理接收到的消息。 - 消息生产者:使用`RabbitTemplate`发送消息到指定的交换机。 为了调试和解决问题,你可以使用RabbitMQ的管理界面检查队列状态,查看消息是否正确...