`

spring boot rabbitMQ延时队列 实现

阅读更多

spring boot 使用rabbitMQ
需要导入相关的相关依赖如下:

 

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

AMQP默认实现为rabbitMQ。

 

为了定制化的配置放弃了boot的自动配置,So,以下为RabbitMQ的配置文件:

 

package com.fengbaogu.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * rabbitmq 的配置类
 * @author LiaoLe
 */
@Configuration
public class RabbitMQConfig {
    /** 消息交换机的名字*/
    public static final String EXCHANGE = "delayed_exchange";
    /** 队列key1*/
    public static final String ROUTING = "notify";
    /** 队列名*/
    public static final String QUEUE = "delayed_queue";
    /** rabbit MQ 账号*/
    public static final String USERNAME = "liaoke";
    /**rabbit MQ 密码*/
    public static final String PASSWORD = "198461lk";
    /**rabbit MQ 地址*/
    public static final String RABBITMQ_ADDRESS="localhost";
    /**rabbit MQ 端口*/
    public static final Integer RABBITMQ_PORT=5672;

    /**
     * 配置链接
     * @return
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(RABBITMQ_ADDRESS,RABBITMQ_PORT);
        connectionFactory.setUsername(USERNAME);
        connectionFactory.setPassword(PASSWORD);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true); /**需要设置 接受回调信息*/
        return connectionFactory;
    }


    /**
     * 交换机设置
     * 开启了延时消息
     * @return
     */
    @Bean
    public DirectExchange defaultExchange() {
        DirectExchange directExchange = new DirectExchange(EXCHANGE, true, false);
        directExchange.setDelayed(true);/**只需简单一步开启延时消息,md找了好久,原来这么简单*/
        return directExchange;
    }

    /**
     * 队列设置
     * @return
     */
    @Bean
    public Queue notifyQueue() {
        return new Queue(QUEUE,true);/**消息持久化*/
    }

    /**
     * 绑定队列到交换机
     * @return
     */
    @Bean
    public Binding bindingNotify() {
        return BindingBuilder.bind(notifyQueue()).to(defaultExchange()).with(ROUTING);
    }

}

 rabbitMQ默认是不开启delayed功能的哟,所以同学们还需要到官网或者githup上面下载插件:

 

命令行: rabbitmq-plugins enable rabbitmq_delayed_message_exchange
 

#安装插件 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange 

 

#插件是有版本要求的哦

 

下面贴发送的代码,随便用了个http入口:

package com.fengbaogu.web;

import com.fengbaogu.config.RabbitMQConfig;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;

/**
 * Created by LiaoKe on 2017/5/2.
 */
@RestController
public class TestController implements RabbitTemplate.ConfirmCallback{


    /**
     * 使用RabbitTemplate发送消息
     */
    private RabbitTemplate rabbitTemplate;

    /**
     * 构造方法
     * @param rabbitTemplate
     */
    public TestController(RabbitTemplate rabbitTemplate){
        this.rabbitTemplate = rabbitTemplate;
        //设置消费回调
        this.rabbitTemplate.setConfirmCallback(this);
    }

    private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");


    /**
     * 向延迟队列中发送消息
     * @param msg
     * @return
     */
    @RequestMapping("send")
    public String send3(String msg){
        String uuid = UUID.randomUUID().toString();
        CorrelationData correlationId = new CorrelationData(uuid);
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE,RabbitMQConfig.ROUTING , msg, new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setDelay(20000);/**延时20秒发送*/
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);/**消息持久化*/
                        System.out.println(sdf.format(new Date()) + " Delay sent.");
                        return message;
                    }
                }, correlationId);
        return null;
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println(" 回调id:" + correlationData);
        if (ack) {
            System.out.println("消息成功消费");
        } else {
            System.out.println("消息消费失败:" + cause+"\n重新发送");
        }
    }
}

 

 

启动后可以在RabbitMQ管理界面上有:



 这样的交换机。

Queue同理也可以在管理界面上看到,此处不贴图了。

 

现在来看 Consumer 端,代码异常简洁:

package com.fengbaogu.listener;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;


/**
 * Created by LiaoKe on 2017/5/3.
 */
@Component
@RabbitListener(queues = {"delayed_queue"})
public class Receiver {


    @RabbitHandler
    public void process(String msg)  {
        System.out.println("start one work and now data:"+new Date().toString());
        System.out.println("This msg is:"+msg);
    }



}

开启消费端,访问send入口:

服务端打印:

2017-05-03 20:24:14 Delay sent.

 回调id:CorrelationData [id=277ce017-af15-4571-b556-762fec62720d]

消息成功消费

消费端打印:

start one work and now data:Wed May 03 20:24:34 CST 2017

This msg is:我是消息2

 

So,相差20秒。延时队列Over。

 

 

 

  • 大小: 5.5 KB
分享到:
评论
1 楼 liaoke0123 2017-05-04  
https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange/v3.6.x


附上官方插件下载地址

相关推荐

    SpringBoot集成RabbitMQ延时队列,自定义延时时间Demo

    该示例通过 rabbitmq_delayed_message_exchange 插件实现自定义延时时间的延时队列。 示例是纯净的,只引入了需要的架包 启动示例时,请确保MQ已经安装了延时插件(附件里带有插件及安装说明)以及示例的MQ相关的配置...

    Spring Boot RabbitMQ 延迟消息实现完整版

    ### Spring Boot RabbitMQ 实现延迟消息详解 #### 引言 在现代分布式系统设计中,消息中间件(如RabbitMQ)被广泛用于处理异步通信、解耦服务以及优化性能等方面。其中,延迟消息是一种重要的功能,它可以实现在...

    rabbitmq延时队列和四种交换机模式下队列的简单实现

    在本项目中,我们将探讨如何实现RabbitMQ的延时队列以及在四种不同的交换机模式下的队列配置。 首先,让我们理解RabbitMQ的延时队列。在实际业务场景中,有时我们需要在特定时间后才处理某些消息,例如订单超时取消...

    springboot+rabbitmq实现延时队列

    本教程将详细介绍如何使用SpringBoot集成RabbitMQ来实现一个延时队列,并探讨消息发送与消费确认机制以及消费者端的策略模式应用。 首先,SpringBoot是Java开发者广泛使用的快速开发框架,它简化了Spring的配置和...

    Rabbitmq延迟队列实现定时任务的方法

    RabbitMQ 延迟队列实现定时任务的方法 RabbitMQ 延迟队列是一种实现定时任务的方法,它可以帮助我们在指定的时间点执行某个任务。这种方法可以应用在很多场景中,例如:在电子商城中,需要在指定的时间点关闭订单、...

    使用RabbitMQ死信实现延迟消息

    spring boot 配置, Rabbitmq集成, 利用死信 实现延时消息队列实现

    详解Spring Cloud Stream使用延迟消息实现定时任务(RabbitMQ)

    在本文中,我们将探讨如何使用Spring Cloud Stream和RabbitMQ的延迟消息功能来实现开始时间不确定的定时任务。 首先,RabbitMQ 提供了一个名为 `rabbitmq_delayed_message_exchange` 的插件,用于创建延迟消息。...

    ActiveMQ-讲义.pdf

    熟悉原生JMS API是掌握ActiveMQ的基础,而Spring和Spring Boot的整合则让ActiveMQ与现代Java应用更加无缝对接。此外,了解ActiveMQ消息的组成和高级特性也是构建健壮消息系统的必要条件。 在面试时,通常会遇到一些...

    二维码扫描红包

    6. 消息队列:在高并发环境下,可以引入RabbitMQ、Kafka等消息队列技术,缓解服务器压力,保证红包发放的稳定性和延时一致性。 综上所述,二维码扫描红包功能涉及多方面的技术,包括二维码编码解码、前后端开发、...

    新闻文章自动新闻采集系统-webapps

    这部分可能采用Spring Boot框架构建,结合Thymeleaf或React等前端库,提供用户友好的界面,让用户可以查看实时更新的新闻、定制关注的新闻源,甚至进行搜索和筛选操作。 总的来说,【新闻文章自动新闻采集系统-...

    基于B/S的校园网上在线拍卖系统的

    - 消息队列:RabbitMQ或Kafka用于异步处理,如发送通知。 - 部署环境:Docker容器化部署,配合Nginx作为反向代理和负载均衡。 【开发流程】: 1. 需求分析:明确系统功能和目标用户,编写需求文档。 2. 设计阶段:...

    SpiderServer:爬虫服务器,从页面中填写调度配置,根据配置解析抓取流程完成抓取

    界面的实现离不开HTML、CSS和JavaScript,后端则使用Java的Spring Boot或Spring MVC框架来处理HTTP请求和响应,将用户的配置保存到数据库中。 2. **爬虫调度** 调度器是SpiderServer的关键组件,它负责解析用户...

Global site tag (gtag.js) - Google Analytics