`
yugouai
  • 浏览: 499242 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

rabbitmq入门-工作队列

阅读更多

工作队列:为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。

 

消费者1输出

 

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'hi hi. hi.. hi...1'
 [x] Done
 [x] Received 'hi hi. hi.. hi...3'
 [x] Done
 [x] Received 'hi hi. hi.. hi...5'
 [x] Done
 [x] Received 'hi hi. hi.. hi...7'
 [x] Done
 [x] Received 'hi hi. hi.. hi...9'
 [x] Done

 消费者2输出

 

 

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'hi hi. hi.. hi...0'
 [x] Done
 [x] Received 'hi hi. hi.. hi...2'
 [x] Done
 [x] Received 'hi hi. hi.. hi...4'
 [x] Done
 [x] Received 'hi hi. hi.. hi...6'
 [x] Done
 [x] Received 'hi hi. hi.. hi...8'
 [x] Done

 默认来说,RabbitMQ会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)。试着添加三个或更多得工作者(workers)。

 

生产者代码

 

package com.duowan.rabbit.mq;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class MultiMQClient {
	private final static String QUEUE_NAME="hello";
    public static void main( String[] args ) throws IOException
    {
       ConnectionFactory factory = new ConnectionFactory();
       factory.setHost("localhost");
       Connection connection = factory.newConnection();
       Channel channel = connection.createChannel();
       channel.queueDeclare(QUEUE_NAME, false, false, false, null);
       args = new String[]{"hi","hi.","hi..","hi..."};
       for (int i = 0; i < 10; i++) {
    	   String message = getMessage(args)+i;
    	   channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    	   System.out.println(" [x] Sent '" + message + "'");
       }
       
       channel.close();
       connection.close();
    }
    
    private static String getMessage(String[] strings){
        if (strings.length < 1)
            return "Hello World!";
        return joinStrings(strings, " ");
    }
    
    private static String joinStrings(String[] strings, String delimiter) {
        int length = strings.length;
        if (length == 0) return "";
        StringBuilder words = new StringBuilder(strings[0]);
        for (int i = 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}

 消费者代码

package com.duowan.rabbit.mq;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

public class MultiMQServer {
	private final static String QUEUE_NAME = "hello";
	public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		
		channel.queueDeclare(QUEUE_NAME, false, false, false, null);
	    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
	    QueueingConsumer consumer = new QueueingConsumer(channel);
	    boolean autoAck = false;
	    channel.basicConsume(QUEUE_NAME, autoAck, consumer);

	    while (true) {
	      QueueingConsumer.Delivery delivery = consumer.nextDelivery();
	      String message = new String(delivery.getBody());
	      //确认消息已经收到
	      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
	      System.out.println(" [x] Received '" + message + "'");
	      doWork(message);
	      System.out.println(" [x] Done");
	    }
		
	}
	private static void doWork(String message) throws InterruptedException {
		for (char ch : message.toCharArray()) {
			if (ch == '.') {
				Thread.sleep(1000);
			}
		}
	}
}

 

 

消息响应

我们不想丢失任何任务消息。如果一个工作者(worker)挂掉了,我们希望任务会重新发送给其他的工作者(worker)。

 

为了防止消息丢失,RabbitMQ提供了消息[i]响应(acknowledgments)[/i]。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。

 

如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,及时工作者(workers)偶尔的挂掉,也不会丢失消息。

 

消息是没有超时这个概念的;当工作者与它断开连的时候,RabbitMQ会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。

 

消息响应默认是开启的。之前的例子中我们可以使用no_ack=True标识把它关闭。是时候移除这个标识了,当工作者(worker)完成了任务,就发送一个响应。

 

设置消息响应

 

boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);

 运行两个消费者进程,运行生产者,关闭其中一个消费者进程,输出如下:

 

 

 [*] Waiting for messages. To exit press CTRL+C
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'hi hi. hi.. hi...0'
 [x] Done
 [x] Received 'hi hi. hi.. hi...2'

 运行到此,kill掉进程

 

 

[*] Waiting for messages. To exit press CTRL+C
 [x] Received 'hi hi. hi.. hi...1'
 [x] Done
 [x] Received 'hi hi. hi.. hi...3'
 [x] Done
 [x] Received 'hi hi. hi.. hi...5'
 [x] Done
 [x] Received 'hi hi. hi.. hi...7'
 [x] Done
 [x] Received 'hi hi. hi.. hi...9'
 [x] Done
 [x] Received 'hi hi. hi.. hi...0'
 [x] Done
 [x] Received 'hi hi. hi.. hi...2'
 [x] Done
 [x] Received 'hi hi. hi.. hi...4'
 [x] Done
 [x] Received 'hi hi. hi.. hi...6'
 [x] Done
 [x] Received 'hi hi. hi.. hi...8'
 [x] Done

 运行上面的代码,我们发现即使使用CTRL+C杀掉了一个工作者(worker)进程,消息也不会丢失。当工作者(worker)挂掉这后,所有没有响应的消息都会重新发送。

 

 

消息持久化

那么在它退出或者崩溃的时候,它将会流失所有的队列和消息。为了确保信息不会丢失,有两个事情是需要注意的:我们必须把“队列”和“消息”设为持久化。

 

boolean durable = true;
channel.queueDeclare("hello", durable , false, false, null);

 尽管这行代码本身是正确的,但是仍然不会正确运行。因为我们已经定义过一个叫hello的非持久化队列。RabbitMq不允许你使用不同的参数重新定义一个队列,它会返回一个错误。但我们现在使用一个快捷的解决方法——用不同的名字,例如task_queue

 

 

boolean durable = true;
channel.queueDeclare("task_queue", durable , false, false, null);

 这个queue_declare必须在生产者(producer)和消费者(consumer)对应的代码中修改。

 

这时候,我们就可以确保在RabbitMq重启之后queue_declare队列不会丢失。另外,我们需要把我们的消息也要设为持久化——将delivery_mode的属性设为PERSISTENT_TEXT_PLAIN。

channel.basicPublish("", "task_queue", 
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());

 

注意:

将消息设为持久化并不能完全保证不会丢失。以上代码只是告诉了RabbitMq要把消息存到硬盘,但从RabbitMq收到消息到保存之间还是有一个很小的间隔时间。因为RabbitMq并不是所有的消息都使用fsync(2)——它有可能只是保存到缓存中,并不一定会写到硬盘中。并不能保证真正的持久化,但已经足够应付我们的简单工作队列。如果你一定要保证持久化,你需要改写你的代码来支持事务(transaction)。

 

公平分发

以上代码,rabbitmq没有按照我们期望的那样进行分发。比如有两个工作者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松。然而RabbitMQ并不知道这些,它仍然一如既往的派发消息。

这时因为RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。它盲目的把第n-th条消息发给第n-th个消费者。

 

我们可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。

channel.basicPublish("", "task_queue", 
int prefetchCount = 1;
channel.basicQos(prefetchCount);

 

分享到:
评论

相关推荐

    RabbitMQ入门-实战-RabbitMQ.zip

    **RabbitMQ 入门与实战** RabbitMQ 是一个基于 AMQP(Advanced Message Queuing Protocol)协议的开源消息队列系统,它被广泛应用于分布式系统中的消息传递和任务调度。RabbitMQ 提供了高可用性、可扩展性和可靠性...

    MQ示例+otp_win64_22.2.exe+rabbitmq-server-3.8.1

    【标题】"MQ示例+otp_win64_22.2.exe+rabbitmq-server-3.8.1" 提供的是一个与消息队列相关...总的来说,这个压缩包是一个全面的入门资源,对于想要学习和实践消息队列技术,特别是 RabbitMQ 的开发者来说,非常有价值。

    RabbitMQ快速入门及API介绍(401M)

    RabbitMQ快速入门及API介绍(401M) QQ截图 20191220230107.png?x-oss-process=style/pnp8 (42.73KB, 下载次数:227) 下载附件 2019-12-2023 :01 上传【课程介绍】:第一章 : RabbitMQ介绍:消息中间件概念、RabbitMQ...

    RabbitMQ入门小Dome ------&amp;gt; RabbitMQDome.zip

    最近整理学习的RabbitMQ入门Dome,文件是一个普通java项目导入完成后在lib文件夹中amqp-client-5.2.0.jar,slf4j-api-1.7.25.jar添加进去即可,里面有5个dome分是 dome1 : 简单队列,dome2 :work模式,dome3 : 订阅...

    RabbitMQ-Day1-Code

    通过分析`rabbitmq-producer`和`rabbitmq-consumer`的代码,你可以深入理解RabbitMQ的工作原理,并能熟练地在自己的项目中应用消息队列来提高系统的稳定性和性能。同时,这也将帮助你更好地理解和运用AMQP协议,为...

    rabbitMq入门

    【标题】:“rabbitMQ入门” 在IT行业中,消息队列是一种常见的中间件技术,用于解耦应用程序组件,提高系统的可扩展性和可靠性。RabbitMQ是一个开源的消息代理和队列服务器,广泛应用于分布式系统中。本篇文章将带...

    Rabbitmq入门到精通.doc

    【RabbitMQ 入门到精通】:RabbitMQ 是一款流行的消息中间件,它基于 AMQP(Advanced Message Queuing Protocol)协议实现,用于在分布式系统中高效地传输消息,从而实现异步处理、解耦和流量控制。本教程旨在帮助...

    rabbitmq-demo入门代码

    RabbitMQ是一款开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中,用于处理异步任务、解耦组件以及实现消息传递。本示例将带你逐步了解如何在你的项目中使用...

    RabbitMQ从入门到放弃

    ### RabbitMQ从入门到放弃——理解消息队列与RabbitMQ #### 消息队列简介 消息队列(Message Queue, MQ)作为一种重要的中间件技术,它提供了应用程序间的一种通信方式,通过写入和读取出入列队的消息来进行通信,...

    rabbitMQ代码案例 简单入门

    4. **工作队列模式**:学习如何使用RabbitMQ实现任务调度,通过多个消费者并行处理任务以提高效率。 5. **路由与队列绑定**:掌握不同的交换器类型(如Direct、Fanout、Topic、Header)以及如何通过绑定规则实现...

    RabbitMQ-Pub-Sub-Sample:RabbitMQ入门

    RabbitMQ是一个开源的消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议,广泛应用于分布式系统中,用于实现应用之间的异步通信和解耦。在这个“RabbitMQ-Pub-Sub-Sample”项目中,我们将深入探讨...

    Rabbitmq笔记+入门教程+示例

    3. **工作流程**:生产者将消息发送到指定的队列,消费者从队列中读取消息并执行相应的业务逻辑。 #### 五、案例分析:Work 模式下的商品数据同步 **Work 模式**:在这种模式下,一个生产者可以将消息发送到队列,...

    20.消息中间件之RabbitMQ入门讲解

    消息中间件之RabbitMQ入门讲解”的主题中,我们将深入理解RabbitMQ的核心概念,如何通过控制台进行管理,以及如何在Spring Cloud框架下创建消息生产者和消费者。 首先,让我们了解RabbitMQ的基本概念。RabbitMQ的...

    rabbitmq-demo.zip

    【标题】:RabbitMQ入门演示项目 【描述】中的知识点: 1. **RabbitMQ**:RabbitMQ是一个开源的消息代理和队列服务器,它使用AMQP(Advanced Message Queuing Protocol)协议,用于在分布式系统中可靠地传递消息。...

    RabbitMQ入门教程.docx

    ### RabbitMQ入门知识点详解 #### 一、RabbitMQ简介 RabbitMQ是一款开源的消息中间件,基于Erlang语言开发而成。它支持多种消息发布/订阅模式,并且能够跨多平台运行。作为消息中间件,RabbitMQ的核心功能是接受、...

    RabbitMQ 入门教程(JAVA)

    ### RabbitMQ 入门教程(JAVA) #### 一、RabbitMQ 概述 RabbitMQ 是一个消息中间件,其主要功能是接收来自生产者的消息,并根据规则将其路由、缓冲以及持久化后传递给消费者。RabbitMQ 和消息传递系统通常会使用...

    RabbitMQ.pdf-详情

    RabbitMQ是一个开源的消息代理和队列服务器,用于在分布式系统中处理和路由消息。它支持多种消息协议,但最常用的是AMQP(Advanced Message Queuing Protocol)。 首先,我们从快速入门开始,创建一个包含生产者和...

    rabbitMQ 消息队列 Demo

    这是RabbitMQ入门的经典示例,它展示了最基础的消息发布与消费过程。生产者发送一个简单的"Hello, World!"消息到RabbitMQ服务器,然后消费者从队列中取出并打印这个消息。这个例子帮助我们理解RabbitMQ的基本工作...

    RabbitMQ入门操作手册.pdf

    【RabbitMQ入门操作手册】提供了全面的RabbitMQ学习指南,从基础概念到实际操作,帮助初学者快速掌握这个强大的消息队列系统。RabbitMQ是一个基于AMQP(Advanced Message Queuing Protocol)的开源消息代理,其核心...

Global site tag (gtag.js) - Google Analytics