只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效
RabbitMQ3.5以后已经集成了rabbitmq_priority_queue
引用
验证方式:
触发为及时消费场景,常用场景与Qos结合使用
1、可先发送消息,再进行消费
2、开启手动应答、设置Qos。若为1,在一个消费者存在的情况下,除第一个消息外均按优先级进行消费(第一个消息被及时消费掉了)
3、可在方式二的基础上不断增加消费者,也符合优先调用规则
为消息设置优先级别:
//随机设置消息优先级
Builder properties=new BasicProperties.Builder();
int priority=new Random().nextInt(10);
properties.priority(priority);//建议0~255,超过貌似也没问题
channel.basicPublish(exchange_name, "", properties.build(), SerializationUtils.serialize(_mes));
为队列创建优先级别:
//设置队列的优先级,消息的优先级大于队列的优先级,以较小值为准(例如:队列优先级5、消息优先级8,消息实际优先级为5)
Map<String, Object> args=new HashMap<String, Object>();
args.put("x-max-priority", 10);//队列优先级只能声明一次,不可改变(涉及到硬盘、内存存储方式)
channel.queueDeclare(queueName, false, false, false, args);
队列、消息上均要设置优先级才可生效,以较小值为准
队列优先级只能声明一次,不可改变(涉及到硬盘、内存存储方式)
优先级队列在内存、硬盘、cpu会有成本消耗,不建议创建大量的优先级别(数量、级别种类、大级别,理解混乱,英文理解困难...)
package com.demo.mq.rabbitmq.example10;
import java.io.IOException;
import java.io.Serializable;
import java.util.Random;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.SerializationUtils;
import com.demo.mq.rabbitmq.MqManager;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.AMQP.BasicProperties.Builder;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 优先队列测试消息发送类
* @author sheungxin
*
*/
public class PrioritySend {
private static String exchange_name="priority_direct";
public static void prioritySend(Serializable mes) throws IOException, TimeoutException{
Connection conn=MqManager.newConnection();
Channel channel=conn.createChannel();
channel.exchangeDeclare(exchange_name, BuiltinExchangeType.DIRECT);
//发送10条消息
for(int i=0;i<10;i++){
//随机设置消息优先级
Builder properties=new BasicProperties.Builder();
int priority=new Random().nextInt(10);
properties.priority(priority);//建议0~255,超过貌似也没问题
String _mes=mes.toString()+i;
channel.basicPublish(exchange_name, "", properties.build(), SerializationUtils.serialize(_mes));
System.out.println(priority+" "+_mes);
}
channel.close();
conn.close();
}
public static void main(String[] args) throws IOException, TimeoutException {
prioritySend("priority send:hello world!");
}
}
package com.demo.mq.rabbitmq.example10;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.SerializationUtils;
import com.demo.mq.rabbitmq.MqManager;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
public class PriorityRecv {
private static String exchange_name="priority_direct";
private static String queueName="priority_queue";
/**
* 只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效
* 验证方式:
* 1、可先发送消息,再进行消费
* 2、开启手动应答、设置Qos。若为1,在一个消费者存在的情况下,除第一个消息为均按优先级进行消费(第一个消息被及时消费掉了)
* 3、可在方式二的基础上不断增加消费者,也符合优先调用规则
* 注意要点:
* 1、队列、消息上均要设置优先级才可生效,以较小值为准;
* 2、队列优先级只能声明一次,不可改变(涉及到硬盘、内存存储方式)
* 3、优先级队列在内存、硬盘、cpu会有成本消耗,不建议创建大量的优先级别(数量、级别种类、大级别,理解混乱,英文理解困难...)
* @throws IOException
* @throws TimeoutException
*/
public static void priorityRecv() throws IOException, TimeoutException{
Connection conn=MqManager.newConnection();
Channel channel=conn.createChannel();
channel.exchangeDeclare(exchange_name, BuiltinExchangeType.DIRECT);
//设置队列的优先级,消息的优先级大于队列的优先级,以较小值为准(例如:队列优先级5、消息优先级8,消息实际优先级为5)
Map<String, Object> args=new HashMap<String, Object>();
args.put("x-max-priority", 10);//队列优先级只能声明一次,不可改变(涉及到硬盘、内存存储方式)
channel.queueDeclare(queueName, false, false, false, args);
channel.queueBind(queueName, exchange_name, "");
channel.basicQos(1);//需要开启手动应答模式,否则无效
channel.basicConsume(queueName, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException{
String mes=SerializationUtils.deserialize(body);
System.out.println(properties.getPriority()+":priority Received :'"+mes+"' done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
public static void main(String[] args) throws IOException, TimeoutException {
priorityRecv();
}
}
分享到:
相关推荐
《RabbitMQ实战:高效部署分布式消息队列》是一本深度解析RabbitMQ技术的书籍,专注于帮助读者理解和掌握如何在实际项目中高效地部署和使用这个强大的消息中间件。RabbitMQ作为开源的消息代理和队列服务器,广泛应用...
接着,会探讨RabbitMQ的高级特性,如死信队列(Dead Letter Queue)、延迟队列(Delay Queue)和优先级队列(Priority Queue),这些特性可以增强系统的容错能力和灵活性。此外,还会介绍如何实现消息的持久化,确保...
RabbitMQ实战 高效部署分布式消息队列 RabbitMQ实战 高效部署分布式消息队列 RabbitMQ实战 高效部署分布式消息队列
RabbitMQ 是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了一个经纪人(Broker)构架,这意味着消息在发送...
本教程将深入探讨RabbitMQ的使用和部署,帮助读者掌握如何高效地构建分布式消息队列。 **1. 消息队列基础** - **消息模型**:理解RabbitMQ的基本工作原理,包括生产者、消费者、交换器和队列的概念。 - **AMQP协议*...
本篇文章将深入探讨RabbitMQ的核心概念、工作原理以及如何高效地部署和使用分布式消息队列。 1. **RabbitMQ基础** - **定义**:RabbitMQ是一个基于AMQP(Advanced Message Queuing Protocol)协议的消息中间件,它...
RabbitMQ学习实践二:MQ的安装
RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)协议实现的开源消息队列系统,广泛应用于微服务架构、解耦系统组件以及异步任务处理等场景。 首先,我们需要理解消息队列的基本概念。消息队列是一种在...
《RabbitMQ实战:高效部署分布式消息队列》是一本专为希望深入理解和应用RabbitMQ的IT从业者准备的指南。RabbitMQ是一款开源的消息代理软件,它在分布式系统中承担着消息中间件的角色,负责处理应用程序之间的通信,...
在本文中,我们将深入探讨RabbitMQ的核心概念、功能特性、部署策略以及如何通过实战来高效地部署分布式消息队列。 1. **RabbitMQ核心概念** - **Broker**: RabbitMQ服务器本身就是一个消息broker,它负责接收、...
《RabbitMQ实战:高效部署分布式消息队列》是一本深度探讨RabbitMQ技术的书籍,专注于如何在实际环境中高效地部署和应用分布式消息队列系统。RabbitMQ作为业界广泛使用的开源消息代理,它提供了可靠的消息传递机制,...
**RabbitMQ 消息中间件在抢购场景中的应用** 在现代互联网应用中,抢购活动已经成为一种常见的促销手段,吸引大量用户在同一时间参与。为了处理这种高并发的业务场景,开发人员需要采取有效的方式来分发和处理大量...
《RabbitMQ实战高效部署分布式消息队列》完整pdf书籍, 初学者可以看看
在灾情预警等需要优先处理紧急消息的场景中,消息队列的优先级功能至关重要。 首先,我们需要理解RabbitMQ的消息队列工作原理。消息由生产者生成并发送到交换机,交换机再将消息分发到已绑定的队列中。消费者监听...
RabbitMQ 的使用场景包括: 1. 服务间异步通信 2. 顺序消费 3. 定时任务 4. 请求削峰 RabbitMQ 的工作机制: 1. 生产者将消息发布到交换器上 2. 交换器根据路由键将消息路由到特定的队列 3. 消息到达队列,消费者...