`
king_tt
  • 浏览: 2224967 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

RabbitMQ (五)主题(Topic)

 
阅读更多

转载请标明出处:http://blog.csdn.net/lmj623565791/article/details/37706355

上一篇博客中,我们进步改良了我们的日志系统。我们使用direct类型转发器,使得接收者有能力进行选择性的接收日志,,而非fanout那样,只能够无脑的转发,如果你还不了解:RabbitMQ (四) 路由选择 (Routing)

虽然使用direct类型改良了我们的系统,但是仍然存在一些局限性:它不能够基于多重条件进行路由选择。
在我们的日志系统中,我们有可能希望不仅根据日志的级别而且想根据日志的来源进行订阅。这个概念类似unix工具:syslog,它转发日志基于严重性(info/warning/crit…)和设备(auth/cron/kern…)
这样可能给我们更多的灵活性:我们可能只想订阅来自’cron’的致命错误日志,而不是来自’kern’的。
为了在我们的系统中实现上述的需求,我们需要学习稍微复杂的主题类型的转发器(topic exchange)。

1、 主题转发(Topic Exchange)
发往主题类型的转发器的消息不能随意的设置选择键(routing_key),必须是由点隔开的一系列的标识符组成。标识符可以是任何东西,但是一般都与消息的某些特性相关。一些合法的选择键的例子:"stock.usd.nyse", "nyse.vmw","quick.orange.rabbit".你可以定义任何数量的标识符,上限为255个字节。
绑定键和选择键的形式一样。主题类型的转发器背后的逻辑和直接类型的转发器很类似:一个附带特殊的选择键将会被转发到绑定键与之匹配的队列中。需要注意的是:关于绑定键有两种特殊的情况。
*可以匹配一个标识符。
#可以匹配0个或多个标识符。
2、 图解:
我们准备发送关于动物的消息。消息会附加一个选择键包含3个标识符(两个点隔开)。第一个标识符描述动物的速度,第二个标识符描述动物的颜色,第三个标识符描述动物的物种:<speed>.<color>.<species>。
我们创建3个绑定键:Q1与*.orange.*绑定Q2与*.*.rabbit和lazy.#绑定。
可以简单的认为:
Q1对所有的橙色动物感兴趣。
Q2想要知道关于兔子的一切以及关于懒洋洋的动物的一切。
一个附带quick.orange.rabbit的选择键的消息将会被转发到两个队列。附带lazy.orange.elephant的消息也会被转发到两个队列。另一方面quick.orange.fox只会被转发到Q1,lazy.brown.fox将会被转发到Q2。lazy.pink.rabbit虽然与两个绑定键匹配,但是也只会被转发到Q2一次。quick.brown.fox不能与任何绑定键匹配,所以会被丢弃。
如果我们违法我们的约定,发送一个或者四个标识符的选择键,类似:orange,quick.orange.male.rabbit,这些选择键不能与任何绑定键匹配,所以消息将会被丢弃。
另一方面,lazy.orange.male.rabbit,虽然是四个标识符,也可以与lazy.#匹配,从而转发至Q2。
注:主题类型的转发器非常强大,可以实现其他类型的转发器。
当一个队列与绑定键#绑定,将会收到所有的消息,类似fanout类型转发器。
当绑定键中不包含任何#与*时,类似direct类型转发器。
3、 完整的例子

发送端EmitLogTopic.java:

package com.zhy.rabbit._05_topic_exchange;

import java.util.UUID;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLogTopic
{

	private static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] argv) throws Exception
	{
		// 创建连接和频道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "topic");

		String[] routing_keys = new String[] { "kernal.info", "cron.warning",
				"auth.info", "kernel.critical" };
		for (String routing_key : routing_keys)
		{
			String msg = UUID.randomUUID().toString();
			channel.basicPublish(EXCHANGE_NAME, routing_key, null, msg
					.getBytes());
			System.out.println(" [x] Sent routingKey = "+routing_key+" ,msg = " + msg + ".");
		}

		channel.close();
		connection.close();
	}
}

我们发送了4条消息,分别设置了不同的选择键。

接收端1,ReceiveLogsTopicForKernel.java

package com.zhy.rabbit._05_topic_exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsTopicForKernel
{

	private static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] argv) throws Exception
	{
		// 创建连接和频道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		// 声明转发器
		channel.exchangeDeclare(EXCHANGE_NAME, "topic");
		// 随机生成一个队列
		String queueName = channel.queueDeclare().getQueue();
		
		//接收所有与kernel相关的消息
		channel.queueBind(queueName, EXCHANGE_NAME, "kernel.*");

		System.out.println(" [*] Waiting for messages about kernel. To exit press CTRL+C");

		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, consumer);

		while (true)
		{
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			String routingKey = delivery.getEnvelope().getRoutingKey();

			System.out.println(" [x] Received routingKey = " + routingKey
					+ ",msg = " + message + ".");
		}
	}
}

直接收和Kernel相关的日志消息。

接收端2,ReceiveLogsTopicForCritical.java

package com.zhy.rabbit._05_topic_exchange;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class ReceiveLogsTopicForCritical
{

	private static final String EXCHANGE_NAME = "topic_logs";

	public static void main(String[] argv) throws Exception
	{
		// 创建连接和频道
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		// 声明转发器
		channel.exchangeDeclare(EXCHANGE_NAME, "topic");
		// 随机生成一个队列
		String queueName = channel.queueDeclare().getQueue();

		// 接收所有与kernel相关的消息
		channel.queueBind(queueName, EXCHANGE_NAME, "*.critical");

		System.out
				.println(" [*] Waiting for critical messages. To exit press CTRL+C");

		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, consumer);

		while (true)
		{
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			String routingKey = delivery.getEnvelope().getRoutingKey();

			System.out.println(" [x] Received routingKey = " + routingKey
					+ ",msg = " + message + ".");
		}
	}
}

只接收致命错误的日志消息。

运行结果:

[x] Sent routingKey = kernal.info ,msg = a7261f0d-18cc-4c85-ba80-5ecd9283dae7.
[x] Sent routingKey = cron.warning ,msg = 0c7e4484-66e0-4846-a869-a7a266e16281.
[x] Sent routingKey = auth.info ,msg = 3273f21f-6e6e-42f2-83df-1f2fafa7a19a.
[x] Sent routingKey = kernel.critical ,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.

--------------------------------------------------------------------------------------------------------------------

[*] Waiting for messages about kernel. To exit press CTRL+C
[x] Received routingKey = kernel.critical,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.

--------------------------------------------------------------------------------------------------------------------

[*] Waiting for critical messages. To exit press CTRL+C
[x] Received routingKey = kernel.critical,msg = f65d3e1a-0619-4f85-8b0d-59375380ecc9.

可以看到,我们通过使用topic类型的转发器,成功实现了多重条件选择的订阅。


分享到:
评论

相关推荐

    C# RabbitMQ 主题订阅的源码

    RabbitMQ主题订阅 主题订阅模式是RabbitMQ提供的一种发布/订阅模型,其中消息基于特定的路由键(topic)进行发送和接收。路由键类似于正则表达式,允许接收者根据需要过滤消息。 ### 4. 创建RabbitMQ连接 在C#中,...

    rabbitmq-topic demo

    "rabbitmq-topic demo"是一个关于RabbitMQ主题模式的示例,下面将详细介绍RabbitMQ的主题模式及其相关知识点。 **1. RabbitMQ基本概念** - **Broker**: RabbitMQ服务器,负责接收、存储和转发消息。 - **Exchange*...

    rabbitmq 主题java 实现

    下面将详细介绍RabbitMQ主题模式在Java中的实现。 ### 1. 安装与配置RabbitMQ 首先,你需要在本地或服务器上安装RabbitMQ。你可以从官方网站下载并按照指南进行安装。安装完成后,启动RabbitMQ服务器。 ### 2. ...

    spring boot使用RabbitMQ实现topic 主题

    Spring Boot 使用 RabbitMQ 实现 Topic 主题 Topic 主题是一种高级的消息 routing 机制,它允许消息的消费者根据路由键(Routing Key)来订阅消息。在 Spring Boot 中,我们可以使用 RabbitMQ 来实现 Topic 主题。...

    rabbitmq代码,包含了消息队列的5中方式,topic 等模式,还有保持消息持久化的解决方法(交换机 队列 消息同时持久化)等。

    3. **Topic模式**:主题模式允许更灵活的路由策略。路由键可以包含点号分隔的单词,消费者可以设置带有通配符的绑定规则,例如“*.stock.*”可以匹配“us.stock.price”和“uk.stock.news”。 4. **Header模式**:...

    RabbitMQ主题共5页.pdf.zip

    【标题】:“RabbitMQ主题共5页.pdf.zip”是一个压缩文件,包含了关于RabbitMQ技术的五页详细讲解。这个文件很可能包含了RabbitMQ的基础概念、核心特性、使用场景以及配置方法等内容。 【描述】:“RabbitMQ主题共5...

    rabbitmq基础+springboot集成rabbitmq

    RabbitMQ预定义了几种交换器类型,如Direct(直接)、Fanout(广播)、Topic(主题)和Header(头)。 4. **队列**:队列是消息的容器,存储待处理的消息。消息被路由到队列后,等待消费者来处理。 **工作队列** ...

    rabbitmq_priority_topic_python:rabbitmq 带有优先级和主题功能,可以发送带优先级的物品,带主题的物品接收

    rabbitmq 带有优先级和主题功能,可以发送带优先级的物品,带主题的物品接收 确保插件 rabbitmq_priority_queue 已安装并启用。 安装库:python-pika 看这里的情况:你想用queue来保存不同优先级的items,比如0-10...

    RabbitMQ的五种模式源码+纯手工打的代码

    在这个模式下,生产者发布消息到一个主题(Topic),多个消费者订阅该主题,接收所有消息。通过使用通配符(如“*”和“#”)来定义订阅规则,消费者可以选择性地接收感兴趣的消息。 4. **路由(Direct Routing)*...

    RabbitMQ实战带目录版本

    ### 五、RabbitMQ实战应用 1. **Spring整合**:在Java应用中,Spring框架提供了与RabbitMQ集成的库,方便地创建生产者和消费者。 2. **消息确认**:消费者可以通过确认机制通知RabbitMQ是否成功处理了消息,确保...

    RabbitMQ封装为c++版本,并且使用方式为发布订阅模式

    发布订阅模式是RabbitMQ提供的一种通信模式,其中生产者发布消息到一个主题(topic),而多个消费者可以订阅这个主题,从而接收到发布的消息。 在C++中封装RabbitMQ,我们需要使用RabbitMQ的C++客户端库,通常是`...

    rabbitmq完整例子

    - 路由策略:RabbitMQ提供了多种路由策略,如Direct、Fanout、Topic和Header,以满足不同场景的需求。 5. **案例详解** - Direct交换机示例:消息基于精确匹配的路由键直接发送到队列。 - Fanout交换机示例:...

    rabbitMQ实战java版-rabbitMQ-demo.zip

    3. **Topic Exchange**: 主题交换,根据路由键的模式匹配,支持模糊匹配。 4. **Header Exchange**: 头部交换,根据消息头部的特定字段进行匹配,较少使用。 三、Java集成RabbitMQ 在Java项目中,我们可以使用`...

    rabbitmqDEMO.rar

    包含了两个主要的组件——"mblog-provider"和"mblog-consumer",分别代表了生产者和消费者的示例,它们将演示如何使用RabbitMQ进行消息的发送和接收,特别是利用主题交换机(Topic Exchange)这一功能。 首先,我们...

    Delphi-RabbitMQ.zip

    你可以根据业务需求选择不同的交换机类型,如直连(Direct)、主题(Topic)、扇出(Fanout)等。 ```delphi Channel.ExchangeDeclare('my_exchange', 'direct', false, false, false, '', ''); ``` 然后,声明一...

    java rabbitmq动态注册,监听实现

    需要理解RabbitMQ的各种工作模式,如Direct、Fanout、Topic和Header,以选择适合业务需求的模式。例如,Direct模式适用于一对一消息传递,Topic模式则支持多对多的路由策略。 7. **异常处理**: 当消费者处理消息...

    RabbitMQ Server3.13.0

    2. **Publish/Subscribe**: 通过主题交换机实现广播模式,所有订阅特定主题的消费者都会收到消息。 3. **Dead Letter Exchanges**: 当消息无法路由或者队列满时,可以配置死信交换机来处理这些消息。 4. **TTL与...

    rabbitmq代理配置和编码1

    `amq.topic`是一个主题交换机,允许模糊匹配(通配符路由)。这样,订阅者就能接收符合`&lt;topicName&gt;`模式的消息。 对于`SEND`帧,消息会被发送到`amq.topic`交换机,同样使用`&lt;topicName&gt;`作为`routingKey`。这...

    C#的Demo项目:RabbitMQ封装和使用

    3. **Topic Exchange**(主题交换机):结合了Direct和Fanout的特点,它根据路由键的模式进行匹配,将消息分发给符合条件的队列。路由键可以包含多个单词,每个单词之间用点号或星号分隔。点号表示精确匹配,星号...

    rabbitmq-server-3.7.8

    7. **工作模式**:RabbitMQ支持多种工作模式,如Direct模式(直接匹配),Fanout模式(广播模式),Topic模式(主题模式)和Header模式(基于消息头部信息匹配)。 8. **Durability**:为了确保消息在服务器重启或...

Global site tag (gtag.js) - Google Analytics