`
sheungxin
  • 浏览: 105841 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

RabbitMQ使用场景练习:延迟队列(八)

    博客分类:
  • MQ
阅读更多
  • 延时队列
     在实际业务场景中可能会用到延时消息发送,例如支付场景,准时支付、超过未支付将执行不同的方案,其中超时未支付可以看做一个延时消息。
     RabbitMQ本身不具有延时消息队列的功能,但是可以通过TTL(Time To Live)、DLX(Dead Letter Exchanges)特性实现。其原理给消息设置过期时间,在消息队列上为过期消息指定转发器,这样消息过期后会转发到与指定转发器匹配的队列上,变向实现延时队列。利用RabbitMQ的这种特性,应该可以实现很多现实中的业务,我们可以发挥想象。
     rabbitmq-delayed-message-exchange,我们也可以使用插件来实现延时队列。利用TTL、DLX实现的延时队列可以中断,使用插件实现的延时队列是否可以中断?留着下次。。。

  • 注意要点

为每一条消息设置过期时间
Builder properties=new BasicProperties.Builder();
//指定消息过期时间为12秒,队列上也可以指定消息的过期时间,两者以较小时间为准
properties.expiration("12000");//延时12秒,不会及时删除(在consuemr消费时判定是否过期,因为每条消息的过期时间不一致,删除过期消息就需要扫描整个队列)
channel.basicPublish("header_exchange", "" ,properties.build(), SerializationUtils.serialize(object));

在队列上设置队列过期时间(可以不用设置)、消息过期时间、过期消息转发规则
//设置消息过期时间为12秒,消息过期转发给指定转发器、匹配的routingkey(可不指定)
Map<String, Object> args=new HashMap<String, Object>();
args.put("x-expires", 30000);//队列过期时间
args.put("x-message-ttl", 12000);//队列上消息过期时间,应小于队列过期时间
args.put("x-dead-letter-exchange", "exchange-direct");//过期消息转向路由
args.put("x-dead-letter-routing-key", "routing-delay");//过期消息转向路由相匹配routingkey

消息没有consumer消费才会过期,所以接收消息类中consumer需要注释掉

队列上设置消息过期时间和消息上设置消息过期时间,优先级以较小的为准

队列上设置消息过期时间和消息上设置消息过期时间,后者过期消息有可能不会及时删除,因为每条消息的过期时间不一致,删除过期消息就需要扫描整个队列,因此消费时判断是否过期

  • 发送消息类

package com.demo.mq.rabbitmq.example08;

import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang3.SerializationUtils;

import com.demo.mq.rabbitmq.MqManager;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * 发送消息类
 * @author sheungxin
 *
 */
public class Send{

	/**
	 * 在topic转发器的基础上练习延时转发,发送消息时指定消息过期时间
	 * 消息已发送到queue上,但未有consumer进行消费
	 * @param object 消息主体
	 * @throws IOException
	 */
	public static void sendAToB(Serializable object) throws Exception{
		Connection conn=MqManager.newConnection();
		Channel channel=conn.createChannel();
		//声明headers转发器
		channel.exchangeDeclare("header_exchange", BuiltinExchangeType.HEADERS);
		//定义headers存储的键值对
		Map<String, Object> headers=new HashMap<String, Object>();
		headers.put("key", "123456");
		headers.put("token", "654321");
		//把键值对放在properties
		Builder properties=new BasicProperties.Builder();
		properties.headers(headers);
		properties.deliveryMode(2);//持久化
		//指定消息过期时间为12秒,队列上也可以指定消息的过期时间,两者以较小时间为准
//		properties.expiration("12000");//延时12秒,不会及时删除(在consuemr消费时判定是否过期,因为每条消息的过期时间不一致,删除过期消息就需要扫描整个队列)
		channel.basicPublish("header_exchange", "" ,properties.build(), SerializationUtils.serialize(object));
		System.out.println("Send '"+object+"'");
		channel.close();
		conn.close();
	}
	
	public static void main(String[] args) throws Exception {
		sendAToB("Hello World !");
	}
}

  • 接收消息类

package com.demo.mq.rabbitmq.example08;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang3.SerializationUtils;

import com.demo.mq.rabbitmq.MqManager;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 接收消息类
 * @author sheungxin
 *
 */
public class Recv {
	
	/**
	 * 在topic转发器的基础上练习延时转发,设置队列过期时间(过期后自动删除),过期消息处理策略(转发给相匹配的queue)
	 * 实验时启动接收类创建队列后,关闭该线程,使其进入未使用状态
	 * @throws Exception
	 */
	public static void recvAToB() throws Exception{
		Connection conn=MqManager.newConnection();
		Channel channel=conn.createChannel();
		channel.exchangeDeclare("header_exchange", BuiltinExchangeType.HEADERS);
		//设置队列过期时间为30秒,消息过期转发给指定转发器、匹配的routingkey(可不指定)
		Map<String, Object> args=new HashMap<String, Object>();
		args.put("x-expires", 30000);//队列过期时间
		args.put("x-message-ttl", 12000);//队列上消息过期时间
		args.put("x-dead-letter-exchange", "exchange-direct");//过期消息转向路由
		args.put("x-dead-letter-routing-key", "routing-delay");//过期消息转向路由相匹配routingkey
		//创建一个临时队列
		String queueName=channel.queueDeclare("tmp01",false,false,false,args).getQueue();
		//指定headers的匹配类型(all、any)、键值对
		Map<String, Object> headers=new HashMap<String, Object>();
		headers.put("x-match", "all");//all any(只要有一个键值对匹配即可)
		headers.put("key", "123456");
//		headers.put("token", "6543211");
		//绑定临时队列和转发器header_exchange
		channel.queueBind(queueName, "header_exchange", "", headers);
		System.out.println("Received ...");
		Consumer consumer=new DefaultConsumer(channel){
			@Override
			public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{
				String mes=SerializationUtils.deserialize(body);
				System.out.println(envelope.getRoutingKey()+":Received :'"+mes+"' done");
				channel.basicAck(envelope.getDeliveryTag(), false);
			}
		};
		//关闭自动应答机制,默认开启;这时候需要手动进行应该
		channel.basicConsume(queueName, false, consumer);
	}
	
	public static void main(String[] args) throws Exception {
		recvAToB();
	}

}

  • 延时消息处理类

package com.demo.mq.rabbitmq.example08;

import java.io.IOException;
import org.apache.commons.lang3.SerializationUtils;
import com.demo.mq.rabbitmq.MqManager;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 延时消息处理类
 * @author sheungxin
 *
 */
public class DelayRecv {

	/**
	 * 创建队列并声明consumer用于处理转发过来的延时消息
	 * @throws Exception
	 */
	public static void delayRecv() throws Exception{
		Connection conn=MqManager.newConnection();
		Channel channel=conn.createChannel();
		channel.exchangeDeclare("exchange-direct", BuiltinExchangeType.DIRECT);
		String queueName=channel.queueDeclare().getQueue();
		channel.queueBind(queueName, "exchange-direct", "routing-delay");
		Consumer consumer=new DefaultConsumer(channel){
			@Override
			public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{
				String mes=SerializationUtils.deserialize(body);
				System.out.println(envelope.getRoutingKey()+":delay Received :'"+mes+"' done");
			}
		};
		//关闭自动应答机制,默认开启;这时候需要手动进行应该
		channel.basicConsume(queueName, true, consumer);
	}
	
	public static void main(String[] args) throws Exception {
		delayRecv();
	}

}
0
0
分享到:
评论

相关推荐

    面试官:RabbitMQ本身不支持延迟队列,那你给我实现一个?.zip

    面试官:RabbitMQ本身不支持延迟队列,那你给我实现一个?.zip面试官:RabbitMQ本身不支持延迟队列,那你给我实现一个?.zip面试官:RabbitMQ本身不支持延迟队列,那你给我实现一个?.zip面试官:RabbitMQ本身不支持...

    使用RabbitMQ+延迟队列实现分布式事务的最终一致性方案

    在订单+库存系统的场景下,当用户下单时,我们首先将订单创建的信息发送到RabbitMQ的延迟队列,设置延迟时间如30秒。这样做的目的是让订单先创建成功,保证用户端的快速响应,而库存的扣减则延后执行。 接下来,...

    RabbitMQ实战 高效部署分布式消息队列 PDF下载

    《RabbitMQ实战:高效部署分布式消息队列》是一本深度解析RabbitMQ技术的书籍,专注于帮助读者理解和掌握如何在实际项目中高效地部署和使用这个强大的消息中间件。RabbitMQ作为开源的消息代理和队列服务器,广泛应用...

    RabbitMQ实战 高效部署分布式消息队列 带目录 高清版 PDF

    接着,会探讨RabbitMQ的高级特性,如死信队列(Dead Letter Queue)、延迟队列(Delay Queue)和优先级队列(Priority Queue),这些特性可以增强系统的容错能力和灵活性。此外,还会介绍如何实现消息的持久化,确保...

    rabbitMQ延迟队列

    **正文** RabbitMQ延迟队列是一种特殊类型的...总的来说,RabbitMQ的延迟队列功能通过插件实现,为应用程序提供了灵活的时间控制,使得任务可以在预定的时间点被执行,极大地拓展了RabbitMQ在各种业务场景中的应用。

    RabbitMQ实战 高效部署分布式消息队列

    RabbitMQ实战 高效部署分布式消息队列 RabbitMQ实战 高效部署分布式消息队列 RabbitMQ实战 高效部署分布式消息队列

    Go语言版本rabbitmq消息队列库:simple、worker、Fanout 模型、Direct 模型、Topic模型

    RabbitMQ 是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了一个经纪人(Broker)构架,这意味着消息在发送...

    RabbitMQ延迟队列及消息延迟推送实现详解

    延迟队列的实现方式有多种,例如,使用死信队列+TTL 过期时间来实现延迟队列,使用 RabbitMQ 官方提供的延迟队列插件等。每种方式都有其优缺点,需要根据实际情况选择合适的方式。 在使用 RabbitMQ 实现延迟队列时...

    rabbitmq_delayed_message_exchange-3.8.0 延迟队列插件

    RabbitMQ延迟队列插件,即rabbitmq_delayed_message_exchange-3.8.0,是一个针对RabbitMQ消息代理的扩展,旨在提供一种机制,使得消息能够在特定延迟后才被投递到相应的队列。这个功能在很多业务场景中非常有用,...

    rabbitmq_delayed_3.6.x延迟插件.rar

    以下是一些关于如何使用RabbitMQ延迟队列的详细步骤和示例: 1. 安装插件:将下载的`.ez`文件拷贝到RabbitMQ的`plugins`目录下,然后通过命令行执行`rabbitmq-plugins enable rabbitmq_delayed_message_exchange`来...

    RabbitMQ实战:高效部署分布式消息队列 pdf版

    - **管理控制台**:使用Web管理界面监控和管理RabbitMQ实例。 - **环境配置**:配置RabbitMQ的节点、用户权限、虚拟主机等。 **3. 生产者与消费者** - **创建生产者**:编写代码发送消息到RabbitMQ,支持多种编程...

    RabbitMQ学习实践二:MQ的安装

    RabbitMQ学习实践二:MQ的安装

    RabbitMQ3.10延迟队列插件

    RabbitMQ3.10延迟队列插件,

    rabbitmq 延迟队列插件 rabbitmq_delayed_message_exchange_3.8.17

    rabbitmq 延迟队列插件 rabbitmq_delayed_message_exchange_3.8.17 解压即用 输入命令进行安装 .\rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    RabbitMQ实战高效部署分布式消息队列

    RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)协议实现的开源消息队列系统,广泛应用于微服务架构、解耦系统组件以及异步任务处理等场景。 首先,我们需要理解消息队列的基本概念。消息队列是一种在...

    RabbitMQ实战:高效部署分布式消息队列

    《RabbitMQ实战:高效部署分布式消息队列》是一本专为希望深入理解和应用RabbitMQ的IT从业者准备的指南。RabbitMQ是一款开源的消息代理软件,它在分布式系统中承担着消息中间件的角色,负责处理应用程序之间的通信,...

    RabbitMQ实战 高效部署分布式消息队列pdf

    - **监控与日志**:使用RabbitMQ提供的监控工具和日志记录,了解系统运行状态。 7. **最佳实践** - **合理设计exchange和queue**:根据业务场景选择合适的exchange类型(如Direct、Fanout、Topic、Header)和绑定...

Global site tag (gtag.js) - Google Analytics