package com.anyec.mq.springRabbit; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; import org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import com.rabbitmq.client.AMQP.BasicProperties; import com.rabbitmq.client.CancelCallback; import com.rabbitmq.client.Channel; import com.rabbitmq.client.DeliverCallback; import com.rabbitmq.client.Delivery; public class DeathProducer { @Autowired static RabbitTemplate amqpTemplate; public static void main(String[] args) throws IOException, TimeoutException { RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("javamq"); factory.setPassword("javamq"); factory.setRequestedChannelMax(10000); factory.setVirtualHost("/rabbit"); // com.rabbitmq.client.ConnectionFactory fo = factory.getObject(); com.rabbitmq.client.ConnectionFactory cf = factory.getRabbitConnectionFactory(); Channel channel = cf.newConnection().createChannel(); Map<String, Object> headers = new HashMap<String, Object>(); headers.put("x-dead-letter-exchange", "orderCreateDeathExchange"); headers.put("x-message-ttl", 10000); // channel.exchangeDelete("orderCreateExchange"); // channel.exchangeDelete("orderCreateDeathExchange"); // // channel.exchangeDeclare("orderCreateExchange", "direct", true, false, null); // channel.exchangeDeclare("orderCreateDeathExchange", "direct", true, false, null); // // // channel.queueDelete("orderCreateQueue"); // channel.queueDelete("orderCreateDeathQueue"); // // // channel.queueDeclare("orderCreateQueue", true, false, false, headers); // channel.queueDeclare("orderCreateDeathQueue", true, false, false, new HashMap<>()); // // channel.queueUnbind("orderCreateQueue","orderCreateExchange", "order.create"); // channel.queueUnbind("orderCreateDeathQueue", "orderCreateDeathExchange", "order.create"); // // channel.queueBind("orderCreateQueue","orderCreateExchange", "order.create"); // channel.queueBind("orderCreateDeathQueue", "orderCreateDeathExchange", "order.create"); new Thread(new DConsumer(channel,11)).start(); new Thread(new DConsumer(channel,12)).start(); new Thread(new DConsumer(channel,13)).start(); new Thread(new DConsumer(channel,14)).start(); new Thread(new DConsumer(channel,15)).start(); new Thread(new DConsumer(channel,21)).start(); new Thread(new DConsumer(channel,22)).start(); new Thread(new DConsumer(channel,23)).start(); new Thread(new DConsumer(channel,24)).start(); new Thread(new DConsumer(channel,25)).start(); new Thread(new DConsumer(channel,31)).start(); new Thread(new DConsumer(channel,32)).start(); new Thread(new DConsumer(channel,33)).start(); new Thread(new DConsumer(channel,34)).start(); new Thread(new DConsumer(channel,35)).start(); new Thread(new DConsumer(channel,41)).start(); new Thread(new DConsumer(channel,42)).start(); new Thread(new DConsumer(channel,43)).start(); new Thread(new DConsumer(channel,44)).start(); new Thread(new DConsumer(channel,45)).start(); // for (int i = 0; i < 10000000; i++) { // BasicProperties props = new BasicProperties().builder().headers(headers).build(); // channel.basicPublish("orderCreateExchange", "order.create", props, (i+"死刑队列10s").getBytes()); // try { // Thread.sleep(1); // } catch (InterruptedException e) { // e.printStackTrace(); // } // } // ConnectionFactory ff=new CachingConnectionFactory(cf); // amqpTemplate=new RabbitTemplate(ff); // amqpTemplate.sen } } class DConsumer implements Runnable { private Channel channel; int id; public DConsumer(Channel channel,Integer id) { super(); this.channel = channel; this.id=id; } @Override public void run() { CancelCallback cancelCallback = new CancelCallback() { @Override public void handle(String consumerTag) throws IOException { System.out.println(consumerTag + " "); } }; DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { System.out.println(id+" "+consumerTag + " " + new String(message.getBody())+message.getProperties().getMessageId()); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } }; while (true) { try { channel.basicConsume("orderCreateDeathQueue", deliverCallback, cancelCallback); } catch (IOException e) { e.printStackTrace(); } } } }
相关推荐
该示例通过 rabbitmq_delayed_message_exchange 插件实现自定义延时时间的延时队列。 示例是纯净的,只引入了需要的架包 启动示例时,请确保MQ已经安装了延时插件(附件里带有插件及安装说明)以及示例的MQ相关的配置...
Linux 安装 RabbitMQ 应用 / RabbitMQ 延时队列
在RabbitMQ中,延时队列是一个非常实用的功能,尤其在需要处理定时任务或者延迟操作的场景下。例如,订单超时确认、定时发送邮件或短信等。RabbitMQ本身并不直接支持延时队列,但可以通过安装插件来实现这一功能。`...
在本项目中,我们将探讨如何实现RabbitMQ的延时队列以及在四种不同的交换机模式下的队列配置。 首先,让我们理解RabbitMQ的延时队列。在实际业务场景中,有时我们需要在特定时间后才处理某些消息,例如订单超时取消...
支付状态一致性-RabbitMQ死信队列
本教程将详细介绍如何使用SpringBoot集成RabbitMQ来实现一个延时队列,并探讨消息发送与消费确认机制以及消费者端的策略模式应用。 首先,SpringBoot是Java开发者广泛使用的快速开发框架,它简化了Spring的配置和...
1. **配置RabbitMQ延迟队列**:安装RabbitMQ的x-delayed-message插件,并创建一个具有延迟特性的队列。 2. **编写订单服务**:当订单创建时,将订单信息封装成消息并发送到延迟队列,同时返回订单创建成功的响应给...
ASP.NET 编程知识 - RabbitMQ 死信队列和延时队列实践 以下是ASP.NET编程知识中关于RabbitMQ死信队列和延时队列的知识点总结: 一、死信队列 死信队列是一个特殊的队列,用于存储不能被消费的消息。这些消息可能...
RabbitMQ 延迟队列及消息延迟推送实现详解 RabbitMQ 延迟队列及消息延迟推送实现详解是指在 RabbitMQ 中实现消息延迟推送的功能,即在指定的时间后推送消息到目标队列中。这种机制在实际应用中非常有价值,如在淘宝...
因为系统本身一直在用RabbitMQ做异步处理任务的中间件,所以想到是否可以利用RabbitMQ实现延迟队列。功夫不负有心人,RabbitMQ虽然没有现成可用的延迟队列,但是可以利用其两个重要特性来实现之:1、Time To Live...
RabbitMQ实现延时队列的基本原理: RabbitMQ结合消息的TTL和死信路由特性,将过期消息转发到死信队列,由消费者监听死信队列并消费。 Redis实现延时队列的原理: 使用ZSet存储消息,key为消息ID,value为延迟时间加...
RabbitMQ延迟队列是一种特殊类型的队列,它允许消息在特定时间后才被消费者处理,而不是立即处理。这种功能在某些业务场景下非常有用,例如订单超时处理、定时发送邮件或短信通知等。在RabbitMQ中,实现延迟队列通常...
springboot整合RabbitMQ实现延时队列的两种方式 教程及源码。参考博客:https://blog.csdn.net/qq_29914837/article/details/94070677
在这个名为"rabbitmq_delayed.zip"的压缩包中,我们看到的是一个使用SpringBoot框架实现的RabbitMQ延时队列功能的示例代码。下面将详细解释RabbitMQ延时队列的工作原理以及如何在SpringBoot应用中进行集成。 延时...
5. **安装延时队列插件**:RabbitMQ的延时队列功能并非默认内置,而是通过一个名为`rabbitmq_delayed_message_exchange`的插件提供。使用以下命令启用该插件: ``` rabbitmq-plugins enable rabbitmq_delayed_...
4. **创建交换机和队列**:在RabbitMQ中,交换机决定消息路由到哪个队列。你需要在Spring配置中定义这些实体。例如,创建一个Direct类型的交换机和一个队列: ```java @Configuration public class RabbitConfig ...
rabbitmq延时队列支持插件
5. RabbitMQ 延时队列 RabbitMQ 是一个开源的消息队列框架,提供了实现延迟队列的功能。RabbitMQ 可以用来执行延迟任务,例如可以使用延迟队列来实现订单超时取消等功能。 6. Redis 延时队列 Redis 是一个开源的...