Routing
/* Routing Model /(friends) [...](Q1) -> friend consumers P -> X(type direct) \(enemies) [...](Q2) -> enemy consumers */
前面的Pub/Sub模型实现将消息发布给所有监听这个队列的消费者,如果要指定某些消息发布给特定的几个消费者,需要用到Routing模型。
记得Pub/Sub中我们绑定exchange和queue用到下面的代码
channel.queueBind(queueName, "chatroom", "");
这里有第三个参数,它的含义依赖于exchange的类型,如果是fanout类型,则这个参数被忽略。fanout类型不容易定制,我们下面用direct类型来实现,此时queue会根据第三个参数来向exchange获取消息。
channel.exchangeDeclare("chatroom","direct"); channel.basicPublish("chatroom","friends",null,mesage.getBytes()); //和Pub/Sub一样,我们这里需要一个临时队列 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "chatroom", "friends");
Publisher: Send.java
package test; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send { public static void main(String[] args) throws IOException { // 创建一个连接连接服务器 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("chatroom","direct"); for(int i = 1; i < 5; i ++){ String message = "message " + i; if(i%2 == 0){ channel.basicPublish("chatroom", "friends", null, message.getBytes()); System.out.println(" [x] Sent '" + "friends" + "':'" + message +"'"); } else{ channel.basicPublish("chatroom", "enemies", null, message.getBytes()); System.out.println(" [x] Sent '" + "enemies" + "':'" + message +"'"); } } channel.close(); connection.close(); } }
Subscriber: Recv.java
package test; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ShutdownSignalException; public class Recv { public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { // 创建一个连接接收数据 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); //factory.setPort(1987); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("chatroom","direct"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "chatroom", "friends"); channel.queueBind(queueName, "chatroom", "enemies"); // 等待消息 System.out.println("waiting for messages from all"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
执行效果如下:(开启3个Recv)
//Recv 1.java channel.queueBind(queueName, "chatroom", "friends"); System.out.println("waiting for messages from friends"); /* output waiting for messages from friends [x] Received 'message 2' [x] Received 'message 4' */ //Recv 2.java channel.queueBind(queueName, "chatroom", "enemies"); System.out.println("waiting for messages from enemies"); /* output waiting for messages from enemies [x] Received 'message 1' [x] Received 'message 3' */ //Recv 3.java channel.queueBind(queueName, "chatroom", "friends"); channel.queueBind(queueName, "chatroom", "enemies"); System.out.println("waiting for messages from all"); /* output waiting for messages from all [x] Received 'message 1' [x] Received 'message 2' [x] Received 'message 3' [x] Received 'message 4' */
由于前面的pub/sub用到了chatroom这个exchange,所以需要做如下处理
// cannot redeclare exchange 'chatroom' in vhost '/' with different type root > rabbitmqctl.bat list_queues Listing queues ... Error: unable to connect to node rabbit@ciaos-desktop: nodedown
windows下解决上面的错误的方法如下
cp -r C:\Windows\.erlang.cookie C:\Users\p00206869\.erlang.cookie root > rabbitmqctl.bat list_exchanges Listing exchanges ... direct amq.direct direct amq.fanout fanout amq.headers headers amq.match headers amq.rabbitmq.log topic amq.rabbitmq.trace topic amq.topic topic chatroom fanout ...done.
可以看到chatroom类型为fanout,重置rabbitmq服务(还不知道什么命令可以删除exchange)
rabbitmqctl.bat stop_app rabbitmqctl.bat reset rabbitmqctl.bat start_app
重启rabbitmq服务的方法如下
rabbitmq-service.bat stop rabbitmq-service.bat start
相关推荐
学习RabbitMQ的学习笔记
《RabbitMQ实战Java版——基于rabbitMQ-demo.zip的详解》 在当今的分布式系统中,消息队列作为异步处理、解耦组件的关键技术,得到了广泛应用。RabbitMQ作为一款开源的消息代理和队列服务器,以其稳定性和易用性...
RabbitMQ客户连接池的Java实现。我们刚开始也是采用这种方式来实现的,但做压力测试时,发现这种每次新建Connection和新建Channel是非常耗时的,在大并发下,一般都要8毫秒左右,慢的话,好多都是几十毫秒。因此我们...
本项目是一款基于Java语言的RabbitMQ学习与实践设计源码,共计78个文件,涵盖29个Java源文件、14个Markdown文档、8个XML配置文件、5个属性文件、4个Git忽略规则文件、4个命令行脚本文件、3个JAR包文件、2个PNG图片...
在IT行业中,消息队列(Message Queue)是一种重要的中间件技术,它允许...在这个简单的demo中,我们学习了如何创建连接、声明交换机和队列、发送和接收消息,这些都是RabbitMQ的基础操作,为更高级的用法打下了基础。
**RabbitMQ开发规范详解** 在使用RabbitMQ进行分布式消息传输时,遵循一定的开发规范至关重要,这不仅可以提高系统的可维护性,也有助于保证数据的一致性和稳定性。本篇文章将详细阐述RabbitMQ的命名规范、消息传输...
在Java开发中,RabbitMQ是一个非常流行的开源消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议,提供了高效、可靠的异步通信能力。在这个场景中,"java rabbitmq动态注册,监听实现"涉及到的主要...
首先,`MQSubscribeService.java`代表的是订阅者服务,它是接收和处理来自RabbitMQ的消息的组件。在封装订阅者线程时,通常会包括以下关键知识点: 1. **创建连接和通道**:使用`ConnectionFactory`创建与RabbitMQ...
它涵盖了连接管理、消息发送与接收的基本流程,对于学习和理解RabbitMQ在Java环境下的应用非常有帮助。在实际开发中,你可以根据项目需求调整这些示例,例如增加错误处理、消息确认机制、使用工作队列模型等,以实现...
rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ...
【Java使用RabbitMQ服务】 RabbitMQ是一款开源的消息队列系统,广泛应用于分布式系统中的消息传递。本文将简要介绍如何在Java环境中使用RabbitMQ,包括安装、基本结构、消息发送模式以及高级特性。 ### 1. 安装 在...
RabbitMQ 是一个开源的消息队列系统,支持多种语言,包括 Java、Python、Ruby 等。 Spring Boot 框架提供了对 RabbitMQ 的支持,使得开发者可以轻松地使用 RabbitMQ 实现消息队列。 在上一个教程中,我们创建了一个...
在Java开发中,RabbitMQ是一个非常流行的开源消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议,用于高效地处理异步任务和解耦系统组件。`RabbitmqUtil` 是一个专门为Java开发者设计的工具类,...
Java版本的RabbitMQ实例是...这个Java版本的RabbitMQ实例是学习如何在Java项目中集成消息队列的宝贵资源。通过它,开发者可以掌握RabbitMQ的核心概念,以及如何在实际场景中利用这些概念构建健壮的、可扩展的应用程序。
在提供的压缩包文件`rabbitmq_direct`中,可能包含了实现上述功能的源代码,包括Spring Boot项目的结构、配置文件、发送和接收消息的Java类等。读者可以下载这个文件,根据说明运行代码,以更好地理解和实践RabbitMQ...
- **灵活的路由(Flexible Routing)**:RabbitMQ允许通过Exchange来对消息进行路由处理,支持多种内置Exchange类型以满足不同的路由需求,并可通过自定义插件实现更为复杂的路由逻辑。 - **消息集群(Clustering)*...
在Java开发中,SpringMVC框架常用于构建Web应用程序,而RabbitMQ是一个流行的开源消息队列系统,基于Advanced Message Queuing Protocol (AMQP)。本文将深入探讨如何使用Java API和SpringMVC来集成RabbitMQ,实现...
RabbitMQ 三种Exchange.wps————————三种exchange解释及代码 rabbitmq结构.wps————————rabbitmq架构简介 rabbitmq入门.pdf——————入门的文档 RabbitMQ研究与应用.pdf——————简单的研究
Java全能学习面试手册——Java面试题库.zip 01 7道消息队列ActiveMQ面试题!.pdf 02 10道Java高级必备的Netty面试题!.pdf 03 10道Java面试必备的设计模式面试题!.pdf 04 10个Java经典的List面试题!.pdf 05 10个...
javaAPI SpringMVC 集成rabbitMQ 很全的例子,实现了生产消费,重复消费等功能