参考:http://www.aichengxu.com/view/37900
如果是一个队列只希望一个消费者进行处理,那么定义队列的时候可以指定时独占模式:exclusive
如果是一个队列由多个消费者,但是只希望消息由其中的一个消费者优先进行处理,当这个消费者挂掉的时候,再由其他消费者进行处理的话,可以给消费者设置不同的优先级
从RabbitMQ的3.2版本开始,这个消息代理支持消费者优先级。这个可以通过设置消费者的x-priority进行配置。
spring配置:为了方便,命名空间在listener元素上提供了priority属性:
<rabbit:listener-containerconnection-factory="rabbitConnectionFactory">
<rabbit:listenerqueues="some.queue"ref="somePojo"method="handle"priority="10"/>
</rabbit:listener-container>
测试代码:
public void testConsumerPriorities() throws Exception {
String queue = channel.queueDeclare().getQueue();
String queue = channel.queueDeclare().getQueue();
//声明三个不同级别的消费者
QueueingConsumer highConsumer = new QueueingConsumer(channel);
QueueingConsumer medConsumer = new QueueingConsumer(channel);
QueueingConsumer lowConsumer = new QueueingConsumer(channel);
String high = channel.basicConsume(queue, true, args(1), highConsumer); // 优先级为1 最高
String med = channel.basicConsume(queue, true, medConsumer); //优先级为0 第二
channel.basicConsume(queue, true, args(-1), lowConsumer);// 优先级为-1 最低
QueueingConsumer highConsumer = new QueueingConsumer(channel);
QueueingConsumer medConsumer = new QueueingConsumer(channel);
QueueingConsumer lowConsumer = new QueueingConsumer(channel);
String high = channel.basicConsume(queue, true, args(1), highConsumer); // 优先级为1 最高
String med = channel.basicConsume(queue, true, medConsumer); //优先级为0 第二
channel.basicConsume(queue, true, args(-1), lowConsumer);// 优先级为-1 最低
publish(queue, COUNT, "high");//COUNT=10 发送10条消息
publish(queue, COUNT, "med");
publish(queue, COUNT, "low");
publish(queue, COUNT, "med");
publish(queue, COUNT, "low");
assertContents(highConsumer, COUNT, "high");//消费消息
assertContents(medConsumer, COUNT, "med");
assertContents(lowConsumer, COUNT, "low");
}
assertContents(medConsumer, COUNT, "med");
assertContents(lowConsumer, COUNT, "low");
}
private Map<String, Object> args(Object o) {
Map<String, Object> map = new HashMap<String, Object>();
map.put("x-priority", o);
return map;
}
Map<String, Object> map = new HashMap<String, Object>();
map.put("x-priority", o);
return map;
}
private void assertContents(QueueingConsumer qc, int count, String msg) throws InterruptedException {
for (int i = 0; i < count; i++) {
QueueingConsumer.Delivery d = qc.nextDelivery();
assertEquals(msg, new String(d.getBody()));
}
assertEquals(null, qc.nextDelivery(0));
}
for (int i = 0; i < count; i++) {
QueueingConsumer.Delivery d = qc.nextDelivery();
assertEquals(msg, new String(d.getBody()));
}
assertEquals(null, qc.nextDelivery(0));
}
解析:
1、这里一共声明了三个优先级不一样的消费者:高、中、低
2、发送消息时,发送的顺序是高、中、低。那么就意味着broker发送消息时,也是高、中、低的顺序。
4、这里还发现一点:就是首先发送的消息为A,然后首先接收消息的是也是A,但是A的优先级不是最高的,那么程序就会一直阻塞。
原因:这里如果给consumer设置了优先级,这里假设两个consumer的优先级是一样的。那么在初始环境下,broker会轮流发送消息。这个意思就是假设broker里面有4个消息,那么broker会把1和3的消息先发给先接受的consumer。比如这里C1先接收消息,那么C1的队列里面就有2个消息了。但是当我们第3次接收消息时,就会阻塞。因为consumer接收消息队列的是LinkedBlockingQueue。
相关推荐
- **Priority Queues**: 支持为消息设置优先级,高优先级的消息会被优先处理。 - **Cluster**: 多台机器上的RabbitMQ实例可以组成集群,提供高可用性和水平扩展性。 - **Mirrored Queues**: 队列可以在集群中的多...
13. **RabbitMQ的消息确认机制(publisher confirms和consumer acknowledgments)?** - publisher confirms:生产者确认,确保消息已被RabbitMQ接收。 - consumer acknowledgments:消费者确认,确认消息已被正确...
- **Priority**: 消息可以设置优先级,高优先级的消息优先被消费。 - **Cluster**: 多个RabbitMQ节点组成集群,提供高可用性和负载均衡。 - **Mirrored Queues**: 队列镜像可以在集群中创建队列副本,提高数据安全...
接着,会探讨RabbitMQ的高级特性,如死信队列(Dead Letter Queue)、延迟队列(Delay Queue)和优先级队列(Priority Queue),这些特性可以增强系统的容错能力和灵活性。此外,还会介绍如何实现消息的持久化,确保...
- **Priority**:消息优先级,允许设置消息优先级,便于处理紧急任务。 - **Durable**:持久化,保证消息即使在服务器重启后也不会丢失。 - **Cluster**:集群,多个RabbitMQ节点组成集群,提升可用性和容错性。 - *...
- **Consumer**: 消费者是接收并处理队列中消息的组件。 2. **RabbitMQ与Java的集成** - **JMS (Java Message Service)**: Java开发者通常使用JMS API与RabbitMQ交互,通过`com.rabbitmq.jms.client`库实现。 - ...
- **Consumer Behavior**:多个消费者共享一个队列,通过`channel.basicQos(1)`方法限制每次仅能处理一个消息。 ##### 6.3 Publish/Subscribe Mode (发布/订阅模式) - **Exchange Type**:设置为“fanout”。 - **...
RabbitMQ中的核心概念包括生产者(Producer)、消费者(Consumer)、交换机(Exchange)和队列(Queue)。 - **生产者**:发布消息的应用程序,负责创建并发送消息到RabbitMQ。 - **消费者**:接收消息的应用程序,...
- 生产者通过设置队列属性 `x-max-priority` 来启用优先级队列。 - 消费者通过设置消息的 `priority` 属性来指定消息的优先级。 - **示例代码**: - **生产者**: ```java Map, Object> args = new HashMap(); ...
- **Priority Queues**: 支持消息优先级,高优先级的消息先被消费。 - **Message TTL in Headers**: 消息头中可以包含TTL,实现更灵活的过期策略。 6. **监控与管理** - 使用RabbitMQ管理控制台(web界面)进行...
RabbitMQ提供了一些高级特性,如死信队列(Dead Letter Queue)、延迟队列(Delayed Queue)和优先级队列(Priority Queue),以增强消息处理的灵活性。例如,当消息无法正确路由或消费者无法处理时,这些消息会被放...
消息头中包括 routing-key、priority 等标准消息头以及其它自定义消息头,用于定义 RabbitMQ 对消息行为。消息体是字节流,包含消息内容。 3. Connection: 客户端与 Broker 之间的 TCP 连接。 4. Channel: Channel ...
- 顺序消息和事务消息是解决顺序问题的策略,如RabbitMQ的TTL和Priority特性。 6. **消息队列的选择与比较** - RabbitMQ:基于AMQP协议,广泛应用,支持多种语言。 - ActiveMQ:开源,支持多种协议,但性能相对...
3. 顺序队列:部分消息队列服务提供了顺序队列功能,如RabbitMQ的TTL和Priority,AWS SQS的FIFO队列,它们能保证消息的顺序消费。 4. 一致性哈希:利用一致性哈希算法,将具有相同属性(如用户ID)的消息路由到同一...