`

RabbitMQ (三) 发布/订阅

阅读更多

转发请标明出处:http://blog.csdn.net/lmj623565791/article/details/37657225

本系列教程主要来自于官网入门教程的翻译,然后自己进行了部分的修改与实验,内容仅供参考。 

上一篇博客中,我们实现了工作队列,并且我们的工作队列中的一个任务只会发给一个工作者,除非某个工作者未完成任务意外被杀死,会转发给另外的工作者,如果你还不了解:RabbitMQ (二)工作队列。这篇博客中,我们会做一些改变,就是把一个消息发给多个消费者,这种模式称之为发布/订阅(类似观察者模式)。

         为了验证这种模式,我们准备构建一个简单的日志系统。这个系统包含两类程序,一类程序发动日志,另一类程序接收和处理日志。

         在我们的日志系统中,每一个运行的接收者程序都会收到日志。然后我们实现,一个接收者将接收到的数据写到硬盘上,与此同时,另一个接收者把接收到的消息展现在屏幕上。

         本质上来说,就是发布的日志消息会转发给所有的接收者。

1、转发器(Exchanges)

前面的博客中我们主要的介绍都是发送者发送消息给队列,接收者从队列接收消息。下面我们会引入Exchanges,展示RabbitMQ的完整的消息模型。

RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列。

相反的,生产者只能发送消息给转发器(Exchange)。转发器是非常简单的,一边接收从生产者发来的消息,另一边把消息推送到队列中。转发器必须清楚的知道消息如何处理它收到的每一条消息。是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则通过转发器的类型进行定义。

 

下面列出一些可用的转发器类型:

Direct

Topic

Headers

Fanout

目前我们关注最后一个fanout,声明转发器类型的代码:

channel.exchangeDeclare("logs","fanout");

fanout类型转发器特别简单,把所有它介绍到的消息,广播到所有它所知道的队列。不过这正是我们前述的日志系统所需要的。

2、匿名转发器(nameless exchange)

前面说到生产者只能发送消息给转发器(Exchange),但是我们前两篇博客中的例子并没有使用到转发器,我们仍然可以发送和接收消息。这是因为我们使用了一个默认的转发器,它的标识符为””。之前发送消息的代码:

channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

第一个参数为转发器的名称,我们设置为”” : 如果存在routingKey(第二个参数),消息由routingKey决定发送到哪个队列。

现在我们可以指定消息发送到的转发器:

channel.basicPublish( "logs","", null, message.getBytes());

3、临时队列(Temporary queues)

前面的博客中我们都为队列指定了一个特定的名称。能够为队列命名对我们来说是很关键的,我们需要指定消费者为某个队列。当我们希望在生产者和消费者间共享队列时,为队列命名是很重要的。
不过,对于我们的日志系统我们并不关心队列的名称。我们想要接收到所有的消息,而且我们也只对当前正在传递的数据的感兴趣。为了满足我们的需求,需要做两件事:
第一, 无论什么时间连接到Rabbit我们都需要一个新的空的队列。为了实现,我们可以使用随机数创建队列,或者更好的,让服务器给我们提供一个随机的名称。
第二, 一旦消费者与Rabbit断开,消费者所接收的那个队列应该被自动删除。
Java中我们可以使用queueDeclare()方法,不传递任何参数,来创建一个非持久的、唯一的、自动删除的队列且队列名称由服务器随机产生。
String queueName = channel.queueDeclare().getQueue();
一般情况这个名称与amq.gen-JzTY20BRgKO-HjmUJj0wLg 类似。

4、绑定(Bindings)

 

我们已经创建了一个fanout转发器和队列,我们现在需要通过binding告诉转发器把消息发送给我们的队列。
channel.queueBind(queueName, “logs”, ””)参数1:队列名称 ;参数2:转发器名称

5、完整的例子
日志发送端:
[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. package com.zhy.rabbit._03_bindings_exchanges;  
  2.   
  3. import java.io.IOException;  
  4. import java.util.Date;  
  5.   
  6. import com.rabbitmq.client.Channel;  
  7. import com.rabbitmq.client.Connection;  
  8. import com.rabbitmq.client.ConnectionFactory;  
  9.   
  10. public class EmitLog  
  11. {  
  12.     private final static String EXCHANGE_NAME = "ex_log";  
  13.   
  14.     public static void main(String[] args) throws IOException  
  15.     {  
  16.         // 创建连接和频道  
  17.         ConnectionFactory factory = new ConnectionFactory();  
  18.         factory.setHost("localhost");  
  19.         Connection connection = factory.newConnection();  
  20.         Channel channel = connection.createChannel();  
  21.         // 声明转发器和类型  
  22.         channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );  
  23.           
  24.         String message = new Date().toLocaleString()+" : log something";  
  25.         // 往转发器上发送消息  
  26.         channel.basicPublish(EXCHANGE_NAME, ""null, message.getBytes());  
  27.   
  28.         System.out.println(" [x] Sent '" + message + "'");  
  29.   
  30.         channel.close();  
  31.         connection.close();  
  32.   
  33.     }  
  34.   
  35. }  

没什么太大的改变,声明队列的代码,改为声明转发器了,同样的消息的传递也交给了转发器。
接收端1 :ReceiveLogsToSave.java:
[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. package com.zhy.rabbit._03_bindings_exchanges;  
  2.   
  3. import java.io.File;  
  4. import java.io.FileNotFoundException;  
  5. import java.io.FileOutputStream;  
  6. import java.io.IOException;  
  7. import java.text.SimpleDateFormat;  
  8. import java.util.Date;  
  9.   
  10. import com.rabbitmq.client.Channel;  
  11. import com.rabbitmq.client.Connection;  
  12. import com.rabbitmq.client.ConnectionFactory;  
  13. import com.rabbitmq.client.QueueingConsumer;  
  14.   
  15. public class ReceiveLogsToSave  
  16. {  
  17.     private final static String EXCHANGE_NAME = "ex_log";  
  18.   
  19.     public static void main(String[] argv) throws java.io.IOException,  
  20.             java.lang.InterruptedException  
  21.     {  
  22.         // 创建连接和频道  
  23.         ConnectionFactory factory = new ConnectionFactory();  
  24.         factory.setHost("localhost");  
  25.         Connection connection = factory.newConnection();  
  26.         Channel channel = connection.createChannel();  
  27.   
  28.         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  
  29.         // 创建一个非持久的、唯一的且自动删除的队列  
  30.         String queueName = channel.queueDeclare().getQueue();  
  31.         // 为转发器指定队列,设置binding  
  32.         channel.queueBind(queueName, EXCHANGE_NAME, "");  
  33.   
  34.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
  35.   
  36.         QueueingConsumer consumer = new QueueingConsumer(channel);  
  37.         // 指定接收者,第二个参数为自动应答,无需手动应答  
  38.         channel.basicConsume(queueName, true, consumer);  
  39.   
  40.         while (true)  
  41.         {  
  42.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  43.             String message = new String(delivery.getBody());  
  44.   
  45.             print2File(message);  
  46.         }  
  47.   
  48.     }  
  49.   
  50.     private static void print2File(String msg)  
  51.     {  
  52.         try  
  53.         {  
  54.             String dir = ReceiveLogsToSave.class.getClassLoader().getResource("").getPath();  
  55.             String logFileName = new SimpleDateFormat("yyyy-MM-dd")  
  56.                     .format(new Date());  
  57.             File file = new File(dir, logFileName+".txt");  
  58.             FileOutputStream fos = new FileOutputStream(file, true);  
  59.             fos.write((msg + "\r\n").getBytes());  
  60.             fos.flush();  
  61.             fos.close();  
  62.         } catch (FileNotFoundException e)  
  63.         {  
  64.             e.printStackTrace();  
  65.         } catch (IOException e)  
  66.         {  
  67.             e.printStackTrace();  
  68.         }  
  69.     }  
  70. }  


随机创建一个队列,然后将队列与转发器绑定,然后将消费者与该队列绑定,然后写入日志文件。

 

接收端2:ReceiveLogsToConsole.java

 

[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. package com.zhy.rabbit._03_bindings_exchanges;  
  2.   
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.Connection;  
  5. import com.rabbitmq.client.ConnectionFactory;  
  6. import com.rabbitmq.client.QueueingConsumer;  
  7.   
  8. public class ReceiveLogsToConsole  
  9. {  
  10.     private final static String EXCHANGE_NAME = "ex_log";  
  11.   
  12.     public static void main(String[] argv) throws java.io.IOException,  
  13.             java.lang.InterruptedException  
  14.     {  
  15.         // 创建连接和频道  
  16.         ConnectionFactory factory = new ConnectionFactory();  
  17.         factory.setHost("localhost");  
  18.         Connection connection = factory.newConnection();  
  19.         Channel channel = connection.createChannel();  
  20.   
  21.         channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  
  22.         // 创建一个非持久的、唯一的且自动删除的队列  
  23.         String queueName = channel.queueDeclare().getQueue();  
  24.         // 为转发器指定队列,设置binding  
  25.         channel.queueBind(queueName, EXCHANGE_NAME, "");  
  26.   
  27.         System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
  28.   
  29.         QueueingConsumer consumer = new QueueingConsumer(channel);  
  30.         // 指定接收者,第二个参数为自动应答,无需手动应答  
  31.         channel.basicConsume(queueName, true, consumer);  
  32.   
  33.         while (true)  
  34.         {  
  35.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  36.             String message = new String(delivery.getBody());  
  37.             System.out.println(" [x] Received '" + message + "'");  
  38.   
  39.         }  
  40.   
  41.     }  
  42.   
  43. }  

随机创建一个队列,然后将队列与转发器绑定,然后将消费者与该队列绑定,然后打印到控制台。

 

现在把两个接收端运行,然后运行3次发送端:

输出结果:

发送端:

 [x] Sent '2014-7-10 16:04:54 : log something'

 [x] Sent '2014-7-10 16:04:58 : log something'

 [x] Sent '2014-7-10 16:05:02 : log something'

接收端1:

接收端2:

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received '2014-7-10 16:04:54 : log something'
 [x] Received '2014-7-10 16:04:58 : log something'
 [x] Received '2014-7-10 16:05:02 : log something'

 

这个例子实现了我们文章开头所描述的日志系统,利用了转发器的类型:fanout。

本篇说明了,生产者将消息发送至转发器,转发器决定将消息发送至哪些队列,消费者绑定队列获取消息。

分享到:
评论

相关推荐

    rabbitmq 发布/订阅 java 实现

    在Java开发中,RabbitMQ提供了一套完整的API,使得开发者能够轻松地实现发布/订阅模式。这种模式下,生产者发送消息到一个主题,而多个消费者可以订阅这个主题,接收并处理这些消息。 首先,要使用RabbitMQ,你需要...

    RabbitMQ:安装、配置与使用初探

    可以根据实际需求进一步探索 RabbitMQ 的高级功能,如发布/订阅模式、持久化消息等。更多关于配置的信息可以参考官方文档:[http://www.rabbitmq.com/configure.html#config-items]...

    rabbitMQ发布和订阅共7页.pdf.zip

    **RabbitMQ发布与订阅模式详解** RabbitMQ是一款开源的消息队列系统,基于AMQP(Advanced Message Queuing Protocol)协议实现,广泛应用于分布式系统中的异步任务处理、解耦和扩展性提升。在这个名为“rabbitMQ...

    RabbitMQ封装为c++版本,并且使用方式为发布订阅模式

    在这个项目中,我们关注的是RabbitMQ的C++客户端库的封装以及如何实现发布订阅模式。 首先,我们要了解RabbitMQ的基本概念。RabbitMQ是一个消息代理,它接收和转发消息,充当生产者和消费者之间的桥梁。生产者是...

    rabbitmq发布订阅

    【标题】:“rabbitmq发布订阅”是分布式消息传递中的一个重要概念,它允许生产者发送消息到RabbitMQ服务器,而多个消费者可以订阅这些消息并进行处理。RabbitMQ是一个开源的消息代理和队列服务器,使用AMQP...

    RabbitMQ 发布者订阅者

    本文将深入探讨基于RabbitMQ实现的发布者订阅者(Publisher-Subscriber)模型,以及如何通过它来优化系统架构。 发布者订阅者模型是消息队列中的一种通信模式,其中发布者发送消息到一个主题,而多个订阅者可以监听...

    C# .net 6 rabbitMq发布订阅类封装

    RabbitListener是mq消息的监听,BasicPublish丢消息的方法

    RabbitMQDemo_RabbitMQ发布与订阅Demo_rabbitmq_JSON_中间件_

    总结来说,"RabbitMQDemo_RabbitMQ发布与订阅Demo_rabbitmq_JSON_中间件_"是一个演示项目,其核心在于利用RabbitMQ作为中间件,通过发布者发送JSON格式的消息到队列,由订阅者从队列中接收并处理这些消息。...

    RabbitMQ-Server安装包

    它实现了AMQP(Advanced Message Queuing Protocol)协议,并提供了可靠的消息传输、灵活的消息路由、消息队列、发布/订阅等功能。 通过下载并安装rabbitmq-server-3.7.28,你可以部署并运行这个具体版本的RabbitMQ...

    C# RabbitMQ 主题订阅的源码

    主题订阅模式是RabbitMQ提供的一种发布/订阅模型,其中消息基于特定的路由键(topic)进行发送和接收。路由键类似于正则表达式,允许接收者根据需要过滤消息。 ### 4. 创建RabbitMQ连接 在C#中,我们首先需要创建一...

    RabbitMQ实战指南-rabbitmq-action.zip

    **三、RabbitMQ核心概念** 1. **生产者**: 生产者是发送消息到RabbitMQ的应用,它创建并发布消息到交换机。 2. **交换机**: 交换机根据预定义的路由规则决定将消息路由到哪个队列。常见的交换机类型有Direct、...

    RabbitMQ-环境安装包

    2. **RabbitMQ Server**: RabbitMQ 本身是消息队列服务,允许应用程序通过发布和订阅模式进行异步通信,从而提高系统的可扩展性和灵活性。 **Erlang 安装步骤**: 1. **更新系统**: 在安装任何新软件之前,确保你的...

    WCF/RabbitMQ绑定/负载均衡

    这意味着WCF服务可以通过RabbitMQ发布消息,而WCF客户端可以订阅这些消息,从而实现服务之间的解耦和异步通信。 **负载均衡** 负载均衡是优化系统性能和容错能力的重要手段。在WCF中,可以通过配置多个服务实例并...

    RabbitMQ linux 安装包和安装环境

    你可以开始创建队列、交换机和绑定,实现消息的发布和订阅。记住,RabbitMQ的强大之处在于其灵活性和丰富的插件系统,可以根据具体需求进行定制和扩展。通过持续学习和实践,你可以充分利用RabbitMQ来优化和提升你的...

    kettle rabbitmq 插件开发

    描述没有提供具体细节,但我们可以假设内容可能涵盖如何结合 Kettle 和 RabbitMQ 实现数据流的发布和订阅。这通常涉及以下几个关键知识点: 1. **RabbitMQ 简介**:首先,我们需要了解 RabbitMQ 的基本概念,包括它...

    RabbitMQ技术帮助文档

    RabbitMQ 是一种基于 AMQP(高级消息队列协议)的消息中间件,它提供了一个健壮的消息传递系统,用于在分布式系统中处理消息的发布、订阅和路由。RabbitMQ 支持多种消息传递模式,并且可以与各种编程语言集成。 ###...

    rabbitmq 操作手册

    由于其高度可扩展性和可靠性,RabbitMQ被广泛应用于各种场景中,如异步处理、任务队列、发布订阅模型等。 #### 二、RabbitMQ安装技巧 1. **Erlang环境安装**: - RabbitMQ基于Erlang语言开发,因此首先需要安装...

    RabbitMQ中文文档.pdf

    * 发布/订阅:RabbitMQ可以用来实现发布/订阅模式,用于实时推送消息。 * 远程过程调用:RabbitMQ可以用来实现远程过程调用,用于分布式系统的通信。 RabbitMQ的优点包括: * 高可靠性:RabbitMQ提供了多种机制来...

    RabbitMQClient_订阅测试.zip_rabbitmq

    在实际使用中,RabbitMQ的订阅模式是基于发布/订阅(Publish/Subscribe)模型,其中生产者发送消息到一个交换机,然后由交换机根据预定义的路由规则将消息分发到一个或多个队列。消费者订阅这些队列,当有新消息到达...

Global site tag (gtag.js) - Google Analytics