`
zhchx0827
  • 浏览: 194142 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

RabbitMQ入门学习——Work queues 工作队列

 
阅读更多

 

http://www.rabbitmq.com/tutorials/tutorial-two-java.html

在第一个教程中,我们通过一个命名队列来发送消息和接受消息。在这一节,我们将创建一个工作队列,在多个工作者之间,分发比较耗时的任务

工作队列主要是为了避免资源密集型任务的立即执行,然后一直等待它执行结束。相反,我们可以安排好任务,然后在执行。我们可以将一个任务封装成一个消息,发送到队列中。由工作者在后台取出任务然后执行。当有多个工作者时,他们共同处理这些任务。

web应用中,当一次http请求需要处理复杂的任务时,工作队列将会变得非常有用

 

1Preparation

     在前面的教程中,我们发送一条消息“Hello World!”,现在我们会发送一条字符串,来模拟一个复杂的任务。我们没有像图片的大小调整或者pdf文件的渲染这类很耗时的任务,所以我们通过Thread.sleep()来模拟耗时任务。我们通过字符串中的点来模拟复杂度,每个点将代表一个耗时1秒的工作。例如:任务“Hello. . .”将耗时3秒。

      我们会略微的修改前面的Send.java代码,使其可以接受控制端输入的随意的字符串。程序会将任务安排到我们的工作队列中,所以我们命名为NewTask.java

String message = getMessage(argv);

 

channel.basicPublish("", "hello", null, message.getBytes());

System.out.println(" [x] Sent '" + message + "'");

从客户端接受消息

private static String getMessage(String[] strings){

    if (strings.length < 1)

        return "Hello World!";

    return joinStrings(strings, " ");

}

 

private static String joinStrings(String[] strings, String delimiter) {

    int length = strings.length;

    if (length == 0) return "";

    StringBuilder words = new StringBuilder(strings[0]);

    for (int i = 1; i < length; i++) {

        words.append(delimiter).append(strings[i]);

    }

    return words.toString();

}

     我们的Recv.java代码也需要一些变化:它需要假设字符串中得每个点,都需要耗时1秒。它会从消息队列中弹出一条消息,然后执行,所以我们叫他Work.java

while (true) {

    QueueingConsumer.Delivery delivery = consumer.nextDelivery();

    String message = new String(delivery.getBody());

 

    System.out.println(" [x] Received '" + message + "'");        

    doWork(message);

    System.out.println(" [x] Done");

}

我们模拟执行的任务

private static void doWork(String task) throws InterruptedException {

    for (char ch: task.toCharArray()) {

        if (ch == '.') Thread.sleep(1000);

    }

}

2Round-robin dispatching(循环调度)

    使用工作队列的一个优点就是可以并行的执行。如果我们需要处理积压的工作,可以通过这种方式添加更多的工作者,并且很容易扩展

首先,同时先运行两个Worker.java代码,它们都将从消息队列获取消息。我们需要开启3个控制端窗口。两个运行Worker.java代码,它们作为消费者

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

Worker

 [*] Waiting for messages. To exit press CTRL+C

 

shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

Worker

 [*] Waiting for messages. To exit press CTRL+C

第三个窗口,我们会发布任务。一旦启动了consumer消费者之后,你就可以发布一些消息:

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

NewTask First message.

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

NewTask Second message..

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

NewTask Third message...

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

NewTask Fourth message....

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

NewTask Fifth message.....

让我们看看我们的工作者接受到了那些信息:

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

Worker

 [*] Waiting for messages. To exit press CTRL+C

 [x] Received 'First message.'

 [x] Received 'Third message...'

 [x] Received 'Fifth message.....'

 

java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar

Worker

 [*] Waiting for messages. To exit press CTRL+C

 [x] Received 'Second message..'

 [x] Received 'Fourth message....'

默认情况下,RabbitMQ会以此发送消息给下一个消费者。平均下来,每个消费者都会获得相同数量的消息。这种分发消息的方式就叫做Round-robin dispatching(循环调度)

4Message acknowledgment(消息确认)

     一个任务可能花费几秒种的之间,当一个消费者开始执行一个耗时的任务,并且只执行了任务的一部分就死亡时,你或许希望知道究竟发生了什么。通过我们目前的代码,一旦RabbitMQ向向消费者传递消息,它会立即从内存中将其删除。在这种情况下,如果你杀死了一个worker,我们将会失去它正在处理的消息。我们同样会失去所有交给它,但还没来得及处理的消息。但是我们不希望丢失任何的tasks。如果一个worker死亡,我们希望该task被交给另一个worker来处理。

     为了确保不丢失任何的messageRabbitMQ支持消息的确认。消费者会向RabbitMQ发送ack,来告诉RabbitMQ它已经接收,处理消息,RabbitMQ可以将其删除。

如果消费者没有发送ack就已经死亡,RabbitMQ会将其理解为消息没有被正确的处理,会将其从新发送给另一个消费者。通过这种方式,可以确保即便是worker偶然的死亡,也不会丢失任何消息。

不存在消息的超时。只有在worker的连接被关闭时,RabbitMQ才会将消息从新发送给下一个worker。即便是处理一个任务需要花费很长很长的时间,他也会正常的执行,而不会从新发送消息。

消息确认默认是开启的。在前面的例子中,我们通过autoAck=true,将其关闭。当我们执行一个task时,需要开启该功能,使worker可以发送一个正确的消息确认。

QueueingConsumer consumer = new QueueingConsumer(channel);

boolean autoAck = false;

channel.basicConsume("hello", autoAck, consumer);

 

while (true) {

  QueueingConsumer.Delivery delivery = consumer.nextDelivery();

  //...      

  channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

}

使用这样的代码,可以保证当worker正在处理一条消息时,强行将其关闭,也不会丢失任何的消息。在worker死亡后,所有未被确认的消息都将被从新发送。


5Message durability(消息持久化)

我们已经学习了如何确保消费者死亡,任务也不会丢失。但是一旦我们的RabbitMQ服务停止,任务还是同样会丢失。

如果你不告诉RabbitMQ,当它退出或发生灾难关闭时,它会丢失队列里的消息。我们需要做两件事情来确保消息不会丢失:我们需要同时保证队列和消息时持久的。

    首先:我们需要保证RabbitMQ不会丢失我们的队列。为了实现这个目的,我们需要将队列申明为永久的:

boolean durable = true;

channel.queueDeclare("hello", durable, false, false, null);

    即便命令执行正常,在我们现有的程序中,它也不会起作用。那是因为我们已经存在一个非永久的队列“hello”。RabbitMQ不允许我们使用不同的参数来定义一个已经存在的队列,它会向我们返回一些错误信息。但是,通过使用不同的名字(egtask_queue),来定义一个新的队列,可以快速的解决这个问题。

boolean durable = true;

channel.queueDeclare("task_queue", durable, false, false, null);

queueDeclare方法申明队列的变动,需要保证同时被应用在生产者和消费者两方

通过这种方式,我们可以确保task_queue队列在RabbitMQ服务重启后也不会丢失。现在我们需要把我们的消息也标记为永久的。通过设置MessageProperties(实现了BasicProperties)的值为PERSISTENT_TEXT_PLAIN可以实现这一目的。

channel.basicPublish("","task_queue",

        MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

消息持久化注意事项

即便我们将消息标记为永久的,但这也不能完全保证消息就一定不会丢失。虽然RabbitMQ会将消息保存到硬盘上,但是在RabbitMQ接收到消息并且将其保存到硬盘上之间,仍然有一个短暂的时间片段。

 

6Fair dispatch (公平调度)

     可能你已经注意到了,分发工作并没有按照我们预期的方式来执行。例如:当有两个工作者时,所有的奇数任务都比较繁重,偶数任务都比较简单。这样就导致了一个工作者会一直处于忙碌状态,而另一个工作者几乎没有事情可做。RabbitMQ对于这些情况并不了解,它还会均匀的分配任务。

发生这样的原因是因为一旦有消息进入队列,RabbitMQ就会将消息进行分发。他不会去观察消费者发回的确认信息。它会盲目的将n条消息分发给n个消费者。为了杜绝这种情况,我们可以通过basicQos方法设置prefetchCount = 1。它会限制RabbitMQ在同一时间向一个工作者发送一条以上的消息。换句话说,它会等待一个工作者处理完或者接收到上一条消息的确认信息,才向它发送新的消息。否者,它会将其分发给下一个比较空闲的工作者。

int prefetchCount = 1;

channel.basicQos(prefetchCount);

注意:如果所有的工作者都处于忙碌的状态,你的队列可能会被填满,你需要时刻注意。或许你可以添加更多的工作者或者采用其他的策略。

 

7Putting it all together(完整代码)

NewTask.java

import java.io.IOException;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.MessageProperties;

 

public class NewTask {

 

  private static final String TASK_QUEUE_NAME = "task_queue";

 

  public static void main(String[] argv) 

                      throws java.io.IOException {

 

    ConnectionFactory factory = new ConnectionFactory();

    factory.setHost("localhost");

    Connection connection = factory.newConnection();

    Channel channel = connection.createChannel();

 

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

 

    String message = getMessage(argv);

 

    channel.basicPublish( "", TASK_QUEUE_NAME, 

            MessageProperties.PERSISTENT_TEXT_PLAIN,

            message.getBytes());

    System.out.println(" [x] Sent '" + message + "'");

 

    channel.close();

    connection.close();

  }      

  //...

}

Work.java

import java.io.IOException;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.QueueingConsumer;

 

public class Worker {

 

  private static final String TASK_QUEUE_NAME = "task_queue";

 

  public static void main(String[] argv)

                      throws java.io.IOException,

                      java.lang.InterruptedException {

 

    ConnectionFactory factory = new ConnectionFactory();

    factory.setHost("localhost");

    Connection connection = factory.newConnection();

    Channel channel = connection.createChannel();

 

    channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

 

    channel.basicQos(1);

 

    QueueingConsumer consumer = new QueueingConsumer(channel);

    channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

 

    while (true) {

      QueueingConsumer.Delivery delivery = consumer.nextDelivery();

      String message = new String(delivery.getBody());

 

      System.out.println(" [x] Received '" + message + "'");   

      doWork(message); 

      System.out.println(" [x] Done" );

 

      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

    }

  }

  //...

}

 

  • 大小: 4.7 KB
分享到:
评论
2 楼 小灯笼 2018-01-15  
分布式消息队列高效部署及插件集群开发信息数据监控、分析实战(RabbitMQ、分布式、ZooKeeper、集群、监控、rabbitmq)
网盘地址1:https://pan.baidu.com/s/1smhdUH3 密码: w6vq
网盘地址2:https://pan.baidu.com/s/1kVUyLef 密码: 5pmi
1 楼 快乐的小六 2017-12-14  
分布式消息队列高效部署及插件集群开发信息数据监控、分析实战(RabbitMQ、分布式、ZooKeeper、集群、监控、rabbitmq)
网盘地址:https://pan.baidu.com/s/1qY7g9sg 密码: apuk

相关推荐

    rabbitMQ简单应用pattern1 Work queues 工作队列模式

    在本文中,我们将深入探讨RabbitMQ中的"Work Queues"(工作队列模式),这是一种优化任务处理效率的重要策略。 工作队列模式,也称为负荷分发或批处理队列,其核心思想是通过将大量任务分解为小型可处理单元,然后...

    rabbitmq 7种队列实现java版

    文章目录rabbitmq7种实现方式搭建maven项目引入依赖创建连接简单队列消息生产者消息消费者work queues 工作队列生产者消费者能者多劳(公平分发):消费能力强则消费更多消息Publish/Subscribe 发布订阅模式生产者...

    rabbitMQ 消息队列 Demo

    2. **02WorkQueues(任务队列)** 在这个示例中,多个消费者可以并发地从队列中获取任务进行处理,实现负载均衡。例如,处理一个耗时的任务时,如果有多台服务器(消费者),RabbitMQ会将任务分发给空闲的服务器,...

    RabbitMq

    work queues 工作队列 公平分发 轮询分发 public/subscribe 发布订阅 routing 路由选择 通配符模式 topics 主题 手动和自动确认消息 队列的持久化和非持久化 RabbitMq的延迟队列 在官网教程中,描述了六类工作队列...

    rabbitMQ工作队列共12页.pdf.zip

    在本篇文档中,我们将深入探讨RabbitMQ的工作队列(Work Queues)模式,该模式是其核心特性之一,常用于处理大量并发任务。 首先,让我们理解什么是工作队列。工作队列,又称为负载均衡队列,旨在将大量任务分发到...

    RabbitMQ实战 高效部署分布式消息队列

    1. 工作队列(Work Queues):通过多个消费者并行处理任务,提高系统性能。 2. 消息持久化:通过持久化消息和持久化队列,即使服务器重启,消息也不会丢失。 3. 优先级:队列可以设置消息优先级,优先级高的消息优先...

    rabbitmqdemo

    除了基本的发送和接收消息之外,RabbitMQ 还支持许多高级特性,如工作队列(Work Queues)、发布/订阅模式(Publish/Subscribe)、主题路由(Topic Routing)等。在 "rabbitmqdemo" 中,你可能能够发现这些模式的...

    rabbitMQ 源代码实例

    8. **Work Queues**:工作队列用于负载均衡,多个消费者可以从队列中取出任务并并行处理。 9. **Message Acknowledgments**:为了确保消息的可靠性,RabbitMQ提供了消息确认机制。消费者在成功处理消息后发送确认,...

    SpringBoot与RabbitMQ整合和对五种队列模式的实现源码

    5. **工作队列模式(Work Queues)**:也称为FIFO(先进先出),多个消费者共享一个队列,但每个消息仅由一个消费者处理,防止同一消息被多次处理。 在SpringBoot中实现这些模式,主要通过配置不同的交换器...

    RabbitMQ课程1

    课程进一步探讨了RabbitMQ的深入特性,如工作队列(Work Queues),它用于分配任务给多个工作者,确保高并发和负载均衡。此外,还讲解了消息的持久化、公平转发、发布/订阅模式、路由选择和主题转发等高级主题。 在...

    RabbitMQ的五种模式源码+纯手工打的代码

    2. **工作队列(Work Queues)**: 也称为任务队列,多个消费者从同一个队列中获取任务进行处理,可以有效平衡工作负载,避免某个消费者过载。例如,网站后台的图片处理、数据计算等耗时操作可以放入工作队列。 3....

    RabbitMQ高级消息队列协议.rar

    3. **工作队列(Work Queues)**:用于分配工作负载,多个工作者可以并行处理队列中的任务,提高了系统的处理能力。 4. **延迟队列(Delayed Queue)**:允许消息在指定的时间后才被消费,适用于定时任务。 5. **...

    rabbitMQ安装包.zip

    7. **工作模式(Work Queues)**:一种常见的使用模式,其中多个消费者并行处理来自单一队列的消息,用于负载均衡和任务处理。 为了管理和监控RabbitMQ,我们可以使用RabbitMQ的Web管理界面。默认情况下,该界面在...

    rabbitmq简单demo

    在本"rabbitmq简单demo"中,我们将探讨RabbitMQ的三种主要工作模式:Work Queues、Publish/Subscribe和Routing模式。 ### 1. Work Queues(工作队列) 工作队列模式是最基本的RabbitMQ用法,常用于负载均衡和异步...

    Spring AMQP 工作队列 源码

    通过对 Spring AMQP 工作队列源码的深入学习,可以更好地理解其内部机制,从而优化消息处理性能,提高系统的可靠性和可扩展性。这包括但不限于如何有效地配置和使用交换器、队列、绑定,以及如何设计健壮的消费者...

    rabbitmq C# 调用测试

    除了基础的发布/消费模型,RabbitMQ还支持许多高级特性,如工作队列(Work Queues)、发布/确认(Publish/Confirm)、死信队列(Dead Letter Queues)和延迟队列(Delayed Queues)。这些特性可以帮助构建更健壮和...

    RabbitMQ Server3.13.0

    1. **Work Queues**: 用于负载均衡,多个消费者可以从同一个队列中获取任务进行处理,防止同一任务被重复处理。 2. **Publish/Subscribe**: 通过主题交换机实现广播模式,所有订阅特定主题的消费者都会收到消息。 ...

    rabbitmq 学习资料

    - **Work Queues**(也称为Load Balancing Queues):多个消费者从同一个队列中获取任务,实现任务的并发处理,常用于处理大量CPU密集型或IO密集型的任务。 - **Publish/Subscribe**:发布者将消息发送到主题交换机...

    RabbitMQ.docx

    2. **Work Queues(工作队列)**:也称为公平分发,用于多消费者场景,确保每个消费者平均分配任务,避免某一个消费者负载过高。 3. **Publish/Subscribe(发布/订阅)**:生产者发布消息到主题(Topic),消费者...

    rabbitmq安装包

    1. **工作队列(Work Queues)**: 用于分配任务到多个工作者,以提高处理效率。 2. **发布/订阅(Publish/Subscribe)**: 让多个消费者可以同时接收消息。 3. **路由(Routing)**: 基于路由键将消息发送到指定...

Global site tag (gtag.js) - Google Analytics