`

RabbitMQ (三)发布/订阅-

 
阅读更多

转载:http://blog.csdn.net/lmj623565791/article/details/37657225

本系列教程主要来自于官网入门教程的翻译,然后自己进行了部分的修改与实验,内容仅供参考。

上一篇博客中,我们实现了工作队列,并且我们的工作队列中的一个任务只会发给一个工作者,除非某个工作者未完成任务意外被杀死,会转发给另外的工作者,如果你还不了解:RabbitMQ (二)工作队列。这篇博客中,我们会做一些改变,就是把一个消息发给多个消费者,这种模式称之为发布/订阅(类似观察者模式)。

         为了验证这种模式,我们准备构建一个简单的日志系统。这个系统包含两类程序,一类程序发动日志,另一类程序接收和处理日志。

         在我们的日志系统中,每一个运行的接收者程序都会收到日志。然后我们实现,一个接收者将接收到的数据写到硬盘上,与此同时,另一个接收者把接收到的消息展现在屏幕上。

         本质上来说,就是发布的日志消息会转发给所有的接收者。

1、转发器(Exchanges)

前面的博客中我们主要的介绍都是发送者发送消息给队列,接收者从队列接收消息。下面我们会引入Exchanges,展示RabbitMQ的完整的消息模型。

RabbitMQ消息模型的核心理念是生产者永远不会直接发送任何消息给队列,一般的情况生产者甚至不知道消息应该发送到哪些队列。

相反的,生产者只能发送消息给转发器(Exchange)。转发器是非常简单的,一边接收从生产者发来的消息,另一边把消息推送到队列中。转发器必须清楚的知道消息如何处理它收到的每一条消息。是否应该追加到一个指定的队列?是否应该追加到多个队列?或者是否应该丢弃?这些规则通过转发器的类型进行定义。

 

下面列出一些可用的转发器类型:

Direct

Topic

Headers

Fanout

目前我们关注最后一个fanout,声明转发器类型的代码:

channel.exchangeDeclare("logs","fanout");

fanout类型转发器特别简单,把所有它介绍到的消息,广播到所有它所知道的队列。不过这正是我们前述的日志系统所需要的。

2、匿名转发器(nameless exchange)

前面说到生产者只能发送消息给转发器(Exchange),但是我们前两篇博客中的例子并没有使用到转发器,我们仍然可以发送和接收消息。这是因为我们使用了一个默认的转发器,它的标识符为””。之前发送消息的代码:

channel.basicPublish("", QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

第一个参数为转发器的名称,我们设置为”” : 如果存在routingKey(第二个参数),消息由routingKey决定发送到哪个队列。

现在我们可以指定消息发送到的转发器:

channel.basicPublish( "logs","", null, message.getBytes());

3、临时队列(Temporary queues)

前面的博客中我们都为队列指定了一个特定的名称。能够为队列命名对我们来说是很关键的,我们需要指定消费者为某个队列。当我们希望在生产者和消费者间共享队列时,为队列命名是很重要的。
不过,对于我们的日志系统我们并不关心队列的名称。我们想要接收到所有的消息,而且我们也只对当前正在传递的数据的感兴趣。为了满足我们的需求,需要做两件事:
第一, 无论什么时间连接到Rabbit我们都需要一个新的空的队列。为了实现,我们可以使用随机数创建队列,或者更好的,让服务器给我们提供一个随机的名称。
第二, 一旦消费者与Rabbit断开,消费者所接收的那个队列应该被自动删除。
Java中我们可以使用queueDeclare()方法,不传递任何参数,来创建一个非持久的、唯一的、自动删除的队列且队列名称由服务器随机产生。
String queueName = channel.queueDeclare().getQueue();
一般情况这个名称与amq.gen-JzTY20BRgKO-HjmUJj0wLg 类似。

4、绑定(Bindings)

 

我们已经创建了一个fanout转发器和队列,我们现在需要通过binding告诉转发器把消息发送给我们的队列。
channel.queueBind(queueName, “logs”, ””)参数1:队列名称 ;参数2:转发器名称

5、完整的例子
日志发送端:
package com.zhy.rabbit._03_bindings_exchanges;

import java.io.IOException;
import java.util.Date;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog
{
	private final static String EXCHANGE_NAME = "ex_log";

	public static void main(String[] args) throws IOException
	{
		// 创建连接和频道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		// 声明转发器和类型
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout" );
		
		String message = new Date().toLocaleString()+" : log something";
		// 往转发器上发送消息
		channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

		System.out.println(" [x] Sent '" + message + "'");

		channel.close();
		connection.close();

	}

}

没什么太大的改变,声明队列的代码,改为声明转发器了,同样的消息的传递也交给了转发器。
接收端1 :ReceiveLogsToSave.java:
package com.zhy.rabbit._03_bindings_exchanges;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsToSave
{
	private final static String EXCHANGE_NAME = "ex_log";

	public static void main(String[] argv) throws java.io.IOException,
			java.lang.InterruptedException
	{
		// 创建连接和频道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		// 创建一个非持久的、唯一的且自动删除的队列
		String queueName = channel.queueDeclare().getQueue();
		// 为转发器指定队列,设置binding
		channel.queueBind(queueName, EXCHANGE_NAME, "");

		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

		QueueingConsumer consumer = new QueueingConsumer(channel);
		// 指定接收者,第二个参数为自动应答,无需手动应答
		channel.basicConsume(queueName, true, consumer);

		while (true)
		{
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());

			print2File(message);
		}

	}

	private static void print2File(String msg)
	{
		try
		{
			String dir = ReceiveLogsToSave.class.getClassLoader().getResource("").getPath();
			String logFileName = new SimpleDateFormat("yyyy-MM-dd")
					.format(new Date());
			File file = new File(dir, logFileName+".txt");
			FileOutputStream fos = new FileOutputStream(file, true);
			fos.write((msg + "\r\n").getBytes());
			fos.flush();
			fos.close();
		} catch (FileNotFoundException e)
		{
			e.printStackTrace();
		} catch (IOException e)
		{
			e.printStackTrace();
		}
	}
}


随机创建一个队列,然后将队列与转发器绑定,然后将消费者与该队列绑定,然后写入日志文件。

 

接收端2:ReceiveLogsToConsole.java

 

package com.zhy.rabbit._03_bindings_exchanges;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsToConsole
{
	private final static String EXCHANGE_NAME = "ex_log";

	public static void main(String[] argv) throws java.io.IOException,
			java.lang.InterruptedException
	{
		// 创建连接和频道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		// 创建一个非持久的、唯一的且自动删除的队列
		String queueName = channel.queueDeclare().getQueue();
		// 为转发器指定队列,设置binding
		channel.queueBind(queueName, EXCHANGE_NAME, "");

		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

		QueueingConsumer consumer = new QueueingConsumer(channel);
		// 指定接收者,第二个参数为自动应答,无需手动应答
		channel.basicConsume(queueName, true, consumer);

		while (true)
		{
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			System.out.println(" [x] Received '" + message + "'");

		}

	}

}

随机创建一个队列,然后将队列与转发器绑定,然后将消费者与该队列绑定,然后打印到控制台。

 

现在把两个接收端运行,然后运行3次发送端:

输出结果:

发送端:

 [x] Sent '2014-7-10 16:04:54 : log something'

 [x] Sent '2014-7-10 16:04:58 : log something'

 [x] Sent '2014-7-10 16:05:02 : log something'

接收端1:

接收端2:

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received '2014-7-10 16:04:54 : log something'
 [x] Received '2014-7-10 16:04:58 : log something'
 [x] Received '2014-7-10 16:05:02 : log something'

 

这个例子实现了我们文章开头所描述的日志系统,利用了转发器的类型:fanout。

 

本篇说明了,生产者将消息发送至转发器,转发器决定将消息发送至哪些队列,消费者绑定队列获取消息。

分享到:
评论

相关推荐

    RabbitMQ:安装、配置与使用初探

    可以根据实际需求进一步探索 RabbitMQ 的高级功能,如发布/订阅模式、持久化消息等。更多关于配置的信息可以参考官方文档:[http://www.rabbitmq.com/configure.html#config-items]...

    rabbitmq-server-generic-unix-2.8.2.tar.gz

    通过创建生产者和消费者,你可以发送和接收消息,实现异步处理、任务队列、发布/订阅等多种应用场景。 总结来说,RabbitMQ是一个强大的消息中间件,提供可靠的消息传输和队列管理功能。2.8.2版本的RabbitMQ适用于...

    rabbitmq 发布/订阅 java 实现

    在Java开发中,RabbitMQ提供了一套完整的API,使得开发者能够轻松地实现发布/订阅模式。这种模式下,生产者发送消息到一个主题,而多个消费者可以订阅这个主题,接收并处理这些消息。 首先,要使用RabbitMQ,你需要...

    rabbitmq-server-windows-3.7.4

    - **工作模式**:RabbitMQ支持多种工作模式,如简单模式、发布/订阅模式、路由模式等,以适应不同的应用场景。 安全性: - **用户身份验证(Authentication)**:RabbitMQ支持多种身份验证机制,包括内置的用户...

    rabbitmq-c-master.rar_RabbitMQ c lib_cmake编译_rabbitmq_rabbitmq-c

    - 消息发布和消费:发布消息到队列,订阅队列以接收消息。 - AMQP命令:支持完整的AMQP命令集,包括事务、确认模式等。 在使用`rabbitmq-c`时,开发者需要理解AMQP协议的基础知识,包括交换机、队列、绑定和消息...

    rabbitmq-server-3.7.17-beta.1.exe.zip

    在实际应用中,开发者还需要理解如何通过编程语言API与RabbitMQ交互,例如使用Java的RabbitMQ客户端库、Python的pika库等,以及如何设计合理的消息模式,如发布/订阅、工作队列、请求/响应等,以满足业务需求。...

    rabbitmq-server-windows-3.4.1.zip

    RabbitMQ的核心功能包括消息发布/订阅、路由、工作队列等,它能确保消息的可靠传输,支持多种编程语言的客户端,并且具有高可用性、可扩展性和容错性。 在压缩文件“rabbitmq_server-3.4.1”中,通常包含以下组件和...

    RabbitMQ-环境安装包

    2. **RabbitMQ Server**: RabbitMQ 本身是消息队列服务,允许应用程序通过发布和订阅模式进行异步通信,从而提高系统的可扩展性和灵活性。 **Erlang 安装步骤**: 1. **更新系统**: 在安装任何新软件之前,确保你的...

    rabbitmq 操作手册

    由于其高度可扩展性和可靠性,RabbitMQ被广泛应用于各种场景中,如异步处理、任务队列、发布订阅模型等。 #### 二、RabbitMQ安装技巧 1. **Erlang环境安装**: - RabbitMQ基于Erlang语言开发,因此首先需要安装...

    rabbitmq-java-client-master.zip

    【标签】:“rabbitmq”表明这个压缩包主要与RabbitMQ相关,RabbitMQ是一个用Erlang语言开发的轻量级消息中间件,其核心功能包括发布/订阅、点对点消息模式、高可用性和多协议支持等。 【压缩包子文件的文件名称...

    rabbitmq-java-client-bin-3.0.4.zip

    在后台服务器端,开发者可以充分利用RabbitMQ提供的高级特性,如工作队列、发布/订阅模式、延迟队列等,以优化任务处理流程。同时,服务器端可以更自由地管理连接和资源,处理大量的并发请求。 总结起来,"rabbitmq...

    RabbitMQ实战指南-rabbitmq-action.zip

    **三、RabbitMQ核心概念** 1. **生产者**: 生产者是发送消息到RabbitMQ的应用,它创建并发布消息到交换机。 2. **交换机**: 交换机根据预定义的路由规则决定将消息路由到哪个队列。常见的交换机类型有Direct、...

    rabbitmq-server-windows-3.7.10.zip

    RabbitMQ支持多种工作模式,如简单模式、发布/订阅模式、路由模式、主题模式等,以满足不同场景的需求。同时,它还支持多种客户端库,如Java、Python、.NET等,方便开发者集成到他们的应用程序中。 此外,RabbitMQ...

    rabbitmq-java (2).zip

    在Java中,通过RabbitMQ的Java客户端库,我们可以创建连接、通道(Channel)、声明交换机和队列,然后发布和消费消息。例如,生产者会创建一个通道,声明一个交换机,并使用`channel.basicPublish()`方法发送消息;...

    rabbitmq-c-0.8.0-vs2010

    开发者或系统管理员可以利用这个库在C语言项目中集成RabbitMQ的功能,如发布/订阅、队列、路由等消息传递模式。 以下是压缩包内文件的功能概述: 1. **CMakeTestCInline.c**:这是一个CMake测试文件,用于验证...

    RabbitMQ安装文档

    - **灵活的消息路由**:支持发布订阅、工作队列、RPC 等模式。 - **高可用性**:支持集群、镜像队列等功能,确保系统的高可用性。 - **易于集成**:支持多种编程语言,方便不同应用之间的通信。 #### 三、Linux环境...

    rabbitmq-server-3.8.9.exe

    - **发布/订阅模型**:生产者发布消息到主题,所有订阅该主题的消费者都能收到消息。 - **直接交换**:消息根据精确匹配的路由键被发送到指定队列。 - **头部交换**:基于消息的headers属性进行路由。 - **扇出...

    rabbitmq-dotnet-client-3.6.14-dotnet-4.5

    - 发布/订阅:将消息发布到交换机,由交换机根据预设规则将消息路由到相应的队列。 - 消费者:接收和处理来自队列的消息,支持手动确认和自动确认模式。 - 事务和确认:确保消息的可靠传输,通过事务或确认机制保证...

    rabbitmq 实战练习-rabbitmq-actual.zip

    - **发布/订阅模式**:使用Fanout交换器实现一对多的消息广播。 - **工作队列**:使用Direct交换器,多个消费者从同一队列获取并处理任务,避免并发问题。 - **路由模式**:使用Topic交换器实现灵活的路由策略。 ...

    RabbitMQ技术帮助文档

    RabbitMQ 是一种基于 AMQP(高级消息队列协议)的消息中间件,它提供了一个健壮的消息传递系统,用于在分布式系统中处理消息的发布、订阅和路由。RabbitMQ 支持多种消息传递模式,并且可以与各种编程语言集成。 ###...

Global site tag (gtag.js) - Google Analytics