- 浏览: 106674 次
- 性别:
- 来自: 北京
-
文章分类
最新评论
-
sheungxin:
@仑山中鸟 @zl_xzl 不好意思,代码已丢。。。自己动手试 ...
自定义类加载器与spring的集成 -
昆仑山中鸟:
同求代码,看的云里雾里。自定义监听器WebContextLis ...
自定义类加载器与spring的集成 -
xzl_xzl:
代码都是片段,能否提供下具体的代码,正好我也有类似的需求。但我 ...
自定义类加载器与spring的集成 -
sheungxin:
@add2ws 我试下,多谢!
基于oracle的增量数据采集实现总结 -
add2ws:
用得着这么麻烦么,直接用kettle插入更新,配合oracle ...
基于oracle的增量数据采集实现总结
- 延时队列
RabbitMQ本身不具有延时消息队列的功能,但是可以通过TTL(Time To Live)、DLX(Dead Letter Exchanges)特性实现。其原理给消息设置过期时间,在消息队列上为过期消息指定转发器,这样消息过期后会转发到与指定转发器匹配的队列上,变向实现延时队列。利用RabbitMQ的这种特性,应该可以实现很多现实中的业务,我们可以发挥想象。
rabbitmq-delayed-message-exchange,我们也可以使用插件来实现延时队列。利用TTL、DLX实现的延时队列可以中断,使用插件实现的延时队列是否可以中断?留着下次。。。
- 注意要点
为每一条消息设置过期时间:
Builder properties=new BasicProperties.Builder(); //指定消息过期时间为12秒,队列上也可以指定消息的过期时间,两者以较小时间为准 properties.expiration("12000");//延时12秒,不会及时删除(在consuemr消费时判定是否过期,因为每条消息的过期时间不一致,删除过期消息就需要扫描整个队列) channel.basicPublish("header_exchange", "" ,properties.build(), SerializationUtils.serialize(object));
在队列上设置队列过期时间(可以不用设置)、消息过期时间、过期消息转发规则:
//设置消息过期时间为12秒,消息过期转发给指定转发器、匹配的routingkey(可不指定) Map<String, Object> args=new HashMap<String, Object>(); args.put("x-expires", 30000);//队列过期时间 args.put("x-message-ttl", 12000);//队列上消息过期时间,应小于队列过期时间 args.put("x-dead-letter-exchange", "exchange-direct");//过期消息转向路由 args.put("x-dead-letter-routing-key", "routing-delay");//过期消息转向路由相匹配routingkey
消息没有consumer消费才会过期,所以接收消息类中consumer需要注释掉
队列上设置消息过期时间和消息上设置消息过期时间,优先级以较小的为准
队列上设置消息过期时间和消息上设置消息过期时间,后者过期消息有可能不会及时删除,因为每条消息的过期时间不一致,删除过期消息就需要扫描整个队列,因此消费时判断是否过期
- 发送消息类
package com.demo.mq.rabbitmq.example08; import java.io.IOException; import java.io.Serializable; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang3.SerializationUtils; import com.demo.mq.rabbitmq.MqManager; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.AMQP.BasicProperties.Builder; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; /** * 发送消息类 * @author sheungxin * */ public class Send{ /** * 在topic转发器的基础上练习延时转发,发送消息时指定消息过期时间 * 消息已发送到queue上,但未有consumer进行消费 * @param object 消息主体 * @throws IOException */ public static void sendAToB(Serializable object) throws Exception{ Connection conn=MqManager.newConnection(); Channel channel=conn.createChannel(); //声明headers转发器 channel.exchangeDeclare("header_exchange", BuiltinExchangeType.HEADERS); //定义headers存储的键值对 Map<String, Object> headers=new HashMap<String, Object>(); headers.put("key", "123456"); headers.put("token", "654321"); //把键值对放在properties Builder properties=new BasicProperties.Builder(); properties.headers(headers); properties.deliveryMode(2);//持久化 //指定消息过期时间为12秒,队列上也可以指定消息的过期时间,两者以较小时间为准 // properties.expiration("12000");//延时12秒,不会及时删除(在consuemr消费时判定是否过期,因为每条消息的过期时间不一致,删除过期消息就需要扫描整个队列) channel.basicPublish("header_exchange", "" ,properties.build(), SerializationUtils.serialize(object)); System.out.println("Send '"+object+"'"); channel.close(); conn.close(); } public static void main(String[] args) throws Exception { sendAToB("Hello World !"); } }
- 接收消息类
package com.demo.mq.rabbitmq.example08; import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.commons.lang3.SerializationUtils; import com.demo.mq.rabbitmq.MqManager; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /** * 接收消息类 * @author sheungxin * */ public class Recv { /** * 在topic转发器的基础上练习延时转发,设置队列过期时间(过期后自动删除),过期消息处理策略(转发给相匹配的queue) * 实验时启动接收类创建队列后,关闭该线程,使其进入未使用状态 * @throws Exception */ public static void recvAToB() throws Exception{ Connection conn=MqManager.newConnection(); Channel channel=conn.createChannel(); channel.exchangeDeclare("header_exchange", BuiltinExchangeType.HEADERS); //设置队列过期时间为30秒,消息过期转发给指定转发器、匹配的routingkey(可不指定) Map<String, Object> args=new HashMap<String, Object>(); args.put("x-expires", 30000);//队列过期时间 args.put("x-message-ttl", 12000);//队列上消息过期时间 args.put("x-dead-letter-exchange", "exchange-direct");//过期消息转向路由 args.put("x-dead-letter-routing-key", "routing-delay");//过期消息转向路由相匹配routingkey //创建一个临时队列 String queueName=channel.queueDeclare("tmp01",false,false,false,args).getQueue(); //指定headers的匹配类型(all、any)、键值对 Map<String, Object> headers=new HashMap<String, Object>(); headers.put("x-match", "all");//all any(只要有一个键值对匹配即可) headers.put("key", "123456"); // headers.put("token", "6543211"); //绑定临时队列和转发器header_exchange channel.queueBind(queueName, "header_exchange", "", headers); System.out.println("Received ..."); Consumer consumer=new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{ String mes=SerializationUtils.deserialize(body); System.out.println(envelope.getRoutingKey()+":Received :'"+mes+"' done"); channel.basicAck(envelope.getDeliveryTag(), false); } }; //关闭自动应答机制,默认开启;这时候需要手动进行应该 channel.basicConsume(queueName, false, consumer); } public static void main(String[] args) throws Exception { recvAToB(); } }
- 延时消息处理类
package com.demo.mq.rabbitmq.example08; import java.io.IOException; import org.apache.commons.lang3.SerializationUtils; import com.demo.mq.rabbitmq.MqManager; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; /** * 延时消息处理类 * @author sheungxin * */ public class DelayRecv { /** * 创建队列并声明consumer用于处理转发过来的延时消息 * @throws Exception */ public static void delayRecv() throws Exception{ Connection conn=MqManager.newConnection(); Channel channel=conn.createChannel(); channel.exchangeDeclare("exchange-direct", BuiltinExchangeType.DIRECT); String queueName=channel.queueDeclare().getQueue(); channel.queueBind(queueName, "exchange-direct", "routing-delay"); Consumer consumer=new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{ String mes=SerializationUtils.deserialize(body); System.out.println(envelope.getRoutingKey()+":delay Received :'"+mes+"' done"); } }; //关闭自动应答机制,默认开启;这时候需要手动进行应该 channel.basicConsume(queueName, true, consumer); } public static void main(String[] args) throws Exception { delayRecv(); } }
发表评论
-
RabbitMQ使用场景练习:STOMP plugin
2016-12-22 10:32 4178STOMP plugin Stomp是一个简单的消息 ... -
RabbitMQ使用场景练习:Validated User ID、Length Limit(十二 )
2016-12-16 15:14 1399Validated User ID 发送消息时指定userid ... -
RabbitMQ使用场景练习:消息确认机制(十一)
2016-12-16 12:08 5712消息确认机制 RabbitMQ提供了transaction、c ... -
RabbitMQ使用场景练习:优先级队列(十)
2016-12-14 16:05 3648优先级队列 只有当消费者不足,不能及时进行消费的情况 ... -
RabbitMQ使用场景练习:监听器Listener(九)
2016-12-13 21:23 8067监听器 RabbitMQ中监听器有ReturnLis ... -
RabbitMQ使用总结:持久化
2016-12-13 10:29 2046持久化 RabbitMQ的 ... -
RabbitMQ使用场景练习:RPC(七)
2016-12-08 17:51 2667RPC,同步消息 RabbitMQ默认的consum ... -
RabbitMQ使用场景练习:Headers(六)
2016-12-08 15:28 3067Headers转发器 消息发送时可以在header中 ... -
RabbitMQ使用场景练习:主题Topic(五)
2016-12-06 17:26 2455主题转发器(Topic) Topic转发器的功效包含 ... -
RabbitMQ使用场景练习:路由选择Routing(四)
2016-12-06 17:02 842路由选择(Routing) Routing即按照某条 ... -
RabbitMQ使用场景练习:发布/订阅(三)
2016-12-05 17:30 1302发布/订阅 即实现单点发送消息,多点接收。使用fan ... -
RabbitMQ使用场景练习:工作队列(二)
2016-12-05 17:11 1204工作队列 工作队列的好处在于多个工作线程共享执行任务 ... -
RabbitMQ使用场景练习:入门实例(一)
2016-12-05 16:20 1696注意要点 同一队列多次创建://此处声明队列为了防止接收者 ... -
RabbitMQ的管理与监控
2016-11-30 17:08 1640开启management plugin功能 1、managem ... -
RabbitMQ的安装
2016-11-30 16:02 8151、RabbitMQ安装 官网下载地址:http://www ...
相关推荐
此外,还可以利用RabbitMQ的高级特性,如工作队列、延迟消息、死信队列等,来优化系统设计。 在项目`itcast-rabbitmq`中,你可以找到这些示例的完整实现,以及可能包含的多模块、多场景的实践代码。通过学习和实践...
5. **队列设计**:引入消息队列(如RabbitMQ、Kafka),将大量请求异步化处理,缓解服务器压力,避免瞬间高并发对系统造成冲击。同时,消息队列能实现流量削峰填谷,保证系统稳定。 6. **分布式Id生成**:秒杀场景...
4. **异步处理**:对于耗时操作(如生成订单、发送邮件),可以使用消息队列(如RabbitMQ或Kafka)实现异步处理,避免阻塞主线程,提高系统吞吐量。 5. **数据一致性与事务处理**:在高并发环境下,保证数据一致性...
在《专高2_练习手册_高性能架构_第09单元1》中,我们...通过这些练习题,学习者可以加深对Kafka和Redis的理解,掌握它们在大数据环境中的应用场景和核心特性。这些技术对于构建高性能、高可用的数据处理架构至关重要。
【MQ(Message Queue)消息队列】是一种中间件技术,用于在分布式系统中解耦应用程序组件,通过异步处理提高系统的响应速度和可扩展性。它允许不同服务之间通过发送和接收消息进行通信,而无需直接调用对方,从而...
10. **RabbitMQ消息中间件面试专题及答案.pdf**:RabbitMQ是消息队列的流行实现,资料将介绍其工作原理、交换器类型、队列绑定等,帮助开发者掌握异步通信和解耦设计。 这些资料全面覆盖了Java开发中的关键知识点,...
通过RabbitMQ或Kafka等消息队列,taotaoMarket可以实现异步处理,如订单生成后的通知发送、后台任务的调度等,优化了系统性能并降低了延迟。 8. **分布式事务处理** 在分布式环境中,事务一致性是挑战之一。项目...
7. 系统架构与分布式:微服务、负载均衡、CAP理论、分布式一致性(如Paxos和Raft协议)、分布式缓存(如Redis)以及消息队列(如RabbitMQ或Kafka)等概念和技术。 8. 软件工程:理解敏捷开发、Scrum框架、版本控制...