`

Java学习——rabbitmq(pub/sub)

 
阅读更多

Publish/Subscribe

 

/* Pub/Sub Model
                   / [...](Q1) -> subscriber1
P -> X(type fanout)
                   \ [...](Q2) -> subscriber1
*/

这是很常见的应用场景,生产者在某一时刻将一条消息发送给多个消费者。只是需要注意rabbitmq和其他典型的消息队列有些区别的是生产者并不直接将消息放入队列中,实际上生产者并不知道队列的存在。

 

它只能将消息发送给一个exchange交换组件,这个交换组件要做的事很简单。一方面接收消息,另一方面将消息放入队列。不过它需要知道根据什么规则来处理这些消息(是否放入一个特殊队列还是多个队列,或者被丢弃)。有一些规则可以使用direct,topic,headers,fanout.

pub/sub模型中我们用到fanout类型

 

channel.exchangeDeclare("chatroom","fanout");

channel.basicPublish("chatroom","",null,mesage.getBytes());

前面简单的producer/consumer与taskqueue都是需要指定一个队列名,而我们这里需要一个临时队列(缘由如下)

1,consumer无论何时接入rabbit我们都需要一个空的队列(自己随机命名或让rabbitmq给我们指定一个随机队列)

2,consumer不再监听的时候临时队列需要及时删除

 

String queueName = channel.queueDeclare().getQueue();

当创建完exchange和queue,需要绑定它们两

 

channel.queueBind(queueName, "chatroom", "");

Publisher: Send.java

 

package test;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {

	public static void main(String[] args) throws IOException {
		// 创建一个连接连接服务器
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		//factory.setPort(1987);
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		
		// 声明一个队列,可以对队列做配置,如持久化等。然后往队列发送数据
		channel.exchangeDeclare("chatroom","fanout");
		
		for(int i = 1; i < 10; i ++){
			String message = "message " + i;
			channel.basicPublish("chatroom", "", null, message.getBytes());
			System.out.println(" [x] Sent '" + message +"'");
		}
		channel.close();
		connection.close();
	}
}

Subscriber: Recv.java

 

package test;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ShutdownSignalException;

public class Recv {

	public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
		// 创建一个连接接收数据
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		//factory.setPort(1987);
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		
		channel.exchangeDeclare("chatroom","fanout");
		String queueName = channel.queueDeclare().getQueue();
		channel.queueBind(queueName, "chatroom", "");
		// 等待消息
		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, consumer);
		while(true){
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			System.out.println(" [x] Received '" + message + "'");
		}
	}
}

$ javac -cp rabbitmq-client.jar Recv.java Send.java

运行多个Recv实例,可以看到每个接收端都能获取到所有的消息。(后启动的Recv实例只会获取后它启动后的消息)

分享到:
评论

相关推荐

    java消息服务(第二版)

    在《Java消息服务(第二版)》这本书中,读者可以深入学习到JMS的核心概念、设计模式以及实际应用。以下是基于JMS的详细知识点: 1. **JMS简介**:JMS是Java平台上的API,用于在分布式系统中创建、发送、接收和读取...

    JAVA消息服务实例

    2. 发布/订阅(Publish/Subscribe,Pub/Sub)模型: 在发布/订阅模型中,消息被发布到一个主题(Topic),多个订阅者可以订阅这个主题,当有新消息发布时,所有订阅者都会收到该消息的副本。这种模型适用于一对多的...

    JMS1.1规范(中文)

    2. **消息队列与主题**:JMS提供两种消息传递模型——点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe,Pub/Sub)。在P2P模型中,消息被发送到一个队列,每个消息仅被一个消费者接收。而在Pub/Sub模型...

    JMS.rar_answers_jms

    3. **两种消息类型**:JMS支持两种消息模型——点对点(Point-to-Point,PTP)和发布/订阅(Publish/Subscribe,Pub/Sub)模式。PTP模式下,每个消息只有一个消费者,而Pub/Sub模式中,一个消息可以被多个订阅者接收...

    深入掌握 JMS(java message service)

    - **消息模型**:JMS支持两种消息通信模型——**点到点**(Point-to-Point, P2P)和**发布/订阅**(Publish/Subscribe, Pub/Sub)。这两种模型分别适用于不同的场景: - **点到点模型**:在这种模型下,消息由发送...

    javax.jms-1.1.jar

    2. **消息模型**:JMS支持两种消息模型——点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)。在点对点模型中,消息从一个队列(Queue)中发送到另一个队列,每个消息仅被一个消费者接收。...

    java消息中间件教程-activemq

    ### Java消息中间件教程——ActiveMQ #### 一、课程安排与消息中间件的重要性 - **1-1 课程安排**:本课程旨在系统性地介绍ActiveMQ的各个方面,包括安装、使用、集群搭建等内容。 - **1-2 为什么使用消息中间件**...

    JMS_DEMO

    《JMS_DEMO——深入理解Java消息服务》 在IT领域,Java消息服务(Java Message Service,简称JMS)是一个标准接口,它允许应用程序创建、发送、接收和读取消息。JMS_DEMO是一个用于演示JMS实际应用的实例,通过这个...

    JMS与MDB介绍.doc

    1. **消息类型**:JMS支持两种消息模型——点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)。在点对点模型中,消息从一个生产者发送到一个队列,然后由一个消费者接收。而在发布/订阅模型...

    一个很好的jms教程

    1. **消息模型**:JMS支持两种基本的消息模型——点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)。点对点模型中,消息从一个生产者发送到一个队列,由一个或多个消费者接收;而在发布/...

    SYT_JMS_Chat:SYT JMS-聊天

    2. **消息模型**:在JMS中,主要有两种消息模型——点对点(Point-to-Point, PTP)和发布/订阅(Publish/Subscribe, Pub/Sub)。点对点模型中,消息从一个生产者发送到一个队列,由一个消费者接收;而在发布/订阅...

Global site tag (gtag.js) - Google Analytics