Topic
/* Routing Model /(*.friends) [...](Q1) -> consumers one P -> X(type topic) \(*.enemies|female.#) [...](Q2) -> consumers two */
前面的Routing虽然可以把消息分组路由给不同的消费群体(consumers),但是消费者只能监听到这个生产者对应key的消息,如何让它同时再监听别的生产者的消息,我们这里试用一下Topic模型。
Topic Exchange 和 Direct Exchange很类似,不过它的key支持简单的正则匹配
* (star) can substitute for exactly one word
# (hash) can substiture for zero or more words
C1对所有(男|女)朋友的话都感兴趣,C2对(男|女)敌人的话和女(朋友|敌人)的话感兴趣,这样女性朋友的话会被同时分发给C1和C2,这是在Routing模型下不能实现的。
topic类型比较灵活,当一个队列bind一个"#"的key时,它接收所有消息,此时exchange类型就像fanout一样,同时,如果我们没有使用"*"或者"#",则exchange类型类似direct类型。
Publisher: Send.java
package test; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send { public static void main(String[] args) throws IOException { // 创建一个连接连接服务器 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("chatroom","topic"); String message = "hi all friends"; channel.basicPublish("chatroom", "*.friends", null, message.getBytes()); System.out.println(" [x] Sent '" + "*.friends" + "':'" + message +"'"); message = "bye all enemies"; channel.basicPublish("chatroom", "*.enemies", null, message.getBytes()); System.out.println(" [x] Sent '" + "*.enemies" + "':'" + message +"'"); message = "delete all females friends or enemies"; channel.basicPublish("chatroom", "female.#", null, message.getBytes()); System.out.println(" [x] Sent '" + "female.#" + "':'" + message +"'"); channel.close(); connection.close(); } }
Consumer Recv.java
package test; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ShutdownSignalException; public class Recv { public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { // 创建一个连接接收数据 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); //factory.setPort(1987); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("chatroom","topic"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "chatroom", "*.enemies"); channel.queueBind(queueName, "chatroom", "female.*"); // 等待消息 System.out.println("waiting for messages from enemies and females"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
运行效果如下:
//Recv 1.java channel.queueBind(queueName, "chatroom", "*.friends"); System.out.println("waiting for messages from friends"); /* output waiting for messages from all [x] Received 'hi all friends' */ //Recv 2.java channel.queueBind(queueName, "chatroom", "*.enemies"); channel.queueBind(queueName, "chatroom", "female.*"); System.out.println("waiting for messages from enemies and females"); /* output waiting for messages from enemies and females [x] Received 'bye all enemies' [x] Received 'delete all females friends or enemies' */
使用topic和正则可以减少很多队列key,比起routing要灵活很多
相关推荐
《RabbitMQ实战Java版——基于rabbitMQ-demo.zip的详解》 在当今的分布式系统中,消息队列作为异步处理、解耦组件的关键技术,得到了广泛应用。RabbitMQ作为一款开源的消息代理和队列服务器,以其稳定性和易用性...
总之,《RabbitMQ实战:高效部署分布式消息队列》全面覆盖了从基础理论到实战技巧的各个层面,是学习和精通RabbitMQ的宝贵资源。通过学习,读者不仅可以掌握RabbitMQ的使用,还能理解其背后的设计理念,为构建高效、...
在 "rabbitmqdemo" 中的基础版示例中,你可以看到如何使用 RabbitMQ 的 Java 客户端库(rabbitmq-client)创建生产者和消费者。生产者会创建消息并发送到指定的交换器,消费者则订阅感兴趣的队列,等待接收消息。...
包含了两个主要的组件——"mblog-provider"和"mblog-consumer",分别代表了生产者和消费者的示例,它们将演示如何使用RabbitMQ进行消息的发送和接收,特别是利用主题交换机(Topic Exchange)这一功能。 首先,我们...
关于java程序员发展需要学习的路线整理集合 技术 应用技术 计算机基础知识 cpu mem disk net 线程,进程 第三方库 poi Jsoup zxing Gson 数据结构 树 栈 链表 队列 图 操作系统 linux 代码控制...
5. **确认机制**:RabbitMQ 提供了两种确认机制——自动确认和手动确认。自动确认默认开启,消息一旦被消费者接收到就视为已处理;手动确认则需要消费者显式发送确认信号。 **三、RabbitMQ 高级特性** 1. **持久化...
《RabbitMQ服务器详解——基于2.8.5版本》 RabbitMQ,作为一个开源的消息代理和队列服务器,是企业级消息中间件的首选,它遵循AMQP(Advanced Message Queuing Protocol)协议,提供了高可用性、可扩展性和可靠性。...
RabbitMQ作为一款广泛使用的开源消息代理,经常被集成到基于Java的Spring框架及其子框架Spring MVC中,以实现异步处理和分布式系统通信。下面将详细介绍RabbitMQ与Spring、Spring MVC结合的关键知识点。 1. **...
首先,我们要理解RabbitMQ的核心概念——消息队列。消息队列在分布式系统中起着至关重要的作用,它允许不同的组件以异步的方式通信。当一个组件(生产者)生成消息时,它并不直接发送给另一个组件(消费者),而是将...
1. **Broker**:这是RabbitMQ服务进程,它包含两个关键部分——Exchange和Queue。 2. **Exchange**:消息交换机,根据预设的路由规则将消息路由到相应的队列,起到消息过滤和分配的作用。 3. **Queue**:消息队列,...
接下来是RabbitMQ的核心部分——"rabbitmq-server-3.8.8-1.el7.noarch.rpm"。这是RabbitMQ服务器的3.8.8版本,同样适用于EL7系列的Linux系统。这个版本的RabbitMQ引入了多项改进和修复,确保稳定性和性能。安装此包...
1. **连接与认证**:如何使用RabbitMQ的客户端库(如Python的pika或Java的rabbitmq-client)建立连接,并进行身份验证。 2. **创建Exchange**:如何声明交换机,并指定其类型。 3. **声明Queue**:定义队列,可能...
在本项目"cloud-stream-rabbitmq-test"中,我们将探讨如何将Spring Cloud Stream与RabbitMQ整合,创建自定义的消息通道,以实现双向通信——既能发送消息,也能接收消息。 1. **Spring Cloud Stream基本概念**: -...
本篇文章将聚焦于三大主流MQ——RabbitMQ、Kafka和RocketMQ的底层实现原理以及实战中的问题解决方案,帮助读者深入理解这些技术并提升解决实际问题的能力。 首先,我们来看看RabbitMQ,它是基于AMQP(Advanced ...
针对RabbitMQ而言,其架构还包括一个额外的重要组成部分——交换机(Exchange)。通过引入交换机,RabbitMQ实现了生产者与消息队列之间的解耦。生产者将消息发送至交换机,而交换机会根据预设的路由策略将消息转发到...
4. **工作模式**:RabbitMQ支持两种主要的工作模式——同步(RPC)和异步。同步模式常用于请求-响应场景,而异步模式适用于解耦和负载平衡。 **Spring整合RabbitMQ** 1. **Spring AMQP**:Spring提供了Spring AMQP...
在《Java消息服务(第二版)》这本书中,读者可以深入学习到JMS的核心概念、设计模式以及实际应用。以下是基于JMS的详细知识点: 1. **JMS简介**:JMS是Java平台上的API,用于在分布式系统中创建、发送、接收和读取...
3. **消息队列与主题**:JMS提供两种类型的消息传输模式——点对点(Queue)和发布/订阅(Topic)。消息队列遵循点对点模型,每个消息仅由一个消费者接收,适合一对一的通信。消息主题遵循发布/订阅模型,多个消费者...
描述中的"java消息系统 JMS 学习代码 例子 jar"表明这个压缩包是为了学习JMS而准备的,里面包含了示例代码,便于开发者理解和实践JMS的工作原理。`jar`文件通常用于打包和分发Java类库,这里可能是为了方便导入和...