`

rabbitmq-consumer-x-priority

 
阅读更多

 参考:http://www.aichengxu.com/view/37900

 

如果是一个队列只希望一个消费者进行处理,那么定义队列的时候可以指定时独占模式:exclusive

如果是一个队列由多个消费者,但是只希望消息由其中的一个消费者优先进行处理,当这个消费者挂掉的时候,再由其他消费者进行处理的话,可以给消费者设置不同的优先级

从RabbitMQ的3.2版本开始,这个消息代理支持消费者优先级。这个可以通过设置消费者的x-priority进行配置。

spring配置:为了方便,命名空间在listener元素上提供了priority属性:

  1. <rabbit:listener-containerconnection-factory="rabbitConnectionFactory">
  2. <rabbit:listenerqueues="some.queue"ref="somePojo"method="handle"priority="10"/>
  3. </rabbit:listener-container>
测试代码:
 public void testConsumerPriorities() throws Exception {
        String queue = channel.queueDeclare().getQueue();
        //声明三个不同级别的消费者
        QueueingConsumer highConsumer = new QueueingConsumer(channel);
        QueueingConsumer medConsumer = new QueueingConsumer(channel);
        QueueingConsumer lowConsumer = new QueueingConsumer(channel);
        String high = channel.basicConsume(queue, true, args(1), highConsumer); // 优先级为1  最高
        String med = channel.basicConsume(queue, true, medConsumer); //优先级为0  第二
        channel.basicConsume(queue, true, args(-1), lowConsumer);// 优先级为-1 最低
        publish(queue, COUNT, "high");//COUNT=10 发送10条消息
        publish(queue, COUNT, "med");
        publish(queue, COUNT, "low");
        assertContents(highConsumer, COUNT, "high");//消费消息
        assertContents(medConsumer, COUNT, "med");
        assertContents(lowConsumer, COUNT, "low");
    }
private Map<String, Object> args(Object o) {
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("x-priority", o);
        return map;
    }
private void assertContents(QueueingConsumer qc, int count, String msg) throws InterruptedException {
        for (int i = 0; i < count; i++) {
            QueueingConsumer.Delivery d = qc.nextDelivery();
            assertEquals(msg, new String(d.getBody()));
        }
        assertEquals(null, qc.nextDelivery(0));
    }
     解析:
     1、这里一共声明了三个优先级不一样的消费者:高、中、低
     2、发送消息时,发送的顺序是高、中、低。那么就意味着broker发送消息时,也是高、中、低的顺序。
     4、这里还发现一点:就是首先发送的消息为A,然后首先接收消息的是也是A,但是A的优先级不是最高的,那么程序就会一直阻塞。
           原因:这里如果给consumer设置了优先级,这里假设两个consumer的优先级是一样的。那么在初始环境下,broker会轮流发送消息。这个意思就是假设broker里面有4个消息,那么broker会把1和3的消息先发给先接受的consumer。比如这里C1先接收消息,那么C1的队列里面就有2个消息了。但是当我们第3次接收消息时,就会阻塞。因为consumer接收消息队列的是LinkedBlockingQueue。
 
 
 
 
分享到:
评论

相关推荐

    RabbitMQ安装包

    - **Priority Queues**: 支持为消息设置优先级,高优先级的消息会被优先处理。 - **Cluster**: 多台机器上的RabbitMQ实例可以组成集群,提供高可用性和水平扩展性。 - **Mirrored Queues**: 队列可以在集群中的多...

    24道消息队列RabbitMQ面试题!.zip

    13. **RabbitMQ的消息确认机制(publisher confirms和consumer acknowledgments)?** - publisher confirms:生产者确认,确保消息已被RabbitMQ接收。 - consumer acknowledgments:消费者确认,确认消息已被正确...

    rabbitmq.zip

    - **Priority**: 消息可以设置优先级,高优先级的消息优先被消费。 - **Cluster**: 多个RabbitMQ节点组成集群,提供高可用性和负载均衡。 - **Mirrored Queues**: 队列镜像可以在集群中创建队列副本,提高数据安全...

    RabbitMQ实战 高效部署分布式消息队列 带目录 高清版 PDF

    接着,会探讨RabbitMQ的高级特性,如死信队列(Dead Letter Queue)、延迟队列(Delay Queue)和优先级队列(Priority Queue),这些特性可以增强系统的容错能力和灵活性。此外,还会介绍如何实现消息的持久化,确保...

    RabbitMQ.zip

    - **Priority**:消息优先级,允许设置消息优先级,便于处理紧急任务。 - **Durable**:持久化,保证消息即使在服务器重启后也不会丢失。 - **Cluster**:集群,多个RabbitMQ节点组成集群,提升可用性和容错性。 - *...

    rabbitMQ学习心得

    - **Consumer**: 消费者是接收并处理队列中消息的组件。 2. **RabbitMQ与Java的集成** - **JMS (Java Message Service)**: Java开发者通常使用JMS API与RabbitMQ交互,通过`com.rabbitmq.jms.client`库实现。 - ...

    RabbitMQ

    - **Consumer Behavior**:多个消费者共享一个队列,通过`channel.basicQos(1)`方法限制每次仅能处理一个消息。 ##### 6.3 Publish/Subscribe Mode (发布/订阅模式) - **Exchange Type**:设置为“fanout”。 - **...

    RabbitMQ详解(超详细整理,值得珍藏)

    RabbitMQ中的核心概念包括生产者(Producer)、消费者(Consumer)、交换机(Exchange)和队列(Queue)。 - **生产者**:发布消息的应用程序,负责创建并发送消息到RabbitMQ。 - **消费者**:接收消息的应用程序,...

    RabbitMQ最新2021年面试题,高级面试题及附答案解析.md

    - 生产者通过设置队列属性 `x-max-priority` 来启用优先级队列。 - 消费者通过设置消息的 `priority` 属性来指定消息的优先级。 - **示例代码**: - **生产者**: ```java Map, Object&gt; args = new HashMap(); ...

    RabbitMQ API中文文档(英文HTML版)

    - **Priority Queues**: 支持消息优先级,高优先级的消息先被消费。 - **Message TTL in Headers**: 消息头中可以包含TTL,实现更灵活的过期策略。 6. **监控与管理** - 使用RabbitMQ管理控制台(web界面)进行...

    rabbitmq 消息队列

    RabbitMQ提供了一些高级特性,如死信队列(Dead Letter Queue)、延迟队列(Delayed Queue)和优先级队列(Priority Queue),以增强消息处理的灵活性。例如,当消息无法正确路由或消费者无法处理时,这些消息会被放...

    RabbitMQ消息中间件示例详解

    消息头中包括 routing-key、priority 等标准消息头以及其它自定义消息头,用于定义 RabbitMQ 对消息行为。消息体是字节流,包含消息内容。 3. Connection: 客户端与 Broker 之间的 TCP 连接。 4. Channel: Channel ...

    12_总结一下消息队列相关问题的面试技巧.zip

    - 顺序消息和事务消息是解决顺序问题的策略,如RabbitMQ的TTL和Priority特性。 6. **消息队列的选择与比较** - RabbitMQ:基于AMQP协议,广泛应用,支持多种语言。 - ActiveMQ:开源,支持多种协议,但性能相对...

    09_我该怎么保证从消息队列里拿到的数据按顺序执行?.zip

    3. 顺序队列:部分消息队列服务提供了顺序队列功能,如RabbitMQ的TTL和Priority,AWS SQS的FIFO队列,它们能保证消息的顺序消费。 4. 一致性哈希:利用一致性哈希算法,将具有相同属性(如用户ID)的消息路由到同一...

Global site tag (gtag.js) - Google Analytics