`

RabbitMQ实现延时队列

 
阅读更多

1. 背景介绍

实际开发中,存在着下面这些场景:

  • 滴滴打车订单完成后,如果用户一直不评价,48小时后自动五星好评
  • 在电商系统中,一个用户下单之后通常有30分钟的时间进行支付,如果30分钟之内没有支付成功,则这个订单会进行后续一些处理
  • 用户希望通过手机远程遥控家里的智能设备在指定的时间后进行工作
  • ......

针对这些场景,常见的方案是:启动一个cron定时任务,定时运行并查询符合时间条件的数据并进行处理。该方案存在以下几点不足:

  • 轮询效率比较低
  • 每次扫库,已经被执行过的记录,仍然会被扫描(不会出现在结果集中),有重复计算的嫌疑,若数据量过大对数据库也有压力
  • 时效性不够,如果轮询时间间隔较长,时间误差比较大
  • 若为了降低时间误差而提高轮询频率,则1、2问题更加凸显

显然这并不是一个明智之举,下面介绍通过延时队列实现。

 

2. 延时队列

延时队列存储的对象是对应的延时消息,所谓 延时消息 是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

 

Java提供delayedQueue可以实现本地的延时队列,但利用delayedQueue只能实现单机版,而且保存在内存中,需要在宕机时、消息消费异常时做相应的逻辑处理,非常麻烦。

 

3. 利用RabbitMQ实现延时队列功能

RabbitMQ本身没有直接支持延迟队列功能,但是可以通过RabbitMQ的两个特性来曲线实现延迟队列:Time To Live(TTL) 和 Dead Letter Exchanges(DLX),结合Time To Live(TTL) 和 Dead Letter Exchanges(DLX)两个特性,就可以模拟出延时消息的功能。

 

Time To Live(TTL)

RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter。

 

RabbitMQ针对队列中的消息过期时间有两种方法可以设置:

①Per-Message TTL:通过队列属性设置,队列中所有消息都有相同的过期时间。
②Queue TTL:对消息进行单独设置,每条消息的TTL可以不同。

如果同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就成为dead letter。

 

 

// java client声明队列时,统一设置该队列中的消息过期时间
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("myqueue", false, false, false, args);

// java client发送一条只能驻留60秒的消息到队列(设置单条消息过期时间)
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "routing-key", properties, messageBodyBytes);

 

// java client设置队列的过期时间
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);

 

 

 

Dead Letter Exchanges(DLX)

RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由。

  • x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
  • x-dead-letter-routing-key:指定routing-key发送

队列出现dead letter的情况有:

  • 消息或者队列的TTL过期
  • 队列达到最大长度
  • 消息被消费端拒绝(basic.reject or basic.nack)并且requeue=false

利用DLX,当消息在一个队列中变成死信后,它能被重新publish到另一个Exchange。这时候消息就可以重新被消费。

 

channel.exchangeDeclare("some.exchange.name", "direct");

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
// args.put("x-dead-letter-routing-key", "some-routing-key");
channel.queueDeclare("myqueue", false, false, false, args);

 

 

 

4. 利用RabbitMQ实现延时队列功能

在rabbitmq 3.5.7及以上的版本提供了一个插件(rabbitmq-delayed-message-exchange)来实现延迟队列功能。同时该插件依赖Erlang/OPT 18.0及以上。

 

插件源码地址:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

 

插件下载地址:

https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange

 

插件安装及启用

  • 进入插件安装目录:{rabbitmq-server}/plugins/
  • 下载插件
wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

 

  • 启用/关闭插件
(启用插件)
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

(关闭插件)
rabbitmq-plugins disable rabbitmq_delayed_message_exchange

 

  • 插件使用:通过声明一个x-delayed-message类型的exchange来使用delayed-messaging特性(x-delayed-message是插件提供的类型,并不是rabbitmq本身的),发送消息的时候通过在header添加”x-delay”参数来控制消息的延时时间
// ... elided code ...
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
// ... more code ...


// ... elided code ...
byte[] messageBodyBytes = "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
// ... more code ...

 

 

插件使用示例

 

import java.text.SimpleDateFormat;
import java.util.Date;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    // 队列名称
    private final static String QUEUE_NAME = "delay_queue";
    private final static String EXCHANGE_NAME="delay_exchange";

    public static void main(String[] argv) throws Exception,
            java.lang.InterruptedException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.12.190");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        channel.queueDeclare(QUEUE_NAME, true,false,false,null);
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
        channel.basicConsume(QUEUE_NAME, true, queueingConsumer);
        SimpleDateFormat sf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        try {
            System.out.println("****************WAIT***************");
            while(true){
                QueueingConsumer.Delivery delivery = queueingConsumer
                        .nextDelivery(); //

                String message = (new String(delivery.getBody()));
                System.out.println("message:"+message);
                System.out.println("now:\t"+sf.format(new Date()));
            }

        } catch (Exception exception) {
            exception.printStackTrace();

        }

    }
}

 

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {
    // 队列名称
    private final static String EXCHANGE_NAME="delay_exchange";
    private final static String ROUTING_KEY="key_delay";

    @SuppressWarnings("deprecation")
    public static void main(String[] argv) throws Exception {
        /**
         * 创建连接连接到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.12.190");
        factory.setUsername("admin");
        factory.setPassword("admin");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

        // 声明x-delayed-type类型的exchange
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", true,
                false, args);


        Map<String, Object> headers = new HashMap<String, Object>();
        //设置在2016/11/04,16:45:12向消费端推送本条消息
        Date now = new Date();
        Date timeToPublish = new Date("2016/11/04,16:45:12");

        String readyToPushContent = "publish at " + sf.format(now)
                + " \t deliver at " + sf.format(timeToPublish);

        headers.put("x-delay", timeToPublish.getTime() - now.getTime());

        AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder()
                .headers(headers);
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, props.build(),
                readyToPushContent.getBytes());

        // 关闭频道和连接
        channel.close();
        connection.close();
    }
}

 

启动接收端,启动发送端,运行结果如下:

****************WAIT***************
message:publish at 2018-08-12 16:44:16.887   deliver at 2018-08-12 16:45:12.000
now:    2018-08-12 16:45:12.023

 

注意:使用rabbitmq-delayed-message-exchange插件时发送到队列的消息数量不可见,不影响正常功能使用。

 

注意:使用过程中发现,当一台启用了rabbitmq-delayed-message-exchange插件的RAM节点在重启的时候会无法启动,查看日志发现了一个Timeout异常,开发者解释说这是节点在启动过程会同步集群相关数据造成启动超时,并建议不要使用Ram节点。

 

参考

  1. 用rebbitMq来实现你的延迟队列功能
  2. rabbitmq 实现延迟队列的两种方式
分享到:
评论

相关推荐

    springboot+rabbitmq实现延时队列

    本教程将详细介绍如何使用SpringBoot集成RabbitMQ来实现一个延时队列,并探讨消息发送与消费确认机制以及消费者端的策略模式应用。 首先,SpringBoot是Java开发者广泛使用的快速开发框架,它简化了Spring的配置和...

    springboot整合RabbitMQ实现延时队列的两种方式 教程及源码

    springboot整合RabbitMQ实现延时队列的两种方式 教程及源码。参考博客:https://blog.csdn.net/qq_29914837/article/details/94070677

    高效延时队列的设计与实现

    常见的延时队列实现方案包括: 1. 轮询数据库:定期查询数据库中即将到期的延时任务,但这可能导致大量无效查询,效率较低。 2. 使用JDK自带的DelayQueue:这是一个无界阻塞队列,元素需实现Delayed接口,当延迟时间...

    Linux 安装 RabbitMQ 应用 / RabbitMQ 延时队列

    Linux 安装 RabbitMQ 应用 / RabbitMQ 延时队列

    SpringBoot集成RabbitMQ延时队列,自定义延时时间Demo

    该示例通过 rabbitmq_delayed_message_exchange 插件实现自定义延时时间的延时队列。 示例是纯净的,只引入了需要的架包 启动示例时,请确保MQ已经安装了延时插件(附件里带有插件及安装说明)以及示例的MQ相关的配置...

    使用RabbitMQ+延迟队列实现分布式事务的最终一致性方案

    通过这种方式,我们能够在保证用户体验的同时,利用RabbitMQ的延迟队列实现分布式事务的最终一致性。这种方案适用于对即时一致性要求不高,但对系统稳定性和可用性要求较高的场景。不过,需要注意的是,虽然延迟队列...

    RabbitMQ延迟队列及消息延迟推送实现详解

    RabbitMQ 延迟队列及消息延迟推送实现详解 RabbitMQ 延迟队列及消息延迟推送实现详解是指在 RabbitMQ 中实现消息延迟推送的功能,即在指定的时间后推送消息到目标队列中。这种机制在实际应用中非常有价值,如在淘宝...

    rabbitmq延时队列和四种交换机模式下队列的简单实现

    在本项目中,我们将探讨如何实现RabbitMQ的延时队列以及在四种不同的交换机模式下的队列配置。 首先,让我们理解RabbitMQ的延时队列。在实际业务场景中,有时我们需要在特定时间后才处理某些消息,例如订单超时取消...

    RabbitMQ+Erlang+RabbitMq延时队列插件

    RabbitMQ本身并不直接支持延时队列,但可以通过安装插件来实现这一功能。`rabbitmq_delayed_message_exchange`插件就是为此目的而设计的。 安装`rabbitmq_delayed_message_exchange`插件,首先需要确保你已经安装了...

    【ASP.NET编程知识】运用.NetCore实例讲解RabbitMQ死信队列,延时队列.docx

    ASP.NET 编程知识 - RabbitMQ 死信队列和延时队列实践 以下是ASP.NET编程知识中关于RabbitMQ死信队列和延时队列的知识点总结: 一、死信队列 死信队列是一个特殊的队列,用于存储不能被消费的消息。这些消息可能...

    rabbitMq+erlang+延时队列插件完整安装包.7z

    5. **安装延时队列插件**:RabbitMQ的延时队列功能并非默认内置,而是通过一个名为`rabbitmq_delayed_message_exchange`的插件提供。使用以下命令启用该插件: ``` rabbitmq-plugins enable rabbitmq_delayed_...

    RabbitMQ3.10延迟队列插件

    RabbitMQ3.10延迟队列插件,

    如何通过Python实现RabbitMQ延迟队列

    因为系统本身一直在用RabbitMQ做异步处理任务的中间件,所以想到是否可以利用RabbitMQ实现延迟队列。功夫不负有心人,RabbitMQ虽然没有现成可用的延迟队列,但是可以利用其两个重要特性来实现之:1、Time To Live...

    rabbitmq_delayed_message_exchange-3.8.0.ez RabittMq及延时队列插件

    RabbitMQ的延时队列插件提供了一种便捷的方式来实现这个功能,而无需自己编写复杂的调度逻辑。 **安装延时队列插件** 要在RabbitMQ中使用延时队列,首先需要安装插件。这通常可以通过以下命令完成(假设你已经在你...

    支付状态一致性-RabbitMQ死信队列

    支付状态一致性-RabbitMQ死信队列

    rabbitMQ延迟队列

    **正文** RabbitMQ延迟队列是一种特殊类型的...总的来说,RabbitMQ的延迟队列功能通过插件实现,为应用程序提供了灵活的时间控制,使得任务可以在预定的时间点被执行,极大地拓展了RabbitMQ在各种业务场景中的应用。

    rabbitmq_delayed_3.6.x延迟插件.rar

    描述中的“rabbitmq3.6.x延迟队列插件下载”提示我们这个压缩包包含的是RabbitMQ 3.6.x版本的延迟队列插件安装文件,用户可以下载并安装到自己的RabbitMQ环境中,以实现延迟消息的功能。 标签“rabbitmq插件”和...

    Redis延时消息队列基于swoole实现的多进程消费端

    在Redis中,可以利用Sorted Set数据结构来实现延时队列。每个消息都被赋予一个时间戳作为score,然后添加到Sorted Set中。这样,当score对应的时间到来时,就可以通过ZRANGEBYSCORE命令获取并处理这些消息。 接下来...

Global site tag (gtag.js) - Google Analytics