`

Java学习——rabbitmq(topic)

 
阅读更多

Topic

/* Routing Model
                   /(*.friends) [...](Q1) -> consumers one
P -> X(type topic)
                   \(*.enemies|female.#) [...](Q2) -> consumers two
*/

前面的Routing虽然可以把消息分组路由给不同的消费群体(consumers),但是消费者只能监听到这个生产者对应key的消息,如何让它同时再监听别的生产者的消息,我们这里试用一下Topic模型。

 

Topic Exchange 和 Direct Exchange很类似,不过它的key支持简单的正则匹配

 

* (star) can substitute for exactly one word

# (hash) can substiture for zero or more words

 

C1对所有(男|女)朋友的话都感兴趣,C2对(男|女)敌人的话和女(朋友|敌人)的话感兴趣,这样女性朋友的话会被同时分发给C1和C2,这是在Routing模型下不能实现的。

topic类型比较灵活,当一个队列bind一个"#"的key时,它接收所有消息,此时exchange类型就像fanout一样,同时,如果我们没有使用"*"或者"#",则exchange类型类似direct类型。

 

Publisher: Send.java

 

package test;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {

	public static void main(String[] args) throws IOException {
		// 创建一个连接连接服务器
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		
		channel.exchangeDeclare("chatroom","topic");
		
		String message = "hi all friends";
		channel.basicPublish("chatroom", "*.friends", null, message.getBytes());
		System.out.println(" [x] Sent '" + "*.friends" + "':'" + message +"'");
		
		message = "bye all enemies";
		channel.basicPublish("chatroom", "*.enemies", null, message.getBytes());
		System.out.println(" [x] Sent '" + "*.enemies" + "':'" + message +"'");
		
		message = "delete all females friends or enemies";
		channel.basicPublish("chatroom", "female.#", null, message.getBytes());
		System.out.println(" [x] Sent '" + "female.#" + "':'" + message +"'");
		
		channel.close();
		connection.close();
	}
}

Consumer Recv.java

 

package test;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ShutdownSignalException;

public class Recv {

	public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
		// 创建一个连接接收数据
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		//factory.setPort(1987);
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		
		channel.exchangeDeclare("chatroom","topic");
		String queueName = channel.queueDeclare().getQueue();
		channel.queueBind(queueName, "chatroom", "*.enemies");
		channel.queueBind(queueName, "chatroom", "female.*");
		// 等待消息
		System.out.println("waiting for messages from enemies and females");
		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, consumer);
		while(true){
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			System.out.println(" [x] Received '" + message + "'");
		}
	}
}

 运行效果如下:

 

//Recv 1.java
channel.queueBind(queueName, "chatroom", "*.friends");
System.out.println("waiting for messages from friends");
/* output
waiting for messages from all
 [x] Received 'hi all friends'
*/
//Recv 2.java
channel.queueBind(queueName, "chatroom", "*.enemies");
channel.queueBind(queueName, "chatroom", "female.*");
System.out.println("waiting for messages from enemies and females");
/* output
waiting for messages from enemies and females
 [x] Received 'bye all enemies'
 [x] Received 'delete all females friends or enemies'
*/

使用topic和正则可以减少很多队列key,比起routing要灵活很多

分享到:
评论

相关推荐

    rabbitMQ实战java版-rabbitMQ-demo.zip

    《RabbitMQ实战Java版——基于rabbitMQ-demo.zip的详解》 在当今的分布式系统中,消息队列作为异步处理、解耦组件的关键技术,得到了广泛应用。RabbitMQ作为一款开源的消息代理和队列服务器,以其稳定性和易用性...

    RabbitMQ实战:高效部署分布式消息队列

    总之,《RabbitMQ实战:高效部署分布式消息队列》全面覆盖了从基础理论到实战技巧的各个层面,是学习和精通RabbitMQ的宝贵资源。通过学习,读者不仅可以掌握RabbitMQ的使用,还能理解其背后的设计理念,为构建高效、...

    rabbitmqdemo

    在 "rabbitmqdemo" 中的基础版示例中,你可以看到如何使用 RabbitMQ 的 Java 客户端库(rabbitmq-client)创建生产者和消费者。生产者会创建消息并发送到指定的交换器,消费者则订阅感兴趣的队列,等待接收消息。...

    rabbitmqDEMO.rar

    包含了两个主要的组件——"mblog-provider"和"mblog-consumer",分别代表了生产者和消费者的示例,它们将演示如何使用RabbitMQ进行消息的发送和接收,特别是利用主题交换机(Topic Exchange)这一功能。 首先,我们...

    【白雪红叶】JAVA学习技术栈梳理思维导图.xmind

    关于java程序员发展需要学习的路线整理集合 技术 应用技术 计算机基础知识 cpu mem disk net 线程,进程 第三方库 poi Jsoup zxing Gson 数据结构 树 栈 链表 队列 图 操作系统 linux 代码控制...

    RabbitMQ 学习整理

    5. **确认机制**:RabbitMQ 提供了两种确认机制——自动确认和手动确认。自动确认默认开启,消息一旦被消费者接收到就视为已处理;手动确认则需要消费者显式发送确认信号。 **三、RabbitMQ 高级特性** 1. **持久化...

    rabbitmq-server-2.8.5.tar.gz

    《RabbitMQ服务器详解——基于2.8.5版本》 RabbitMQ,作为一个开源的消息代理和队列服务器,是企业级消息中间件的首选,它遵循AMQP(Advanced Message Queuing Protocol)协议,提供了高可用性、可扩展性和可靠性。...

    rabbitMq与spring、springmvc结合的测试工程

    RabbitMQ作为一款广泛使用的开源消息代理,经常被集成到基于Java的Spring框架及其子框架Spring MVC中,以实现异步处理和分布式系统通信。下面将详细介绍RabbitMQ与Spring、Spring MVC结合的关键知识点。 1. **...

    rabbitMQ安装包和demo.zip

    首先,我们要理解RabbitMQ的核心概念——消息队列。消息队列在分布式系统中起着至关重要的作用,它允许不同的组件以异步的方式通信。当一个组件(生产者)生成消息时,它并不直接发送给另一个组件(消费者),而是将...

    RabbitMq消息队列指南.docx

    1. **Broker**:这是RabbitMQ服务进程,它包含两个关键部分——Exchange和Queue。 2. **Exchange**:消息交换机,根据预设的路由规则将消息路由到相应的队列,起到消息过滤和分配的作用。 3. **Queue**:消息队列,...

    rabbitmq-server-3.8.8+erlang-21.3-1.el7.x86-64-linux版.zip

    接下来是RabbitMQ的核心部分——"rabbitmq-server-3.8.8-1.el7.noarch.rpm"。这是RabbitMQ服务器的3.8.8版本,同样适用于EL7系列的Linux系统。这个版本的RabbitMQ引入了多项改进和修复,确保稳定性和性能。安装此包...

    rabbitmq.rar

    1. **连接与认证**:如何使用RabbitMQ的客户端库(如Python的pika或Java的rabbitmq-client)建立连接,并进行身份验证。 2. **创建Exchange**:如何声明交换机,并指定其类型。 3. **声明Queue**:定义队列,可能...

    (spring cloud stream 整合 rabbitmq , 自定义消息通道,既能发消息,)cloud-stream-rabbitmq-test.rar

    在本项目"cloud-stream-rabbitmq-test"中,我们将探讨如何将Spring Cloud Stream与RabbitMQ整合,创建自定义的消息通道,以实现双向通信——既能发送消息,也能接收消息。 1. **Spring Cloud Stream基本概念**: -...

    四十万字数总结三大主流MQ的底层实现原理以及实战问题解决方案

    本篇文章将聚焦于三大主流MQ——RabbitMQ、Kafka和RocketMQ的底层实现原理以及实战中的问题解决方案,帮助读者深入理解这些技术并提升解决实际问题的能力。 首先,我们来看看RabbitMQ,它是基于AMQP(Advanced ...

    RabbitMQ与SpringBoot整合.docx

    针对RabbitMQ而言,其架构还包括一个额外的重要组成部分——交换机(Exchange)。通过引入交换机,RabbitMQ实现了生产者与消息队列之间的解耦。生产者将消息发送至交换机,而交换机会根据预设的路由策略将消息转发到...

    rabbitmq:rabbitmq演示

    4. **工作模式**:RabbitMQ支持两种主要的工作模式——同步(RPC)和异步。同步模式常用于请求-响应场景,而异步模式适用于解耦和负载平衡。 **Spring整合RabbitMQ** 1. **Spring AMQP**:Spring提供了Spring AMQP...

    java消息服务(第二版)

    在《Java消息服务(第二版)》这本书中,读者可以深入学习到JMS的核心概念、设计模式以及实际应用。以下是基于JMS的详细知识点: 1. **JMS简介**:JMS是Java平台上的API,用于在分布式系统中创建、发送、接收和读取...

    java-jms小例子

    3. **消息队列与主题**:JMS提供两种类型的消息传输模式——点对点(Queue)和发布/订阅(Topic)。消息队列遵循点对点模型,每个消息仅由一个消费者接收,适合一对一的通信。消息主题遵循发布/订阅模型,多个消费者...

    jms.rar_jar j_java jms_jms_jms jar_jms.j

    描述中的"java消息系统 JMS 学习代码 例子 jar"表明这个压缩包是为了学习JMS而准备的,里面包含了示例代码,便于开发者理解和实践JMS的工作原理。`jar`文件通常用于打包和分发Java类库,这里可能是为了方便导入和...

Global site tag (gtag.js) - Google Analytics