`
Java高级架构师
  • 浏览: 11088 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

如何才能让Spring Boot与RabbitMQ结合实现延迟队列

阅读更多

顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。

延迟队列能做什么?

延迟队列多用于需要延迟工作的场景。最常见的是以下两种场景:

延迟消费。比如:

用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单。

用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。

延迟重试。比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。

如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。这种方案既不优雅,也不方便做成统一的服务便于开发人员使用。但是使用延迟队列的话,我们就可以轻而易举地完成。

如何实现?

别急,在下文中,我们将详细介绍如何利用 Spring Boot 加 RabbitMQ 来实现延迟队列。

实现思路

在介绍具体的实现思路之前,我们先来介绍一下RabbitMQ的两个特性,一个是Time-To-Live Extensions,另一个是Dead Letter Exchanges。

Time-To-Live Extensions

RabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后“死亡”,成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。更多资料请查阅 官方文档 。

Dead Letter Exchange

刚才提到了,被设置了TTL的消息在过期后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:

消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false。

消息因为设置了TTL而过期。

消息进入了一条已经达到最大长度的队列。

如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列。更多资料请查阅 官方文档 。

流程图

聪明的你肯定已经想到了,如何将RabbitMQ的TTL和DLX特性结合在一起,实现一个延迟队列。

针对于上述的延迟队列的两个场景,我们分别有以下两种流程图:

延迟消费

延迟消费是延迟队列最为常用的使用模式。如下图所示,生产者产生的消息首先会进入缓冲队列(图中红色队列)。通过RabbitMQ提供的TTL扩展,这些消息会被设置过期时间,也就是延迟消费的时间。等消息过期之后,这些消息会通过配置好的DLX转发到实际消费队列(图中蓝色队列),以此达到延迟消费的效果。

延迟重试

延迟重试本质上也是延迟消费的一种,但是这种模式的结构与普通的延迟消费的流程图较为不同,所以单独拎出来介绍。

如下图所示,消费者发现该消息处理出现了异常,比如是因为网络波动引起的异常。那么如果不等待一段时间,直接就重试的话,很可能会导致在这期间内一直无法成功,造成一定的资源浪费。那么我们可以将其先放在缓冲队列中(图中红色队列),等消息经过一段的延迟时间后再次进入实际消费队列中(图中蓝色队列),此时由于已经过了“较长”的时间了,异常的一些波动通常已经恢复,这些消息可以被正常地消费。

代码实现

接下来我们将介绍如何在Spring Boot中实现基于RabbitMQ的延迟队列。我们假设读者已经拥有了Spring Boot与RabbitMQ的基本知识。如果想快速了解Spring Boot的相关基础知识,可以参考我之前写的一篇文章。

初始化工程

首先我们在Intellij中创建一个Spring Boot工程,并且添加 spring-boot-starter-amqp 扩展。

配置队列

从上述的流程图中我们可以看到,一个延迟队列的实现,需要一个缓冲队列以及一个实际的消费队列。又由于在RabbitMQ中,我们拥有两种消息过期的配置方式,所以在代码中,我们一共配置了三条队列:

1.delay_queue_per_message_ttl:TTL配置在消息上的缓冲队列。

2.delay_queue_per_queue_ttl:TTL配置在队列上的缓冲队列。

3.delay_process_queue:实际消费队列。

我们通过Java Config的方式将上述的队列配置为Bean。由于我们添加了 spring-boot-starter-amqp 扩展,Spring Boot在启动时会根据我们的配置自动创建这些队列。为了方便接下来的测试,我们将delay_queue_per_message_ttl以及delay_queue_per_queue_ttl的DLX配置为同一个,且过期的消息都会通过DLX转发到delay_process_queue。

delay_queue_per_message_ttl

首先介绍delay_queue_per_message_ttl的配置代码:

@BeanQueuedelayQueuePerMessageTTL(){returnQueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME) .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX,dead letter发送到的exchange.withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key.build();}

其中, x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称, x-dead-letter-routing-key 声明了这些死信在转发时携带的routing-key名称。

delay_queue_per_queue_ttl

类似地,delay_queue_per_queue_ttl的配置代码:

@BeanQueuedelayQueuePerQueueTTL(){returnQueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL_NAME) .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX.withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter携带的routing key.withArgument("x-message-ttl", QUEUE_EXPIRATION) // 设置队列的过期时间.build();}

delay_queue_per_queue_ttl队列的配置比delay_queue_per_message_ttl队列的配置多了一个x-message-ttl ,该配置用来设置队列的过期时间。

delay_process_queue

delay_process_queue的配置最为简单:

@BeanQueuedelayProcessQueue(){returnQueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME) .build();}

配置Exchange

配置DLX

首先,我们需要配置DLX,代码如下:

@BeanDirectExchangedelayExchange(){returnnew DirectExchange(DELAY_EXCHANGE_NAME);}

然后再将该DLX绑定到实际消费队列即delay_process_queue上。这样所有的死信都会通过DLX被转发到delay_process_queue:

@BeanBindingdlxBinding(Queue delayProcessQueue, DirectExchange delayExchange){returnBindingBuilder.bind(delayProcessQueue) .to(delayExchange) .with(DELAY_PROCESS_QUEUE_NAME);}

配置延迟重试所需的Exchange

从延迟重试的流程图中我们可以看到,消息处理失败之后,我们需要将消息转发到缓冲队列,所以缓冲队列也需要绑定一个Exchange。 在本例中,我们将delay_process_per_queue_ttl作为延迟重试里的缓冲队列 。具体代码是如何配置的,这里就不赘述了,大家可以查阅我 Github 中的代码。

定义消费者

我们创建一个最简单的消费者ProcessReceiver,这个消费者监听delay_process_queue队列,对于接受到的消息,他会:

1.如果消息里的消息体不等于FAIL_MESSAGE,那么他会输出消息体。

2.如果消息里的消息体恰好是FAIL_MESSAGE,那么他会模拟抛出异常,然后将该消息重定向到缓冲队列(对应延迟重试场景)。

另外,我们还需要新建一个监听容器用于存放消费者,代码如下:

@BeanSimpleMessageListenerContainerprocessContainer(ConnectionFactory connectionFactory, ProcessReceiver processReceiver){ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setQueueNames(DELAY_PROCESS_QUEUE_NAME); // 监听delay_process_queuecontainer.setMessageListener(new MessageListenerAdapter(processReceiver));returncontainer;}

至此,我们前置的配置代码已经全部编写完成,接下来我们需要编写测试用例来测试我们的延迟队列。

编写测试用例

延迟消费场景

首先我们编写用于测试TTL设置在消息上的测试代码。

我们借助 spring-rabbit 包下提供的RabbitTemplate类来发送消息。由于我们添加了 spring-boot-starter-amqp 扩展,Spring Boot会在初始化时自动地将RabbitTemplate当成bean加载到容器中。

解决了消息的发送问题,那么又该如何为每个消息设置TTL呢?这里我们需要借助MessagePostProcessor。MessagePostProcessor通常用来设置消息的Header以及消息的属性。我们新建一个ExpirationMessagePostProcessor类来负责设置消息的TTL属性:

/*** 设置消息的失效时间*/public class ExpirationMessagePostProcessorimplements MessagePostProcessor{ private final Long ttl; // 毫秒public ExpirationMessagePostProcessor(Long ttl){ this.ttl = ttl;} @Overridepublic Message postProcessMessage(Message message)throws AmqpException {message.getMessageProperties().setExpiration(ttl.toString()); // 设置per-message的失效时间returnmessage;}}

然后在调用RabbitTemplate的convertAndSend方法时,传入ExpirationMessagePostPorcessor即可。我们向缓冲队列中发送3条消息,过期时间依次为1秒,2秒和3秒。具体的代码如下所示:

@Testpublic voidtestDelayQueuePerMessageTTL()throws InterruptedException {ProcessReceiver.latch = new CountDownLatch(3);for(int i = 1; i <= 3; i++) { long expiration = i * 1000;rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_MESSAGE_TTL_NAME,(Object) ("Message From delay_queue_per_message_ttl with expiration "+ expiration), new ExpirationMessagePostProcessor(expiration));}ProcessReceiver.latch.await();}

细心的朋友一定会问,为什么要在代码中加一个CountDownLatch呢?这是因为如果没有latch阻塞住测试方法的话,测试用例会直接结束,程序退出,我们就看不到消息被延迟消费的表现了。

那么类似地,测试TTL设置在队列上的代码如下:

@Testpublic voidtestDelayQueuePerQueueTTL()throws InterruptedException {ProcessReceiver.latch = new CountDownLatch(3);for(int i = 1; i <= 3; i++) {rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_QUEUE_TTL_NAME,"Message From delay_queue_per_queue_ttl with expiration "+ QueueConfig.QUEUE_EXPIRATION);}ProcessReceiver.latch.await();}

我们向缓冲队列中发送3条消息。理论上这3条消息会在4秒后同时过期。

延迟重试场景

我们同样还需测试延迟重试场景。

@Testpublic voidtestFailMessage()throws InterruptedException {ProcessReceiver.latch = new CountDownLatch(6);for(int i = 1; i <= 3; i++) {rabbitTemplate.convertAndSend(QueueConfig.DELAY_PROCESS_QUEUE_NAME, ProcessReceiver.FAIL_MESSAGE);}ProcessReceiver.latch.await();}

我们向delay_process_queue发送3条会触发FAIL的消息,理论上这3条消息会在4秒后自动重试。

查看测试结果

延迟消费场景

延迟消费的场景测试我们分为了TTL设置在消息上和TTL设置在队列上两种。首先,我们先看一下TTL设置在消息上的测试结果:


 

从上图中我们可以看到,ProcessReceiver分别经过1秒、2秒、3秒收到消息。测试结果表明消息不仅被延迟消费了,而且每条消息的延迟时间是可以被个性化设置的。TTL设置在消息上的延迟消费场景测试成功。

然后,TTL设置在队列上的测试结果如下图:


 

从上图中我们可以看到,ProcessReceiver经过了4秒的延迟之后,同时收到了3条消息。测试结果表明消息不仅被延迟消费了,同时也证明了当TTL设置在队列上的时候,消息的过期时间是固定的。TTL设置在队列上的延迟消费场景测试成功。

延迟重试场景

接下来,我们再来看一下延迟重试的测试结果:


 

ProcessReceiver首先收到了3条会触发FAIL的消息,然后将其移动到缓冲队列之后,过了4秒,又收到了刚才的那3条消息。延迟重试场景测试成功。

想学习交流HashMap,nginx、dubbo、Spring MVC,分布式、高性能高可用、redis、jvm、多线程、netty、kafka、的对于课程有兴趣加群:629740746同时也可以免费获得下面分享的视频资料


 
分享到:
评论

相关推荐

    Spring Boot与RabbitMQ结合实现延迟队列的示例

    Spring Boot与RabbitMQ结合实现延迟队列的示例 本资源摘要信息主要介绍了如何使用Spring Boot与RabbitMQ结合实现延迟队列的示例,包括延迟队列的定义、应用场景、实现思路、流程图、代码实现等方面的内容。 延迟...

    Spring Boot RabbitMQ 延迟消息实现完整版

    本文将详细介绍如何使用Spring Boot结合RabbitMQ的`rabbitmq_delayed_message_exchange`插件实现延迟消息。 #### 技术背景 1. **Spring Boot**: 是基于Spring框架的一个开源框架,用于简化Spring应用的初始搭建...

    使用RabbitMQ死信实现延迟消息

    spring boot 配置, Rabbitmq集成, 利用死信 实现延时消息队列实现

    springboot+rabbitmq实现延时队列

    1. **依赖配置**:在`pom.xml`文件中添加RabbitMQ的Spring Boot Starter依赖,确保版本与SpringBoot版本兼容。 2. **配置RabbitMQ**:在`application.yml`或`application.properties`中配置RabbitMQ服务器的连接...

    spring-boot-mq-rabbitmq 一套打通rabbitmq 打开可用 有注释

    同时,RabbitMQ提供了丰富的特性,如工作队列、发布/订阅模式、延迟队列等,这些都可以在Spring Boot中轻松实现,进一步优化系统的性能和稳定性。妈妈再也不用担心你对RabbitMQ不熟悉了,因为有了这套详尽的教程,你...

    springboot与rabbitmq结合的实战、实例项目

    在IT行业中,Spring Boot和RabbitMQ是两个非常...通过以上内容,你应该能对Spring Boot与RabbitMQ的结合有更深入的理解,并能动手创建自己的实战项目。实践是最好的老师,建议动手尝试,根据项目需求进行调整和优化。

    Rabbitmq延迟队列实现定时任务的方法

    RabbitMQ 延迟队列实现定时任务的方法 RabbitMQ 延迟队列是一种实现定时任务的方法,它可以帮助我们在指定的时间点执行某个任务。这种方法可以应用在很多场景中,例如:在电子商城中,需要在指定的时间点关闭订单、...

    SpringBoot集成RabbitMQ延时队列,自定义延时时间Demo

    该示例通过 rabbitmq_delayed_message_exchange 插件实现自定义延时时间的延时队列。 示例是纯净的,只引入了需要的架包 启动示例时,请确保MQ已经安装了延时插件(附件里带有插件及安装说明)以及示例的MQ相关的配置...

    Spring Boot (5) 整合 RabbitMQ

    在本教程中,我们将深入探讨如何使用Spring Boot与RabbitMQ进行整合,以实现高效的消息队列通信。Spring Boot简化了Java应用的开发过程,而RabbitMQ则是一个流行的开源消息代理,它遵循AMQP(Advanced Message ...

    rabbitmq延迟插件.zip

    在IT领域,Spring Boot是一个非常流行的Java开发框架,它简化了构建微服务和Web应用程序的过程。...总之,RabbitMQ延迟插件结合Spring Boot提供了强大的异步延迟处理能力,极大地提高了系统的灵活性和可扩展性。

    SpringBoot集成Rabbitmq简单案例

    在本文中,我们将深入探讨如何将Spring Boot与RabbitMQ集成,实现一个简单的消息队列应用。Spring Boot是Spring框架的轻量级实现,旨在简化Java应用的开发过程,而RabbitMQ则是一个广泛使用的开源消息代理,它遵循...

    rabbitmq延时队列和四种交换机模式下队列的简单实现

    在Spring Boot中集成RabbitMQ,我们可以使用`spring-boot-starter-amqp`依赖,并在配置文件(如application.yml或application.properties)中设定相关属性,包括连接信息、交换机、队列和绑定。代码实现时,可以使用...

    基于消息队列、spring boot、websocket实现的消息推送模型源代码

    本项目提供的源代码展示了如何利用消息队列(Message Queue)、Spring Boot框架以及WebSocket技术来实现一个实时的消息推送模型。以下是关于这些关键技术的详细说明: 1. **消息队列**:消息队列是一种中间件,用于...

    Spring rabbitmq集成

    RabbitMQ 还支持许多高级特性,如死信队列、延迟队列、TTL、消息重试等,可以根据需要进行配置。 总结,Spring 和 RabbitMQ 的集成使得开发者能够轻松地在 Java 应用中实现消息队列功能,从而提升系统的可扩展性和...

    SpringBoot下RabbitMq实现定时任务

    例如,我们可以使用createDelayQueue方法来创建延迟队列,並将其绑定到延迟交换机上。 五、发送消息 当我们需要发送消息时,我们可以使用RabbitTemplate来发送消息,并设置TTL时间来延迟执行任务。 六、代码实现 ...

    springboot-rabbitMQ-websocket:springboot整合rabbitMQ和websocket,实现消息的发布和接收,并通过websocket实时推送数据到页面

    总之,Spring Boot、RabbitMQ和WebSocket的结合提供了一种强大的解决方案,可以处理高并发、低延迟的实时通信需求。在理解和学习这个项目的过程中,你不仅能深化对Spring Boot的掌握,还能深入了解消息队列和...

    spring-boot-mmanyexamples.zip

    Spring Boot与Spring MVC的结合,使得开发RESTful API变得简单。配合Swagger,可以生成API文档并进行测试。在"spring-boot-mmanyexamples"中,可能会有一个或多个关于REST服务和Swagger集成的实例。 7. **安全控制...

    RabbitMQ (三)实现延迟消息.pdf

    在Spring Boot中,可以通过RabbitMQ的Java客户端库来实现延迟消息。文档中提到了使用`RabbitConfig`配置类来配置RabbitMQ的相关组件,包括队列和交换器的创建与绑定。 - 创建死信队列和死信交换器,并进行绑定操作...

Global site tag (gtag.js) - Google Analytics