`
yimeng528
  • 浏览: 189239 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

rabbitmq学习2:Work Queues

阅读更多

在前面的已经提到了一对一的情况;现在一个生产者与多个消费者的情况(Work Queues)。
Work Queues的示意图如下:

 

对于上图的模型中对于c端的worker来说。RabbitMQ服务器可能一直发送多个消息给一个worker,而另一个可能几乎不做任何事情。这样就会导致一个worker很忙,而另一个却很空闲。这种情况可能都不想出现。如何解决这个问题呢。当然最理想的情况是均匀分配消息给每个worker。我们可能通过channel . basicQos(1)方法(prefetchCount = 1 )来设置同一时间每次发给一个消息给一个worker。示意图如下:

 

P端的程序如下:

Java代码  收藏代码
  1. package com.abin.rabbitmq;  
  2.   
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6. import com.rabbitmq.client.MessageProperties;  
  7.   
  8. public class NewTask {  
  9.     private static final String TASK_QUEUE_NAME = "task_queue";  
  10.   
  11.     public static void main(String[] argv) throws Exception {  
  12.   
  13.         ConnectionFactory factory = new ConnectionFactory();  
  14.         factory.setHost("localhost");  
  15.         Connection connection = factory.newConnection();  
  16.         Channel channel = connection.createChannel();  
  17.         //声明此队列并且持久化  
  18.         channel.queueDeclare(TASK_QUEUE_NAME, truefalsefalsenull);  
  19.   
  20.         String message = getMessage(argv);  
  21.   
  22.         channel.basicPublish("", TASK_QUEUE_NAME,  
  23.                 MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());//持久化消息  
  24.         System.out.println(" [x] Sent '" + message + "'");  
  25.   
  26.         channel.close();  
  27.         connection.close();  
  28.     }  
  29.   
  30.     private static String getMessage(String[] strings) {  
  31.         if (strings.length < 1)  
  32.             return "Hello World!";  
  33.         return joinStrings(strings, " ");  
  34.     }  
  35.   
  36.     private static String joinStrings(String[] strings, String delimiter) {  
  37.         int length = strings.length;  
  38.         if (length == 0)  
  39.             return "";  
  40.         StringBuilder words = new StringBuilder(strings[0]);  
  41.         for (int i = 1; i < length; i++) {  
  42.             words.append(delimiter).append(strings[i]);  
  43.         }  
  44.         return words.toString();  
  45.     }  
  46. }  

    多次运行此程序并传入的参数分别为“First message ”,“Secondmessage ”,“Third message ”,“Fourth message ”,“Fifth message ”

 

C端的程序如下:

Java代码  收藏代码
  1. package com.abin.rabbitmq;  
  2.   
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6. import com.rabbitmq.client.QueueingConsumer;  
  7.   
  8. public class Worker {  
  9.     private static final String TASK_QUEUE_NAME = "task_queue";  
  10.     public static void main(String[] argv) throws Exception {  
  11.         ConnectionFactory factory = new ConnectionFactory();  
  12.         factory.setHost("localhost");  
  13.         Connection connection = factory.newConnection();  
  14.         Channel channel = connection.createChannel();  
  15.         //声明此队列并且持久化  
  16.         channel.queueDeclare(TASK_QUEUE_NAME, truefalsefalsenull);  
  17.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
  18.   
  19.         channel.basicQos(1);//告诉RabbitMQ同一时间给一个消息给消费者  
  20.         /* We're about to tell the server to deliver us the messages from the queue.  
  21.          * Since it will push us messages asynchronously,  
  22.          * we provide a callback in the form of an object that will buffer the messages  
  23.          * until we're ready to use them. That is what QueueingConsumer does.*/  
  24.         QueueingConsumer consumer = new QueueingConsumer(channel);  
  25.         /* 
  26.           把名字为TASK_QUEUE_NAME的Channel的值回调给QueueingConsumer,即使一个worker在处理消息的过程中停止了,这个消息也不会失效 
  27.         */  
  28.         channel.basicConsume(TASK_QUEUE_NAME, false, consumer);  
  29.   
  30.         while (true) {  
  31.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();//得到消息传输信息  
  32.             String message = new String(delivery.getBody());  
  33.   
  34.             System.out.println(" [x] Received '" + message + "'");  
  35.             doWork(message);  
  36.             System.out.println(" [x] Done");  
  37.   
  38.             channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);//下一个消息  
  39.         }  
  40.     }  
  41.   
  42.     private static void doWork(String task) throws InterruptedException {  
  43.         for (char ch : task.toCharArray()) {  
  44.             if (ch == '.')  
  45.                 Thread.sleep(1000);//这里是假装我们很忙  
  46.         }  
  47.     }  
  48. }  

  开启两个worker分别运行。运行结果如:

c1的结果:

Java代码  收藏代码
  1. [*] Waiting for messages. To exit press CTRL+C  
  2.  [x] Received 'First message'  
  3.  [x] Received 'Third message'  
  4.  [x] Received 'Fifth message'  

 c2的结果

Java代码  收藏代码
  1. [*] Waiting for messages. To exit press CTRL+C  
  2.  [x] Received 'Second message'  
  3.  [x] Received 'Fourth message'  

 

 

分享到:
评论
4 楼 ZZX19880809 2014-11-01  
根本就没有. 应该输入first message.
3 楼 jiaofuyou 2014-03-19  
独孤日日也 写道
我亲自试验了并没有实现啊,第一个worker输出:
  • Waiting for messages. To exit press CTRL+C
  • [x] Received '1 message'
    [x] Done
    [x] Received '2 message'
    [x] Done
    [x] Received '3 message'
    [x] Done
    第二个worker输出:
  • Waiting for messages. To exit press CTRL+C

  • 这是为什么呢




    我是用PHP测试的,是可以的
    2 楼 独孤日日也 2014-03-18  
    我亲自试验了并没有实现啊,第一个worker输出:
  • Waiting for messages. To exit press CTRL+C
  • [x] Received '1 message'
    [x] Done
    [x] Received '2 message'
    [x] Done
    [x] Received '3 message'
    [x] Done
    第二个worker输出:
  • Waiting for messages. To exit press CTRL+C

  • 这是为什么呢
    1 楼 jiaofuyou 2014-03-13  
    想问个问题,象这种任务分发的工作队列,你举的例子是一个队列被多个消费者处理,那有没有办法,多个队列的消息由一个消费者来处理呢?

    相关推荐

      RabbitMQ练习(Work Queues)

      RabbitMQ练习(Work Queues)

      rabbitMQ简单应用pattern1 Work queues 工作队列模式

      在本文中,我们将深入探讨RabbitMQ中的"Work Queues"(工作队列模式),这是一种优化任务处理效率的重要策略。 工作队列模式,也称为负荷分发或批处理队列,其核心思想是通过将大量任务分解为小型可处理单元,然后...

      RabbitMQ Server3.13.0

      1. **Work Queues**: 用于负载均衡,多个消费者可以从同一个队列中获取任务进行处理,防止同一任务被重复处理。 2. **Publish/Subscribe**: 通过主题交换机实现广播模式,所有订阅特定主题的消费者都会收到消息。 ...

      rabbitmq安装包

      1. **工作队列(Work Queues)**: 用于分配任务到多个工作者,以提高处理效率。 2. **发布/订阅(Publish/Subscribe)**: 让多个消费者可以同时接收消息。 3. **路由(Routing)**: 基于路由键将消息发送到指定...

      rabbitMQ开发教程-中文翻译

      在实际应用中,RabbitMQ能够支持多种消息模式和交换类型,包括工作队列(Work queues)、发布/订阅(Publish/Subscribe)、路由(Routing)、Topics类型交换机和远程过程调用(RPC)等,这些模式在RabbitMQ中通过...

      rabbitMQ:RabbitMQ实践

      兔子MQ RabbitMQ实践包括6种做法,例如01:Hello World 02:Work Queues 03:Publish Subscribe 04:Routing 05:Topics 06:RPC

      rabbitmq 安装包

      1. **工作队列(Work Queues)**:用于分配大量后台任务,多个消费者可以同时处理来自同一队列的任务,提高效率。 2. **发布/订阅(Publish/Subscribe)**:消息广播模式,所有订阅特定主题的消费者都会收到消息。 ...

      rabbitmq 学习资料

      - **Work Queues**(也称为Load Balancing Queues):多个消费者从同一个队列中获取任务,实现任务的并发处理,常用于处理大量CPU密集型或IO密集型的任务。 - **Publish/Subscribe**:发布者将消息发送到主题交换机...

      rabbitMQ安装包.zip

      7. **工作模式(Work Queues)**:一种常见的使用模式,其中多个消费者并行处理来自单一队列的消息,用于负载均衡和任务处理。 为了管理和监控RabbitMQ,我们可以使用RabbitMQ的Web管理界面。默认情况下,该界面在...

      RabbitMQ培训PPT

      1. **Work Queues**: 并发处理任务,多个消费者从同一队列中获取任务,提高效率。 2. **Publish/Subscribe模式**: 使用Fanout交换器实现广播消息,所有订阅者都能收到消息。 3. **死信队列**: 错误处理,消息无法...

      rabbitMQ 源代码实例

      RabbitMQ,作为一款开源的消息队列系统,...总的来说,这个源代码实例为学习和掌握RabbitMQ提供了一个很好的起点,不仅能够帮助你理解RabbitMQ的核心概念,还能让你在实践中运用这些知识,提高你的分布式系统设计能力。

      RabbitMQ实战 高效部署分布式消息队列

      在IT行业中,消息队列(Message Queue)是分布式系统中重要的中间件,它扮演着解耦、异步处理以及负载均衡的角色。...在实践中,不断学习和探索RabbitMQ的更多功能,将有助于提升整个系统的效率和可靠性。

      rabbitMQ.zip

      2. **工作模式(Work Queues)**:也称为FIFO(先进先出)模式,用于分发任务到多个工作者,确保任务被逐一处理,不会同时处理相同任务。 3. **消息确认模式(Message Acknowledgement)**:在RabbitMQ中,消息的...

      RabbitMQ实战

      RabbitMQ 支持多种消息发布模式,包括简单模式(Simple)、工作队列模式(Work Queues)、发布/订阅模式(Publish/Subscribe)、路由模式(Routing)、主题模式(Topics)等。 #### 二、安装RabbitMQ ##### 2.1 ...

      rabbitmq C# 调用测试

      除了基础的发布/消费模型,RabbitMQ还支持许多高级特性,如工作队列(Work Queues)、发布/确认(Publish/Confirm)、死信队列(Dead Letter Queues)和延迟队列(Delayed Queues)。这些特性可以帮助构建更健壮和...

      rabbitmq简单demo

      在本"rabbitmq简单demo"中,我们将探讨RabbitMQ的三种主要工作模式:Work Queues、Publish/Subscribe和Routing模式。 ### 1. Work Queues(工作队列) 工作队列模式是最基本的RabbitMQ用法,常用于负载均衡和异步...

      RabbitMQ.docx

      2. **Work Queues(工作队列)**:也称为公平分发,用于多消费者场景,确保每个消费者平均分配任务,避免某一个消费者负载过高。 3. **Publish/Subscribe(发布/订阅)**:生产者发布消息到主题(Topic),消费者...

      Laravel开发-rabbitmq-laravel

      安装完成后,确保 RabbitMQ 服务已经启动,并创建你需要的交换机(exchanges)和队列(queues)。 **二、安装 rabbitmq-laravel** 在 Laravel 项目中,你可以使用 Composer 来安装 `rabbitmq-laravel` 包: ```...

      rabbitmqdemo

      除了基本的发送和接收消息之外,RabbitMQ 还支持许多高级特性,如工作队列(Work Queues)、发布/订阅模式(Publish/Subscribe)、主题路由(Topic Routing)等。在 "rabbitmqdemo" 中,你可能能够发现这些模式的...

      rabbitmq面试题.pdf

      - 缺点:学习曲线较陡峭。 **19. RabbitMQ适合哪些场景?** - **异步处理:** 任务排队、日志处理等。 - **微服务间的通信:** 提供服务间的消息传递机制。 - **消息缓存:** 减轻数据库的压力。 **20. 如何处理...

    Global site tag (gtag.js) - Google Analytics