`

rabbitmq延时队列

 
阅读更多
package com.anyec.mq.springRabbit;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

import org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;

public class DeathProducer {
	@Autowired
	static RabbitTemplate amqpTemplate;

	public static void main(String[] args) throws IOException, TimeoutException {
		RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean();

		factory.setHost("localhost");
		factory.setPort(5672);
		factory.setUsername("javamq");
		factory.setPassword("javamq");
		factory.setRequestedChannelMax(10000);
		factory.setVirtualHost("/rabbit");
//		com.rabbitmq.client.ConnectionFactory fo = factory.getObject();
		com.rabbitmq.client.ConnectionFactory cf = factory.getRabbitConnectionFactory();
		Channel channel = cf.newConnection().createChannel();
		Map<String, Object> headers = new HashMap<String, Object>();
		headers.put("x-dead-letter-exchange", "orderCreateDeathExchange");
		headers.put("x-message-ttl", 10000);
		
		
//		channel.exchangeDelete("orderCreateExchange");
//		channel.exchangeDelete("orderCreateDeathExchange");
//		
//		channel.exchangeDeclare("orderCreateExchange", "direct", true, false, null);
//		channel.exchangeDeclare("orderCreateDeathExchange", "direct", true, false, null);
//		
//		
//		channel.queueDelete("orderCreateQueue");
//		channel.queueDelete("orderCreateDeathQueue");
//		
//		
//		channel.queueDeclare("orderCreateQueue", true, false, false, headers);
//		channel.queueDeclare("orderCreateDeathQueue", true, false, false, new HashMap<>());
//		
//		channel.queueUnbind("orderCreateQueue","orderCreateExchange",  "order.create");
//		channel.queueUnbind("orderCreateDeathQueue", "orderCreateDeathExchange",  "order.create");
//		
//		channel.queueBind("orderCreateQueue","orderCreateExchange",  "order.create");
//		channel.queueBind("orderCreateDeathQueue", "orderCreateDeathExchange", "order.create");

		
		new Thread(new DConsumer(channel,11)).start();
		new Thread(new DConsumer(channel,12)).start();
		new Thread(new DConsumer(channel,13)).start();
		new Thread(new DConsumer(channel,14)).start();
		new Thread(new DConsumer(channel,15)).start();
		new Thread(new DConsumer(channel,21)).start();
		new Thread(new DConsumer(channel,22)).start();
		new Thread(new DConsumer(channel,23)).start();
		new Thread(new DConsumer(channel,24)).start();
		new Thread(new DConsumer(channel,25)).start();
		
		new Thread(new DConsumer(channel,31)).start();
		new Thread(new DConsumer(channel,32)).start();
		new Thread(new DConsumer(channel,33)).start();
		new Thread(new DConsumer(channel,34)).start();
		new Thread(new DConsumer(channel,35)).start();
		new Thread(new DConsumer(channel,41)).start();
		new Thread(new DConsumer(channel,42)).start();
		new Thread(new DConsumer(channel,43)).start();
		new Thread(new DConsumer(channel,44)).start();
		new Thread(new DConsumer(channel,45)).start();
//		for (int i = 0; i < 10000000; i++) {
//			BasicProperties props = new BasicProperties().builder().headers(headers).build();
//			channel.basicPublish("orderCreateExchange", "order.create", props, (i+"死刑队列10s").getBytes());
//			try {
//				Thread.sleep(1);
//			} catch (InterruptedException e) {
//				e.printStackTrace();
//			}
//		}
//		ConnectionFactory ff=new CachingConnectionFactory(cf);
//		amqpTemplate=new RabbitTemplate(ff);
//		amqpTemplate.sen
	}
}

class DConsumer implements Runnable {

	private Channel channel;
	int id;

	public DConsumer(Channel channel,Integer id) {
		super();
		this.channel = channel;
		this.id=id;
	
	}

	@Override
	public void run() {
		CancelCallback cancelCallback = new CancelCallback() {

			@Override
			public void handle(String consumerTag) throws IOException {
				System.out.println(consumerTag + " ");

			}
		};
		DeliverCallback deliverCallback = new DeliverCallback() {

			@Override
			public void handle(String consumerTag, Delivery message) throws IOException {
				System.out.println(id+" "+consumerTag + " " + new String(message.getBody())+message.getProperties().getMessageId());
				channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
				
			}
		};
		while (true) {

			try {
				channel.basicConsume("orderCreateDeathQueue", deliverCallback, cancelCallback);
			} catch (IOException e) {
				e.printStackTrace();
			}

		}
	}

}

 

分享到:
评论

相关推荐

    SpringBoot集成RabbitMQ延时队列,自定义延时时间Demo

    该示例通过 rabbitmq_delayed_message_exchange 插件实现自定义延时时间的延时队列。 示例是纯净的,只引入了需要的架包 启动示例时,请确保MQ已经安装了延时插件(附件里带有插件及安装说明)以及示例的MQ相关的配置...

    Linux 安装 RabbitMQ 应用 / RabbitMQ 延时队列

    Linux 安装 RabbitMQ 应用 / RabbitMQ 延时队列

    RabbitMQ+Erlang+RabbitMq延时队列插件

    在RabbitMQ中,延时队列是一个非常实用的功能,尤其在需要处理定时任务或者延迟操作的场景下。例如,订单超时确认、定时发送邮件或短信等。RabbitMQ本身并不直接支持延时队列,但可以通过安装插件来实现这一功能。`...

    rabbitmq延时队列和四种交换机模式下队列的简单实现

    在本项目中,我们将探讨如何实现RabbitMQ的延时队列以及在四种不同的交换机模式下的队列配置。 首先,让我们理解RabbitMQ的延时队列。在实际业务场景中,有时我们需要在特定时间后才处理某些消息,例如订单超时取消...

    支付状态一致性-RabbitMQ死信队列

    支付状态一致性-RabbitMQ死信队列

    springboot+rabbitmq实现延时队列

    本教程将详细介绍如何使用SpringBoot集成RabbitMQ来实现一个延时队列,并探讨消息发送与消费确认机制以及消费者端的策略模式应用。 首先,SpringBoot是Java开发者广泛使用的快速开发框架,它简化了Spring的配置和...

    使用RabbitMQ+延迟队列实现分布式事务的最终一致性方案

    1. **配置RabbitMQ延迟队列**:安装RabbitMQ的x-delayed-message插件,并创建一个具有延迟特性的队列。 2. **编写订单服务**:当订单创建时,将订单信息封装成消息并发送到延迟队列,同时返回订单创建成功的响应给...

    【ASP.NET编程知识】运用.NetCore实例讲解RabbitMQ死信队列,延时队列.docx

    ASP.NET 编程知识 - RabbitMQ 死信队列和延时队列实践 以下是ASP.NET编程知识中关于RabbitMQ死信队列和延时队列的知识点总结: 一、死信队列 死信队列是一个特殊的队列,用于存储不能被消费的消息。这些消息可能...

    RabbitMQ延迟队列及消息延迟推送实现详解

    RabbitMQ 延迟队列及消息延迟推送实现详解 RabbitMQ 延迟队列及消息延迟推送实现详解是指在 RabbitMQ 中实现消息延迟推送的功能,即在指定的时间后推送消息到目标队列中。这种机制在实际应用中非常有价值,如在淘宝...

    如何通过Python实现RabbitMQ延迟队列

    因为系统本身一直在用RabbitMQ做异步处理任务的中间件,所以想到是否可以利用RabbitMQ实现延迟队列。功夫不负有心人,RabbitMQ虽然没有现成可用的延迟队列,但是可以利用其两个重要特性来实现之:1、Time To Live...

    高效延时队列的设计与实现

    RabbitMQ实现延时队列的基本原理: RabbitMQ结合消息的TTL和死信路由特性,将过期消息转发到死信队列,由消费者监听死信队列并消费。 Redis实现延时队列的原理: 使用ZSet存储消息,key为消息ID,value为延迟时间加...

    rabbitMQ延迟队列

    RabbitMQ延迟队列是一种特殊类型的队列,它允许消息在特定时间后才被消费者处理,而不是立即处理。这种功能在某些业务场景下非常有用,例如订单超时处理、定时发送邮件或短信通知等。在RabbitMQ中,实现延迟队列通常...

    springboot整合RabbitMQ实现延时队列的两种方式 教程及源码

    springboot整合RabbitMQ实现延时队列的两种方式 教程及源码。参考博客:https://blog.csdn.net/qq_29914837/article/details/94070677

    rabbitmq_delayed.zip

    在这个名为"rabbitmq_delayed.zip"的压缩包中,我们看到的是一个使用SpringBoot框架实现的RabbitMQ延时队列功能的示例代码。下面将详细解释RabbitMQ延时队列的工作原理以及如何在SpringBoot应用中进行集成。 延时...

    rabbitMq+erlang+延时队列插件完整安装包.7z

    5. **安装延时队列插件**:RabbitMQ的延时队列功能并非默认内置,而是通过一个名为`rabbitmq_delayed_message_exchange`的插件提供。使用以下命令启用该插件: ``` rabbitmq-plugins enable rabbitmq_delayed_...

    spring集成rabbitmq 通俗易懂的demo

    4. **创建交换机和队列**:在RabbitMQ中,交换机决定消息路由到哪个队列。你需要在Spring配置中定义这些实体。例如,创建一个Direct类型的交换机和一个队列: ```java @Configuration public class RabbitConfig ...

    rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez

    rabbitmq延时队列支持插件

    一口气说出Java 6种延时队列的实现方法(面试官也得服)

    5. RabbitMQ 延时队列 RabbitMQ 是一个开源的消息队列框架,提供了实现延迟队列的功能。RabbitMQ 可以用来执行延迟任务,例如可以使用延迟队列来实现订单超时取消等功能。 6. Redis 延时队列 Redis 是一个开源的...

Global site tag (gtag.js) - Google Analytics