在前面的已经提到了一对一的情况;现在一个生产者与多个消费者的情况(Work Queues)。
Work Queues的示意图如下:
对于上图的模型中对于c端的worker来说。RabbitMQ服务器可能一直发送多个消息给一个worker,而另一个可能几乎不做任何事情。这样就会导致一个worker很忙,而另一个却很空闲。这种情况可能都不想出现。如何解决这个问题呢。当然最理想的情况是均匀分配消息给每个worker。我们可能通过channel . basicQos(1)方法(prefetchCount = 1 )来设置同一时间每次发给一个消息给一个worker。示意图如下:
P端的程序如下:
- package com.abin.rabbitmq;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.MessageProperties;
- public class NewTask {
- private static final String TASK_QUEUE_NAME = "task_queue";
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- //声明此队列并且持久化
- channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
- String message = getMessage(argv);
- channel.basicPublish("", TASK_QUEUE_NAME,
- MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());//持久化消息
- System.out.println(" [x] Sent '" + message + "'");
- channel.close();
- connection.close();
- }
- private static String getMessage(String[] strings) {
- if (strings.length < 1)
- return "Hello World!";
- return joinStrings(strings, " ");
- }
- private static String joinStrings(String[] strings, String delimiter) {
- int length = strings.length;
- if (length == 0)
- return "";
- StringBuilder words = new StringBuilder(strings[0]);
- for (int i = 1; i < length; i++) {
- words.append(delimiter).append(strings[i]);
- }
- return words.toString();
- }
- }
多次运行此程序并传入的参数分别为“First message ”,“Secondmessage ”,“Third message ”,“Fourth message ”,“Fifth message ”
C端的程序如下:
- package com.abin.rabbitmq;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- public class Worker {
- private static final String TASK_QUEUE_NAME = "task_queue";
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- //声明此队列并且持久化
- channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- channel.basicQos(1);//告诉RabbitMQ同一时间给一个消息给消费者
- /* We're about to tell the server to deliver us the messages from the queue.
- * Since it will push us messages asynchronously,
- * we provide a callback in the form of an object that will buffer the messages
- * until we're ready to use them. That is what QueueingConsumer does.*/
- QueueingConsumer consumer = new QueueingConsumer(channel);
- /*
- 把名字为TASK_QUEUE_NAME的Channel的值回调给QueueingConsumer,即使一个worker在处理消息的过程中停止了,这个消息也不会失效
- */
- channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();//得到消息传输信息
- String message = new String(delivery.getBody());
- System.out.println(" [x] Received '" + message + "'");
- doWork(message);
- System.out.println(" [x] Done");
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);//下一个消息
- }
- }
- private static void doWork(String task) throws InterruptedException {
- for (char ch : task.toCharArray()) {
- if (ch == '.')
- Thread.sleep(1000);//这里是假装我们很忙
- }
- }
- }
开启两个worker分别运行。运行结果如:
c1的结果:
- [*] Waiting for messages. To exit press CTRL+C
- [x] Received 'First message'
- [x] Received 'Third message'
- [x] Received 'Fifth message'
c2的结果
- [*] Waiting for messages. To exit press CTRL+C
- [x] Received 'Second message'
- [x] Received 'Fourth message'
相关推荐
RabbitMQ练习(Work Queues)
在本文中,我们将深入探讨RabbitMQ中的"Work Queues"(工作队列模式),这是一种优化任务处理效率的重要策略。 工作队列模式,也称为负荷分发或批处理队列,其核心思想是通过将大量任务分解为小型可处理单元,然后...
1. **工作队列(Work Queues)**: 用于分配任务到多个工作者,以提高处理效率。 2. **发布/订阅(Publish/Subscribe)**: 让多个消费者可以同时接收消息。 3. **路由(Routing)**: 基于路由键将消息发送到指定...
1. **Work Queues**: 用于负载均衡,多个消费者可以从同一个队列中获取任务进行处理,防止同一任务被重复处理。 2. **Publish/Subscribe**: 通过主题交换机实现广播模式,所有订阅特定主题的消费者都会收到消息。 ...
在实际应用中,RabbitMQ能够支持多种消息模式和交换类型,包括工作队列(Work queues)、发布/订阅(Publish/Subscribe)、路由(Routing)、Topics类型交换机和远程过程调用(RPC)等,这些模式在RabbitMQ中通过...
兔子MQ RabbitMQ实践包括6种做法,例如01:Hello World 02:Work Queues 03:Publish Subscribe 04:Routing 05:Topics 06:RPC
1. **工作队列(Work Queues)**:用于分配大量后台任务,多个消费者可以同时处理来自同一队列的任务,提高效率。 2. **发布/订阅(Publish/Subscribe)**:消息广播模式,所有订阅特定主题的消费者都会收到消息。 ...
- **Work Queues**(也称为Load Balancing Queues):多个消费者从同一个队列中获取任务,实现任务的并发处理,常用于处理大量CPU密集型或IO密集型的任务。 - **Publish/Subscribe**:发布者将消息发送到主题交换机...
7. **工作模式(Work Queues)**:一种常见的使用模式,其中多个消费者并行处理来自单一队列的消息,用于负载均衡和任务处理。 为了管理和监控RabbitMQ,我们可以使用RabbitMQ的Web管理界面。默认情况下,该界面在...
1. **Work Queues**: 并发处理任务,多个消费者从同一队列中获取任务,提高效率。 2. **Publish/Subscribe模式**: 使用Fanout交换器实现广播消息,所有订阅者都能收到消息。 3. **死信队列**: 错误处理,消息无法...
RabbitMQ,作为一款开源的消息队列系统,...总的来说,这个源代码实例为学习和掌握RabbitMQ提供了一个很好的起点,不仅能够帮助你理解RabbitMQ的核心概念,还能让你在实践中运用这些知识,提高你的分布式系统设计能力。
在IT行业中,消息队列(Message Queue)是分布式系统中重要的中间件,它扮演着解耦、异步处理以及负载均衡的角色。...在实践中,不断学习和探索RabbitMQ的更多功能,将有助于提升整个系统的效率和可靠性。
2. **工作模式(Work Queues)**:也称为FIFO(先进先出)模式,用于分发任务到多个工作者,确保任务被逐一处理,不会同时处理相同任务。 3. **消息确认模式(Message Acknowledgement)**:在RabbitMQ中,消息的...
RabbitMQ 支持多种消息发布模式,包括简单模式(Simple)、工作队列模式(Work Queues)、发布/订阅模式(Publish/Subscribe)、路由模式(Routing)、主题模式(Topics)等。 #### 二、安装RabbitMQ ##### 2.1 ...
除了基础的发布/消费模型,RabbitMQ还支持许多高级特性,如工作队列(Work Queues)、发布/确认(Publish/Confirm)、死信队列(Dead Letter Queues)和延迟队列(Delayed Queues)。这些特性可以帮助构建更健壮和...
在本"rabbitmq简单demo"中,我们将探讨RabbitMQ的三种主要工作模式:Work Queues、Publish/Subscribe和Routing模式。 ### 1. Work Queues(工作队列) 工作队列模式是最基本的RabbitMQ用法,常用于负载均衡和异步...
2. **Work Queues(工作队列)**:也称为公平分发,用于多消费者场景,确保每个消费者平均分配任务,避免某一个消费者负载过高。 3. **Publish/Subscribe(发布/订阅)**:生产者发布消息到主题(Topic),消费者...
安装完成后,确保 RabbitMQ 服务已经启动,并创建你需要的交换机(exchanges)和队列(queues)。 **二、安装 rabbitmq-laravel** 在 Laravel 项目中,你可以使用 Composer 来安装 `rabbitmq-laravel` 包: ```...
除了基本的发送和接收消息之外,RabbitMQ 还支持许多高级特性,如工作队列(Work Queues)、发布/订阅模式(Publish/Subscribe)、主题路由(Topic Routing)等。在 "rabbitmqdemo" 中,你可能能够发现这些模式的...
- 缺点:学习曲线较陡峭。 **19. RabbitMQ适合哪些场景?** - **异步处理:** 任务排队、日志处理等。 - **微服务间的通信:** 提供服务间的消息传递机制。 - **消息缓存:** 减轻数据库的压力。 **20. 如何处理...