本文章转发自http://my.oschina.net/OpenSourceBO/blog/379735
工作队列
(使用Java客户端)
在这第一指南部分,我们写了通过同一命名的队列发送和接受消息。在这一部分,我们将会创建一个工作队列,在多个工作者之间使用分布式时间任务。
工 作队列(亦称:任务队列)背后主要的思想是避免立即处理一个资源密集型任务并且不得不一直等待完成。相反我们可以计划着让任务后续执行。我们将任务封装成 消息,发送到队列中。一个工作者进程在后台运行,获取任务并最终执行任务。当你运行多个工作者,所有的任务将会被他们所共享。
在web应用程序中,这个理念是特别有用的,你无法在一个短暂的http请求中处理一个复杂的任务。
准备
在先前的指南中,我们发送了一个包含"Hello World!“消息。现在我们将要发送一些字符串,用来代表复杂的任务。我们没有一个真实的任务,比如图片的调整大小或者pdf文件渲染,所以我们通过Thread.sleep()
函数,伪装一个我们是很忙景象。我们将会把字符串中点的数量来代表它的复杂度;每一个点将要花费一秒的工作。例如,一个使用Hello...
描述的假任务会发送三秒。
我们将会轻量的修改我们以前例子中Send.java
代码,使其允许任意的消息可以通过命令行发出。这个程序将要计划安排任务到我们的工作队列中,所以我们把它命名为NewTask.java
:
String message = getMessage(argv);
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
一些帮助从命令行中获取消息参数:
private static String getMessage(String[] strings){
if (strings.length < 1)
return "Hello World!";
return joinStrings(strings, " ");
}
private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
我们老的Recv.java
程序也要求做些改变:它需要将消息体中每个点伪装成一秒。从队列中获取消息,运行任务,所以我们将它称之为Worker.java
:
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
doWork(message);
System.out.println(" [x] Done");
}
我们伪装的任务中冒充执行时间:
private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
在第一部分指南中那样编译它们(jar 文件需要再工作路径上):
$ javac -cp rabbitmq-client.jar NewTask.java Worker.java
循环分派
使用任务队列的优势之一是我们是容易并行处理。如果我们正在处理一些堆积的文件的话,我们仅仅需要增加更多的工作者,通过这种方式我们是容易扩展的。
首先,让我们试着在同一时间运行两个工作者实例。他们都会从队列中获取消息,但是具体怎样做呢?让我们一起来看一看。
你需要三个打开的控制平台,其中两个用来运行工作者程序。他们将会是我们的两个消费者-C1和C2。
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
在这第三个控制平台我们用来发布新的任务。一旦你启动消费者,你就可以发布消息了:
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask First message.
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Second message..
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Third message...
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fourth message....
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fifth message.....
让我们看看什么被投递到我们工作者那里:
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Received 'Third message...'
[x] Received 'Fifth message.....'
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Second message..'
[x] Received 'Fourth message....'
默认情况想,RabbitMQ将会把每一个消息发送给下一个消费者。平均下来每个消费者获取的消息数量是相同的。这种分布式消息方式被称为轮询。试试三个或更多的工作者。
消息确认
处理一个任务可能花费数秒时间,你可能会好奇如果一个消费者开始一个长任务,并且在处理完成部分的情况下就死掉了会发生什么情况。就我们当前的代码 来说,一旦RabbitMQ将消息传递给消费者,它就会立即将消息从内存中删除。在这种情况下,如果你杀掉一个正在处理的工作者你会丢失它正在处理的消 息。我们也同时失去了已经分配给这个工作者并且没有开始处理的消息。
但是我们不想丢失任何任务,如果一个工作者死掉,我们期望将任务传递给另一个工作者。
为了保证每一个消息不会丢失,RabbitMQ支持消息确认机制。一个消息确认是由消费者发出,告诉RabbitMQ这个消息已经被接受,处理完成,RabbitMQ 可以删除它了。
如果一个消费者没有发送确认信号,RabbitMQ将会认定这个消息没有完全处理成功,将会把它传递给另一个消费者。通过这种方式,即使工作者有时会死掉,你依旧可以保证没有消息会被丢失。
这里不存在消息超时;RabbitMQ只会在工作者连接死掉才重新传递这个消息。即使一个消息要被处理很长很长时间,也不是问题。
消息确认机制默认情况下是开着的。在先前的例子中我们是明确的将这个功能关闭no_ack=True
。是时候移除这个标识了,一旦我们完成一个任务,工作者需要发送一个确认信号。
QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false;
channel.basicConsume("hello", autoAck, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
//...
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
使用这段代码,我们可以保证即使你将一个正在处理消息的工作者通过CTRL+C
来终止它运行,依旧没有消息会丢失。稍后,工作者死亡后没有发送确认的消息会被重新传递。
忘掉确认
这是一个普遍的错误,就是忘记确认。这是一个很简单的错误,但是这后果是严重的。当你的客户端退出,消息会重新传递(看上去是随机传递的),RabbitMQ会越来越占用内存,因为它不会释放哪些没有发送确认的消息。
为了调试这种类型的错误,你可以使用
rabbitmqctl
打印出messages_unacknowledged
属性:$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues ... hello 0 0 ...done.
消息持久化
我们已经学习了如何在确定消费者是否已经死掉,并且保证任务不被丢失。但是如果RabbitMQ服务器停止,我们的任务依旧会丢失。
当RabbitMQ退出或者崩溃,它将会忘记这队列和消息,除非你告诉它不要这样做。两个事情需要做来保证消息不会丢失:我们标记队列和消息持久化。
首先,我们需要确保RabbitMQ不会丢失我们的队列,为了这样做,我们需要将它声明为持久化:
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
虽然这命令是正确的,但它不会立即在我们的程序里运行。那是因为我们已经定义了一个不持久化的hello
队列。RabbitMQ不允许你使用不同的参数重新定义一个存在的队列,如果你试着那样做它会返回一个错误。有个快速的变通方案-让我们声明一个不同名字的队列,比如task_queue
:
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
这个queuqDeclare
的改变需要应用在生产者和消费者的代码中。
在这点上,我们可以保证即使RabbitMQ重启,task_queue
队列也不会丢失。现在我们需要标记消息持久化 - 通过设置MessageProperties
(实现了BasicProperties
)的值为PERSISTENT_TEXT_PLAIN
。
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
注意消息持久化
标记消息持久化不能完全保证消息不会被丢失,虽然这样会告诉RabbitMQ保存消息到硬盘上。但是对于 RabbitMQ依旧有个短暂的时间窗口对于接收一个消息并且还没有完成保存。同样,RabbitMQ不能让每个消息同步–它可能仅仅保存在缓存中,还没 有真正的写入到硬盘中。这持久化的保证不是健壮的,但是对我们的简单的任务队列来说是足够了。如果你需要更健壮的持久化保证,你可以使用出版者确认。
公平分发
你可能注意到了,分发过程并没有如我们想的那样运作。例如,在一个情况下有两个工作者,当所有奇数消息是重的和所有偶数是轻的,一个工作者会一直忙碌下去,而另一个则会几乎不做什么事情。好吧,RabbitMQ不会在意那个事情,它会一直均匀的分发消息。
这种情况发生因为RabbitMQ仅仅分发消息到队列中。它不关心有多少消息没有由发送者发送确认信号。它仅仅盲目的将N个消息发送到N个消费者。
为了解决这个问题,我们可以使用basicQos
方法,设置prefetchCount=1
。这样将会告知RabbitMQ不要同时给一个工作者超过一个任务,或者换句话说在一个工作者处理完成,发送确认之前不要给它分发一个新的消息。代替,把消息分发到下一个不繁忙的工作者。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
注意队列大小
如果你的所有工作者是在忙碌,你的队列就会被填满。你将会想关注这件事,可能要添加更多的工作者,或者有些其他策略。
把它们放在一起
我们的NewTask.java
最终代码:
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
public class NewTask {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv)
throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
String message = getMessage(argv);
channel.basicPublish( "", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
//...
}
(NewTask.java source)
我们的Worker.java
代码:
java.lang.InterruptedException {
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
doWork(message);
System.out.println(" [x] Done" );
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
//...
}
(Worker.java source)
使用消息确认和预读数量你可以建立一个工作队列。持久化选项使得RabbitMQ重启之后任务依旧存在。
想要了解更多关于通道方法和消息属性,你可以浏览javadocs online
。
现在我们可以移到指南3了,学习怎么样将相同的消息传递给多个消费者
相关推荐
【标题】"MQ示例+otp_win64_22.2.exe+rabbitmq-server-3.8.1" 提供的是一个与消息队列相关...总的来说,这个压缩包是一个全面的入门资源,对于想要学习和实践消息队列技术,特别是 RabbitMQ 的开发者来说,非常有价值。
**RabbitMQ 入门与实战** RabbitMQ 是一个基于 AMQP(Advanced Message Queuing Protocol)协议的开源消息队列系统,它被广泛应用于分布式系统中的消息传递和任务调度。RabbitMQ 提供了高可用性、可扩展性和可靠性...
最近整理学习的RabbitMQ入门Dome,文件是一个普通java项目导入完成后在lib文件夹中amqp-client-5.2.0.jar,slf4j-api-1.7.25.jar添加进去即可,里面有5个dome分是 dome1 : 简单队列,dome2 :work模式,dome3 : 订阅...
接下来,"工作队列"章节深入讨论了RabbitMQ如何处理工作负载。工作队列是一种常见的设计模式,用于将大量的任务分发到多个工作进程,从而提高系统的并行处理能力。在这里,你将学习如何创建队列,以及如何使用消费者...
【课程目录】:---第一章:RabbitMQ介绍----1-什么是消息中间件.mp4----2-RabbitMQ消息队列安装:window环境.mp4----3-RabbitMQ消息队列安装 :Linux环境.mp4----4-Rabbitmq入口示例:server.mp4----5-rabbitmq入口...
2. **工作队列**(Work Queues):多个消费者,消息被分发给空闲的消费者,保证任务的并行处理。 3. **发布/订阅**:多个生产者和消费者,每个消费者可以订阅多个主题,广播模式。 4. **路由**:根据路由键将消息...
RabbitMQ是一个开源的消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议,广泛应用于分布式系统中,用于实现应用之间的异步通信和解耦。在这个“RabbitMQ-Pub-Sub-Sample”项目中,我们将深入探讨...
【RabbitMQ 入门到精通】:RabbitMQ 是一款流行的消息中间件,它基于 AMQP(Advanced Message Queuing Protocol)协议实现,用于在分布式系统中高效地传输消息,从而实现异步处理、解耦和流量控制。本教程旨在帮助...
【标题】:“rabbitMQ入门” 在IT行业中,消息队列是一种常见的中间件技术,用于解耦应用程序组件,提高系统的可扩展性和可靠性。RabbitMQ是一个开源的消息代理和队列服务器,广泛应用于分布式系统中。本篇文章将带...
2. **轮询分发模式(Round Robin)**:在RabbitMQ中,当有多消费者连接到一个队列时,消息会按照轮询的方式平均分发给每个消费者,确保负载均衡。 3. **公平分发模式(Fair Dispatch)**:在默认情况下,RabbitMQ将...
通过分析`rabbitmq-producer`和`rabbitmq-consumer`的代码,你可以深入理解RabbitMQ的工作原理,并能熟练地在自己的项目中应用消息队列来提高系统的稳定性和性能。同时,这也将帮助你更好地理解和运用AMQP协议,为...
"rabbitmq-tutorial-php-demo" 指的是一个基于PHP的RabbitMQ入门教程的演示项目。RabbitMQ是一个流行的开源消息代理和队列服务器,它使用AMQP(Advanced Message Queuing Protocol)协议来实现高效、可靠的消息传递...
### RabbitMQ从入门到放弃——理解消息队列与RabbitMQ #### 消息队列简介 消息队列(Message Queue, MQ)作为一种重要的中间件技术,它提供了应用程序间的一种通信方式,通过写入和读取出入列队的消息来进行通信,...
### RabbitMQ入门知识点详解 #### 一、RabbitMQ简介 RabbitMQ是一款开源的消息中间件,基于Erlang语言开发而成。它支持多种消息发布/订阅模式,并且能够跨多平台运行。作为消息中间件,RabbitMQ的核心功能是接受、...
2. **基本API使用**:学习Java、Python、JavaScript等语言的RabbitMQ客户端库,掌握创建连接、创建通道、声明交换器、声明队列、发送和接收消息的基本方法。 3. **消息发布/订阅模式**:理解生产者如何发布消息到...
3. **工作流程**:生产者将消息发送到指定的队列,消费者从队列中读取消息并执行相应的业务逻辑。 #### 五、案例分析:Work 模式下的商品数据同步 **Work 模式**:在这种模式下,一个生产者可以将消息发送到队列,...
RabbitMQ是一款开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中,用于处理异步任务、解耦组件以及实现消息传递。本示例将带你逐步了解如何在你的项目中使用...
消息中间件之RabbitMQ入门讲解”的主题中,我们将深入理解RabbitMQ的核心概念,如何通过控制台进行管理,以及如何在Spring Cloud框架下创建消息生产者和消费者。 首先,让我们了解RabbitMQ的基本概念。RabbitMQ的...
### RabbitMQ 入门教程(JAVA) #### 一、RabbitMQ 概述 RabbitMQ 是一个消息中间件,其主要功能是接收来自生产者的消息,并根据规则将其路由、缓冲以及持久化后传递给消费者。RabbitMQ 和消息传递系统通常会使用...