http://www.rabbitmq.com/tutorials/tutorial-two-java.html
在第一个教程中,我们通过一个命名队列来发送消息和接受消息。在这一节,我们将创建一个工作队列,在多个工作者之间,分发比较耗时的任务
工作队列主要是为了避免资源密集型任务的立即执行,然后一直等待它执行结束。相反,我们可以安排好任务,然后在执行。我们可以将一个任务封装成一个消息,发送到队列中。由工作者在后台取出任务然后执行。当有多个工作者时,他们共同处理这些任务。
在web应用中,当一次http请求需要处理复杂的任务时,工作队列将会变得非常有用
1:Preparation
在前面的教程中,我们发送一条消息“Hello World!”,现在我们会发送一条字符串,来模拟一个复杂的任务。我们没有像图片的大小调整或者pdf文件的渲染这类很耗时的任务,所以我们通过Thread.sleep()来模拟耗时任务。我们通过字符串中的点来模拟复杂度,每个点将代表一个耗时1秒的工作。例如:任务“Hello. . .”将耗时3秒。
我们会略微的修改前面的Send.java代码,使其可以接受控制端输入的随意的字符串。程序会将任务安排到我们的工作队列中,所以我们命名为NewTask.java
String message = getMessage(argv);
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
从客户端接受消息
private static String getMessage(String[] strings){
if (strings.length < 1)
return "Hello World!";
return joinStrings(strings, " ");
}
private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
我们的Recv.java代码也需要一些变化:它需要假设字符串中得每个点,都需要耗时1秒。它会从消息队列中弹出一条消息,然后执行,所以我们叫他Work.java
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
doWork(message);
System.out.println(" [x] Done");
}
我们模拟执行的任务
private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
2:Round-robin dispatching(循环调度)
使用工作队列的一个优点就是可以并行的执行。如果我们需要处理积压的工作,可以通过这种方式添加更多的工作者,并且很容易扩展
首先,同时先运行两个Worker.java代码,它们都将从消息队列获取消息。我们需要开启3个控制端窗口。两个运行Worker.java代码,它们作为消费者
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
第三个窗口,我们会发布任务。一旦启动了consumer消费者之后,你就可以发布一些消息:
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask First message.
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Second message..
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Third message...
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fourth message....
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fifth message.....
让我们看看我们的工作者接受到了那些信息:
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Received 'Third message...'
[x] Received 'Fifth message.....'
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Second message..'
[x] Received 'Fourth message....'
默认情况下,RabbitMQ会以此发送消息给下一个消费者。平均下来,每个消费者都会获得相同数量的消息。这种分发消息的方式就叫做Round-robin dispatching(循环调度)。
4:Message acknowledgment(消息确认)
一个任务可能花费几秒种的之间,当一个消费者开始执行一个耗时的任务,并且只执行了任务的一部分就死亡时,你或许希望知道究竟发生了什么。通过我们目前的代码,一旦RabbitMQ向向消费者传递消息,它会立即从内存中将其删除。在这种情况下,如果你杀死了一个worker,我们将会失去它正在处理的消息。我们同样会失去所有交给它,但还没来得及处理的消息。但是我们不希望丢失任何的tasks。如果一个worker死亡,我们希望该task被交给另一个worker来处理。
为了确保不丢失任何的message,RabbitMQ支持消息的确认。消费者会向RabbitMQ发送ack,来告诉RabbitMQ它已经接收,处理消息,RabbitMQ可以将其删除。
如果消费者没有发送ack就已经死亡,RabbitMQ会将其理解为消息没有被正确的处理,会将其从新发送给另一个消费者。通过这种方式,可以确保即便是worker偶然的死亡,也不会丢失任何消息。
不存在消息的超时。只有在worker的连接被关闭时,RabbitMQ才会将消息从新发送给下一个worker。即便是处理一个任务需要花费很长很长的时间,他也会正常的执行,而不会从新发送消息。
消息确认默认是开启的。在前面的例子中,我们通过autoAck=true,将其关闭。当我们执行一个task时,需要开启该功能,使worker可以发送一个正确的消息确认。
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false;
channel.basicConsume("hello", autoAck, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
//...
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
使用这样的代码,可以保证当worker正在处理一条消息时,强行将其关闭,也不会丢失任何的消息。在worker死亡后,所有未被确认的消息都将被从新发送。
5:Message durability(消息持久化)
我们已经学习了如何确保消费者死亡,任务也不会丢失。但是一旦我们的RabbitMQ服务停止,任务还是同样会丢失。
如果你不告诉RabbitMQ,当它退出或发生灾难关闭时,它会丢失队列里的消息。我们需要做两件事情来确保消息不会丢失:我们需要同时保证队列和消息时持久的。
首先:我们需要保证RabbitMQ不会丢失我们的队列。为了实现这个目的,我们需要将队列申明为永久的:
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
即便命令执行正常,在我们现有的程序中,它也不会起作用。那是因为我们已经存在一个非永久的队列“hello”。RabbitMQ不允许我们使用不同的参数来定义一个已经存在的队列,它会向我们返回一些错误信息。但是,通过使用不同的名字(eg:task_queue),来定义一个新的队列,可以快速的解决这个问题。
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
queueDeclare方法申明队列的变动,需要保证同时被应用在生产者和消费者两方
通过这种方式,我们可以确保task_queue队列在RabbitMQ服务重启后也不会丢失。现在我们需要把我们的消息也标记为永久的。通过设置MessageProperties(实现了BasicProperties)的值为PERSISTENT_TEXT_PLAIN可以实现这一目的。
channel.basicPublish("","task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
消息持久化注意事项
即便我们将消息标记为永久的,但这也不能完全保证消息就一定不会丢失。虽然RabbitMQ会将消息保存到硬盘上,但是在RabbitMQ接收到消息并且将其保存到硬盘上之间,仍然有一个短暂的时间片段。
6:Fair dispatch (公平调度)
可能你已经注意到了,分发工作并没有按照我们预期的方式来执行。例如:当有两个工作者时,所有的奇数任务都比较繁重,偶数任务都比较简单。这样就导致了一个工作者会一直处于忙碌状态,而另一个工作者几乎没有事情可做。RabbitMQ对于这些情况并不了解,它还会均匀的分配任务。
发生这样的原因是因为一旦有消息进入队列,RabbitMQ就会将消息进行分发。他不会去观察消费者发回的确认信息。它会盲目的将n条消息分发给n个消费者。为了杜绝这种情况,我们可以通过basicQos方法设置prefetchCount = 1。它会限制RabbitMQ在同一时间向一个工作者发送一条以上的消息。换句话说,它会等待一个工作者处理完或者接收到上一条消息的确认信息,才向它发送新的消息。否者,它会将其分发给下一个比较空闲的工作者。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
注意:如果所有的工作者都处于忙碌的状态,你的队列可能会被填满,你需要时刻注意。或许你可以添加更多的工作者或者采用其他的策略。
7:Putting it all together(完整代码)
NewTask.java
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv)
throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = getMessage(argv);
channel.basicPublish( "", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
//...
}
Work.java
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
doWork(message);
System.out.println(" [x] Done" );
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
//...
}
相关推荐
在本文中,我们将深入探讨RabbitMQ中的"Work Queues"(工作队列模式),这是一种优化任务处理效率的重要策略。 工作队列模式,也称为负荷分发或批处理队列,其核心思想是通过将大量任务分解为小型可处理单元,然后...
文章目录rabbitmq7种实现方式搭建maven项目引入依赖创建连接简单队列消息生产者消息消费者work queues 工作队列生产者消费者能者多劳(公平分发):消费能力强则消费更多消息Publish/Subscribe 发布订阅模式生产者...
2. **02WorkQueues(任务队列)** 在这个示例中,多个消费者可以并发地从队列中获取任务进行处理,实现负载均衡。例如,处理一个耗时的任务时,如果有多台服务器(消费者),RabbitMQ会将任务分发给空闲的服务器,...
work queues 工作队列 公平分发 轮询分发 public/subscribe 发布订阅 routing 路由选择 通配符模式 topics 主题 手动和自动确认消息 队列的持久化和非持久化 RabbitMq的延迟队列 在官网教程中,描述了六类工作队列...
在本篇文档中,我们将深入探讨RabbitMQ的工作队列(Work Queues)模式,该模式是其核心特性之一,常用于处理大量并发任务。 首先,让我们理解什么是工作队列。工作队列,又称为负载均衡队列,旨在将大量任务分发到...
1. 工作队列(Work Queues):通过多个消费者并行处理任务,提高系统性能。 2. 消息持久化:通过持久化消息和持久化队列,即使服务器重启,消息也不会丢失。 3. 优先级:队列可以设置消息优先级,优先级高的消息优先...
除了基本的发送和接收消息之外,RabbitMQ 还支持许多高级特性,如工作队列(Work Queues)、发布/订阅模式(Publish/Subscribe)、主题路由(Topic Routing)等。在 "rabbitmqdemo" 中,你可能能够发现这些模式的...
8. **Work Queues**:工作队列用于负载均衡,多个消费者可以从队列中取出任务并并行处理。 9. **Message Acknowledgments**:为了确保消息的可靠性,RabbitMQ提供了消息确认机制。消费者在成功处理消息后发送确认,...
5. **工作队列模式(Work Queues)**:也称为FIFO(先进先出),多个消费者共享一个队列,但每个消息仅由一个消费者处理,防止同一消息被多次处理。 在SpringBoot中实现这些模式,主要通过配置不同的交换器...
课程进一步探讨了RabbitMQ的深入特性,如工作队列(Work Queues),它用于分配任务给多个工作者,确保高并发和负载均衡。此外,还讲解了消息的持久化、公平转发、发布/订阅模式、路由选择和主题转发等高级主题。 在...
2. **工作队列(Work Queues)**: 也称为任务队列,多个消费者从同一个队列中获取任务进行处理,可以有效平衡工作负载,避免某个消费者过载。例如,网站后台的图片处理、数据计算等耗时操作可以放入工作队列。 3....
3. **工作队列(Work Queues)**:用于分配工作负载,多个工作者可以并行处理队列中的任务,提高了系统的处理能力。 4. **延迟队列(Delayed Queue)**:允许消息在指定的时间后才被消费,适用于定时任务。 5. **...
7. **工作模式(Work Queues)**:一种常见的使用模式,其中多个消费者并行处理来自单一队列的消息,用于负载均衡和任务处理。 为了管理和监控RabbitMQ,我们可以使用RabbitMQ的Web管理界面。默认情况下,该界面在...
在本"rabbitmq简单demo"中,我们将探讨RabbitMQ的三种主要工作模式:Work Queues、Publish/Subscribe和Routing模式。 ### 1. Work Queues(工作队列) 工作队列模式是最基本的RabbitMQ用法,常用于负载均衡和异步...
通过对 Spring AMQP 工作队列源码的深入学习,可以更好地理解其内部机制,从而优化消息处理性能,提高系统的可靠性和可扩展性。这包括但不限于如何有效地配置和使用交换器、队列、绑定,以及如何设计健壮的消费者...
1. **Work Queues**: 用于负载均衡,多个消费者可以从同一个队列中获取任务进行处理,防止同一任务被重复处理。 2. **Publish/Subscribe**: 通过主题交换机实现广播模式,所有订阅特定主题的消费者都会收到消息。 ...
除了基础的发布/消费模型,RabbitMQ还支持许多高级特性,如工作队列(Work Queues)、发布/确认(Publish/Confirm)、死信队列(Dead Letter Queues)和延迟队列(Delayed Queues)。这些特性可以帮助构建更健壮和...
- **Work Queues**(也称为Load Balancing Queues):多个消费者从同一个队列中获取任务,实现任务的并发处理,常用于处理大量CPU密集型或IO密集型的任务。 - **Publish/Subscribe**:发布者将消息发送到主题交换机...
2. **Work Queues(工作队列)**:也称为公平分发,用于多消费者场景,确保每个消费者平均分配任务,避免某一个消费者负载过高。 3. **Publish/Subscribe(发布/订阅)**:生产者发布消息到主题(Topic),消费者...
1. **工作队列(Work Queues)**: 用于分配任务到多个工作者,以提高处理效率。 2. **发布/订阅(Publish/Subscribe)**: 让多个消费者可以同时接收消息。 3. **路由(Routing)**: 基于路由键将消息发送到指定...