`

Java学习——rabbitmq(routing)

 
阅读更多

Routing

/* Routing Model
                   /(friends) [...](Q1) -> friend consumers
P -> X(type direct)
                   \(enemies) [...](Q2) -> enemy consumers
*/

前面的Pub/Sub模型实现将消息发布给所有监听这个队列的消费者,如果要指定某些消息发布给特定的几个消费者,需要用到Routing模型。

 

记得Pub/Sub中我们绑定exchange和queue用到下面的代码

 

channel.queueBind(queueName, "chatroom", "");

这里有第三个参数,它的含义依赖于exchange的类型,如果是fanout类型,则这个参数被忽略。fanout类型不容易定制,我们下面用direct类型来实现,此时queue会根据第三个参数来向exchange获取消息。

 

channel.exchangeDeclare("chatroom","direct");
channel.basicPublish("chatroom","friends",null,mesage.getBytes());

//和Pub/Sub一样,我们这里需要一个临时队列
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "chatroom", "friends");

Publisher: Send.java

 

package test;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {

	public static void main(String[] args) throws IOException {
		// 创建一个连接连接服务器
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		
		channel.exchangeDeclare("chatroom","direct");
		
		for(int i = 1; i < 5; i ++){
			String message = "message " + i;
			if(i%2 == 0){
				channel.basicPublish("chatroom", "friends", null, message.getBytes());
				System.out.println(" [x] Sent '" + "friends" + "':'" + message +"'");
			}
			else{
				channel.basicPublish("chatroom", "enemies", null, message.getBytes());
				System.out.println(" [x] Sent '" + "enemies" + "':'" + message +"'");
			}
		}
		channel.close();
		connection.close();
	}
}

Subscriber: Recv.java

 

package test;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ShutdownSignalException;

public class Recv {

	public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
		// 创建一个连接接收数据
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		//factory.setPort(1987);
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		
		channel.exchangeDeclare("chatroom","direct");
		String queueName = channel.queueDeclare().getQueue();
		channel.queueBind(queueName, "chatroom", "friends");
		channel.queueBind(queueName, "chatroom", "enemies");
		// 等待消息
		System.out.println("waiting for messages from all");
		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, consumer);
		while(true){
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			System.out.println(" [x] Received '" + message + "'");
		}
	}
}

执行效果如下:(开启3个Recv)

 

//Recv 1.java
channel.queueBind(queueName, "chatroom", "friends");
System.out.println("waiting for messages from friends");
/* output
waiting for messages from friends
 [x] Received 'message 2'
 [x] Received 'message 4'
*/
//Recv 2.java
channel.queueBind(queueName, "chatroom", "enemies");
System.out.println("waiting for messages from enemies");
/* output
waiting for messages from enemies
 [x] Received 'message 1'
 [x] Received 'message 3'
*/
//Recv 3.java
channel.queueBind(queueName, "chatroom", "friends");
channel.queueBind(queueName, "chatroom", "enemies");
System.out.println("waiting for messages from all");
/* output
waiting for messages from all
 [x] Received 'message 1'
 [x] Received 'message 2'
 [x] Received 'message 3'
 [x] Received 'message 4'
*/

由于前面的pub/sub用到了chatroom这个exchange,所以需要做如下处理

 

// cannot redeclare exchange 'chatroom' in vhost '/' with different type
root > rabbitmqctl.bat list_queues
Listing queues ...
Error: unable to connect to node rabbit@ciaos-desktop: nodedown

windows下解决上面的错误的方法如下

 

cp -r C:\Windows\.erlang.cookie C:\Users\p00206869\.erlang.cookie

root > rabbitmqctl.bat list_exchanges
Listing exchanges ...
	direct
amq.direct	direct
amq.fanout	fanout
amq.headers	headers
amq.match	headers
amq.rabbitmq.log	topic
amq.rabbitmq.trace	topic
amq.topic	topic
chatroom	fanout
...done.

可以看到chatroom类型为fanout,重置rabbitmq服务(还不知道什么命令可以删除exchange)

 

rabbitmqctl.bat stop_app
rabbitmqctl.bat reset
rabbitmqctl.bat start_app

重启rabbitmq服务的方法如下

 

rabbitmq-service.bat stop
rabbitmq-service.bat start
分享到:
评论

相关推荐

    MQ消息队列之——RabbitMQ

    学习RabbitMQ的学习笔记

    rabbitMQ实战java版-rabbitMQ-demo.zip

    《RabbitMQ实战Java版——基于rabbitMQ-demo.zip的详解》 在当今的分布式系统中,消息队列作为异步处理、解耦组件的关键技术,得到了广泛应用。RabbitMQ作为一款开源的消息代理和队列服务器,以其稳定性和易用性...

    使用Java编写的RabbitMQ连接池方法

    RabbitMQ客户连接池的Java实现。我们刚开始也是采用这种方式来实现的,但做压力测试时,发现这种每次新建Connection和新建Channel是非常耗时的,在大并发下,一般都要8毫秒左右,慢的话,好多都是几十毫秒。因此我们...

    java rabbitmq动态注册,监听实现

    在Java开发中,RabbitMQ是一个非常流行的开源消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议,提供了高效、可靠的异步通信能力。在这个场景中,"java rabbitmq动态注册,监听实现"涉及到的主要...

    基于Java语言的RabbitMQ学习与实践设计源码

    本项目是一款基于Java语言的RabbitMQ学习与实践设计源码,共计78个文件,涵盖29个Java源文件、14个Markdown文档、8个XML配置文件、5个属性文件、4个Git忽略规则文件、4个命令行脚本文件、3个JAR包文件、2个PNG图片...

    Java使用RabbitMq的一个简单demo

    在IT行业中,消息队列(Message Queue)是一种重要的中间件技术,它允许...在这个简单的demo中,我们学习了如何创建连接、声明交换机和队列、发送和接收消息,这些都是RabbitMQ的基础操作,为更高级的用法打下了基础。

    RabbitMQ工具类封装实现

    首先,`MQSubscribeService.java`代表的是订阅者服务,它是接收和处理来自RabbitMQ的消息的组件。在封装订阅者线程时,通常会包括以下关键知识点: 1. **创建连接和通道**:使用`ConnectionFactory`创建与RabbitMQ...

    RabbitMQ Java测试客户端

    它涵盖了连接管理、消息发送与接收的基本流程,对于学习和理解RabbitMQ在Java环境下的应用非常有帮助。在实际开发中,你可以根据项目需求调整这些示例,例如增加错误处理、消息确认机制、使用工作队列模型等,以实现...

    rabbitMQ学习笔记

    rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ...

    rabbitmq开发规范

    **RabbitMQ开发规范详解** 在使用RabbitMQ进行分布式消息传输时,遵循一定的开发规范至关重要,这不仅可以提高系统的可维护性,也有助于保证数据的一致性和稳定性。本篇文章将详细阐述RabbitMQ的命名规范、消息传输...

    java使用rabbitMq服务

    【Java使用RabbitMQ服务】 RabbitMQ是一款开源的消息队列系统,广泛应用于分布式系统中的消息传递。本文将简要介绍如何在Java环境中使用RabbitMQ,包括安装、基本结构、消息发送模式以及高级特性。 ### 1. 安装 在...

    spring boot中使用RabbitMQ routing路由详解

    RabbitMQ 是一个开源的消息队列系统,支持多种语言,包括 Java、Python、Ruby 等。 Spring Boot 框架提供了对 RabbitMQ 的支持,使得开发者可以轻松地使用 RabbitMQ 实现消息队列。 在上一个教程中,我们创建了一个...

    java版本RabbitMQ实例.rar.rar

    Java版本的RabbitMQ实例是...这个Java版本的RabbitMQ实例是学习如何在Java项目中集成消息队列的宝贵资源。通过它,开发者可以掌握RabbitMQ的核心概念,以及如何在实际场景中利用这些概念构建健壮的、可扩展的应用程序。

    springboot+RabbitMq交换器Direct的demo

    在提供的压缩包文件`rabbitmq_direct`中,可能包含了实现上述功能的源代码,包括Spring Boot项目的结构、配置文件、发送和接收消息的Java类等。读者可以下载这个文件,根据说明运行代码,以更好地理解和实践RabbitMQ...

    Rabbitmq工具类,java工具类RabbitmqUtil

    在Java开发中,RabbitMQ是一个非常流行的开源消息队列系统,它基于AMQP(Advanced Message Queuing Protocol)协议,用于高效地处理异步任务和解耦系统组件。`RabbitmqUtil` 是一个专门为Java开发者设计的工具类,...

    java中间件之rabbitmq

    - **灵活的路由(Flexible Routing)**:RabbitMQ允许通过Exchange来对消息进行路由处理,支持多种内置Exchange类型以满足不同的路由需求,并可通过自定义插件实现更为复杂的路由逻辑。 - **消息集群(Clustering)*...

    javaAPI SpringMVC 集成rabbitMQ 实现了生产消费,重复消费等功能

    在Java开发中,SpringMVC框架常用于构建Web应用程序,而RabbitMQ是一个流行的开源消息队列系统,基于Advanced Message Queuing Protocol (AMQP)。本文将深入探讨如何使用Java API和SpringMVC来集成RabbitMQ,实现...

    rabbitmq教程

    RabbitMQ 三种Exchange.wps————————三种exchange解释及代码 rabbitmq结构.wps————————rabbitmq架构简介 rabbitmq入门.pdf——————入门的文档 RabbitMQ研究与应用.pdf——————简单的研究

    Java全能学习面试手册——Java面试题库.zip

    Java全能学习面试手册——Java面试题库.zip 01 7道消息队列ActiveMQ面试题!.pdf 02 10道Java高级必备的Netty面试题!.pdf 03 10道Java面试必备的设计模式面试题!.pdf 04 10个Java经典的List面试题!.pdf 05 10个...

Global site tag (gtag.js) - Google Analytics