`

Java学习——rabbitmq(routing)

 
阅读更多

Routing

/* Routing Model
                   /(friends) [...](Q1) -> friend consumers
P -> X(type direct)
                   \(enemies) [...](Q2) -> enemy consumers
*/

前面的Pub/Sub模型实现将消息发布给所有监听这个队列的消费者,如果要指定某些消息发布给特定的几个消费者,需要用到Routing模型。

 

记得Pub/Sub中我们绑定exchange和queue用到下面的代码

 

channel.queueBind(queueName, "chatroom", "");

这里有第三个参数,它的含义依赖于exchange的类型,如果是fanout类型,则这个参数被忽略。fanout类型不容易定制,我们下面用direct类型来实现,此时queue会根据第三个参数来向exchange获取消息。

 

channel.exchangeDeclare("chatroom","direct");
channel.basicPublish("chatroom","friends",null,mesage.getBytes());

//和Pub/Sub一样,我们这里需要一个临时队列
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "chatroom", "friends");

Publisher: Send.java

 

package test;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {

	public static void main(String[] args) throws IOException {
		// 创建一个连接连接服务器
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		
		channel.exchangeDeclare("chatroom","direct");
		
		for(int i = 1; i < 5; i ++){
			String message = "message " + i;
			if(i%2 == 0){
				channel.basicPublish("chatroom", "friends", null, message.getBytes());
				System.out.println(" [x] Sent '" + "friends" + "':'" + message +"'");
			}
			else{
				channel.basicPublish("chatroom", "enemies", null, message.getBytes());
				System.out.println(" [x] Sent '" + "enemies" + "':'" + message +"'");
			}
		}
		channel.close();
		connection.close();
	}
}

Subscriber: Recv.java

 

package test;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ShutdownSignalException;

public class Recv {

	public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
		// 创建一个连接接收数据
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		//factory.setPort(1987);
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		
		channel.exchangeDeclare("chatroom","direct");
		String queueName = channel.queueDeclare().getQueue();
		channel.queueBind(queueName, "chatroom", "friends");
		channel.queueBind(queueName, "chatroom", "enemies");
		// 等待消息
		System.out.println("waiting for messages from all");
		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, consumer);
		while(true){
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			System.out.println(" [x] Received '" + message + "'");
		}
	}
}

执行效果如下:(开启3个Recv)

 

//Recv 1.java
channel.queueBind(queueName, "chatroom", "friends");
System.out.println("waiting for messages from friends");
/* output
waiting for messages from friends
 [x] Received 'message 2'
 [x] Received 'message 4'
*/
//Recv 2.java
channel.queueBind(queueName, "chatroom", "enemies");
System.out.println("waiting for messages from enemies");
/* output
waiting for messages from enemies
 [x] Received 'message 1'
 [x] Received 'message 3'
*/
//Recv 3.java
channel.queueBind(queueName, "chatroom", "friends");
channel.queueBind(queueName, "chatroom", "enemies");
System.out.println("waiting for messages from all");
/* output
waiting for messages from all
 [x] Received 'message 1'
 [x] Received 'message 2'
 [x] Received 'message 3'
 [x] Received 'message 4'
*/

由于前面的pub/sub用到了chatroom这个exchange,所以需要做如下处理

 

// cannot redeclare exchange 'chatroom' in vhost '/' with different type
root > rabbitmqctl.bat list_queues
Listing queues ...
Error: unable to connect to node rabbit@ciaos-desktop: nodedown

windows下解决上面的错误的方法如下

 

cp -r C:\Windows\.erlang.cookie C:\Users\p00206869\.erlang.cookie

root > rabbitmqctl.bat list_exchanges
Listing exchanges ...
	direct
amq.direct	direct
amq.fanout	fanout
amq.headers	headers
amq.match	headers
amq.rabbitmq.log	topic
amq.rabbitmq.trace	topic
amq.topic	topic
chatroom	fanout
...done.

可以看到chatroom类型为fanout,重置rabbitmq服务(还不知道什么命令可以删除exchange)

 

rabbitmqctl.bat stop_app
rabbitmqctl.bat reset
rabbitmqctl.bat start_app

重启rabbitmq服务的方法如下

 

rabbitmq-service.bat stop
rabbitmq-service.bat start
分享到:
评论

相关推荐

    MQ消息队列之——RabbitMQ

    学习RabbitMQ的学习笔记

    rabbitMQ实战java版-rabbitMQ-demo.zip

    《RabbitMQ实战Java版——基于rabbitMQ-demo.zip的详解》 在当今的分布式系统中,消息队列作为异步处理、解耦组件的关键技术,得到了广泛应用。RabbitMQ作为一款开源的消息代理和队列服务器,以其稳定性和易用性...

    使用Java编写的RabbitMQ连接池方法

    RabbitMQ客户连接池的Java实现。我们刚开始也是采用这种方式来实现的,但做压力测试时,发现这种每次新建Connection和新建Channel是非常耗时的,在大并发下,一般都要8毫秒左右,慢的话,好多都是几十毫秒。因此我们...

    基于Java语言的RabbitMQ学习与实践设计源码

    本项目是一款基于Java语言的RabbitMQ学习与实践设计源码,共计78个文件,涵盖29个Java源文件、14个Markdown文档、8个XML配置文件、5个属性文件、4个Git忽略规则文件、4个命令行脚本文件、3个JAR包文件、2个PNG图片...

    Java使用RabbitMq的一个简单demo

    在IT行业中,消息队列(Message Queue)是一种重要的中间件技术,它允许...在这个简单的demo中,我们学习了如何创建连接、声明交换机和队列、发送和接收消息,这些都是RabbitMQ的基础操作,为更高级的用法打下了基础。

    rabbitmq开发规范

    **RabbitMQ开发规范详解** 在使用RabbitMQ进行分布式消息传输时,遵循一定的开发规范至关重要,这不仅可以提高系统的可维护性,也有助于保证数据的一致性和稳定性。本篇文章将详细阐述RabbitMQ的命名规范、消息传输...

    java rabbitmq动态注册,监听实现

    在Java开发中,RabbitMQ是一个非常流行的开源消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议,提供了高效、可靠的异步通信能力。在这个场景中,"java rabbitmq动态注册,监听实现"涉及到的主要...

    RabbitMQ工具类封装实现

    首先,`MQSubscribeService.java`代表的是订阅者服务,它是接收和处理来自RabbitMQ的消息的组件。在封装订阅者线程时,通常会包括以下关键知识点: 1. **创建连接和通道**:使用`ConnectionFactory`创建与RabbitMQ...

    RabbitMQ Java测试客户端

    它涵盖了连接管理、消息发送与接收的基本流程,对于学习和理解RabbitMQ在Java环境下的应用非常有帮助。在实际开发中,你可以根据项目需求调整这些示例,例如增加错误处理、消息确认机制、使用工作队列模型等,以实现...

    rabbitMQ学习笔记

    rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ...

    java使用rabbitMq服务

    【Java使用RabbitMQ服务】 RabbitMQ是一款开源的消息队列系统,广泛应用于分布式系统中的消息传递。本文将简要介绍如何在Java环境中使用RabbitMQ,包括安装、基本结构、消息发送模式以及高级特性。 ### 1. 安装 在...

    spring boot中使用RabbitMQ routing路由详解

    RabbitMQ 是一个开源的消息队列系统,支持多种语言,包括 Java、Python、Ruby 等。 Spring Boot 框架提供了对 RabbitMQ 的支持,使得开发者可以轻松地使用 RabbitMQ 实现消息队列。 在上一个教程中,我们创建了一个...

    Rabbitmq工具类,java工具类RabbitmqUtil

    在Java开发中,RabbitMQ是一个非常流行的开源消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议,用于高效地处理异步任务和解耦系统组件。`RabbitmqUtil` 是一个专门为Java开发者设计的工具类,...

    java版本RabbitMQ实例.rar.rar

    Java版本的RabbitMQ实例是...这个Java版本的RabbitMQ实例是学习如何在Java项目中集成消息队列的宝贵资源。通过它,开发者可以掌握RabbitMQ的核心概念,以及如何在实际场景中利用这些概念构建健壮的、可扩展的应用程序。

    springboot+RabbitMq交换器Direct的demo

    在提供的压缩包文件`rabbitmq_direct`中,可能包含了实现上述功能的源代码,包括Spring Boot项目的结构、配置文件、发送和接收消息的Java类等。读者可以下载这个文件,根据说明运行代码,以更好地理解和实践RabbitMQ...

    java中间件之rabbitmq

    - **灵活的路由(Flexible Routing)**:RabbitMQ允许通过Exchange来对消息进行路由处理,支持多种内置Exchange类型以满足不同的路由需求,并可通过自定义插件实现更为复杂的路由逻辑。 - **消息集群(Clustering)*...

    javaAPI SpringMVC 集成rabbitMQ 实现了生产消费,重复消费等功能

    在Java开发中,SpringMVC框架常用于构建Web应用程序,而RabbitMQ是一个流行的开源消息队列系统,基于Advanced Message Queuing Protocol (AMQP)。本文将深入探讨如何使用Java API和SpringMVC来集成RabbitMQ,实现...

    rabbitmq教程

    RabbitMQ 三种Exchange.wps————————三种exchange解释及代码 rabbitmq结构.wps————————rabbitmq架构简介 rabbitmq入门.pdf——————入门的文档 RabbitMQ研究与应用.pdf——————简单的研究

    Java全能学习面试手册——Java面试题库.zip

    Java全能学习面试手册——Java面试题库.zip 01 7道消息队列ActiveMQ面试题!.pdf 02 10道Java高级必备的Netty面试题!.pdf 03 10道Java面试必备的设计模式面试题!.pdf 04 10个Java经典的List面试题!.pdf 05 10个...

    rabbitMQ Java开发案例

    javaAPI SpringMVC 集成rabbitMQ 很全的例子,实现了生产消费,重复消费等功能

Global site tag (gtag.js) - Google Analytics