`

ActiveMQ(四)——ActiviteMQ实现方式之三:主题发布/订阅

 
阅读更多

消息发送方代码:

 

package com.mycom.activemq;

import java.util.HashMap;
import java.util.Map;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * ActiviteMQ方式3:主题发布/订阅方式
 * 消息发送方(生产者)
 * 
 * @author guweiqiang
 */
public class TopicSender {

	/**
	 * 发送消息
	 */
	public static void sendMessage(String brokerUrl, Map<String, Object> map) {
		// TopicConnectionFatory : 连接工厂
		TopicConnectionFactory topicConnectionFactory;
		
		// TopicConnection : JMS客户端到JMS Provider的连接
		TopicConnection topicConnection = null;
		
		// TopicSession : 发送/接收消息的会话
		TopicSession topicSession = null;
		
		// Topic : 发布/订阅队列
		Topic topic;
		
		// TopicPublisher : 消息发布者
		TopicPublisher publisher;
		
		try{
			// 创建连接工厂TopicConnectionFactory实例
			topicConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, brokerUrl);
			
			// 创建连接 TopicConnection
			topicConnection = topicConnectionFactory.createTopicConnection();
			
			// 启动连接
			topicConnection.start();
			
			// 创建会话 TopicSession
			topicSession = topicConnection.createTopicSession(Boolean.TRUE, TopicSession.AUTO_ACKNOWLEDGE);
			
			// 创建发布/订阅队列 Topic
			topic = topicSession.createTopic("ThirdQueue");
			
			// 创建消息发布者 TopicPublisher
			publisher = topicSession.createPublisher(topic);
			publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 设置持久化模式
			
			// 准备工作已完成,可以开始发送消息了
			// 发送消息
			MapMessage mapMsg = topicSession.createMapMessage();
			mapMsg.setString("NAME", (String) map.get("name"));
			mapMsg.setString("LANGUAGE", (String) map.get("language"));
			System.out.println(mapMsg);
			publisher.send(mapMsg);
			
			// 提交会话
			topicSession.commit();
			
		} catch(Exception ex){
			ex.printStackTrace();
		} finally {
			// 释放资源
			if(topicSession != null){
				try {
					topicSession.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
			if(topicConnection != null){
				try {
					topicConnection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}
	
	/**
	 * 测试
	 */
	public static void main(String[] args) {
		String brokerUrl = "tcp://127.0.0.1:61616";
		Map<String, Object> map = new HashMap<String, Object>();
		map.put("name", "guweiqiang3");
		map.put("language", "java");
		TopicSender.sendMessage(brokerUrl, map);
	}
}

 

 

消息接收方代码:

 

package com.mycom.activemq;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * ActiviteMQ方式3:主题发布/订阅方式 
 * 消息接收方(消费者)
 * 
 * @author guweiqiang
 */
public class TopicReceiver {

	/**
	 * 接收消息
	 */
	public static void receiveMessage(String brokerUrl) {
		// TopicConnectionFatory : 连接工厂
		TopicConnectionFactory topicConnectionFactory;

		// TopicConnection : JMS客户端到JMS Provider的连接
		TopicConnection topicConnection = null;

		// TopicSession : 发送/接收消息的会话
		TopicSession topicSession = null;

		// Topic : 发布/订阅队列
		Topic topic;

		// TopicSubscriber : 消息订阅者
		TopicSubscriber topicSubscriber;

		try {
			// 创建连接工厂TopicConnectionFactory实例
			topicConnectionFactory = new ActiveMQConnectionFactory(
					ActiveMQConnection.DEFAULT_USER,
					ActiveMQConnection.DEFAULT_PASSWORD, brokerUrl);

			// 创建连接 TopicConnection
			topicConnection = topicConnectionFactory.createTopicConnection();

			// 启动连接
			topicConnection.start();

			// 创建会话 TopicSession
			topicSession = topicConnection.createTopicSession(Boolean.TRUE,
					TopicSession.AUTO_ACKNOWLEDGE);

			// 创建发布/订阅队列 Topic
			topic = topicSession.createTopic("ThirdQueue");

			// 创建消息订阅者 TopicSubscriber
			topicSubscriber = topicSession.createSubscriber(topic);

			// 准备工作已经完成,可以开始接收消息了(通过实现MessageListener的onMessage方法来接收消息)
			topicSubscriber.setMessageListener(new MessageListener() {

				@Override
				public void onMessage(Message message) {
					if (message != null) {
						MapMessage mapMsg = (MapMessage) message;
						try {
							System.out.println("NAME:"
									+ mapMsg.getString("NAME") + "\t LANGUAGE:"
									+ mapMsg.getString("LANGUAGE"));
						} catch (JMSException e) {
							e.printStackTrace();
						}
					}
				}
			});

			Thread.sleep(1000 * 100); // 进程睡眠一段时间再关闭
			
			// 提交会话
			topicSession.commit();

		} catch (Exception ex) {
			ex.printStackTrace();
		} finally {
			// 释放资源
			if (topicSession != null) {
				try {
					topicSession.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
			if (topicConnection != null) {
				try {
					topicConnection.close();
				} catch (JMSException e) {
					e.printStackTrace();
				}
			}
		}
	}

	/**
	 * 测试
	 */
	public static void main(String[] args) {
		String brokerUrl = "tcp://127.0.0.1:61616";
		TopicReceiver.receiveMessage(brokerUrl);
	}

}

 

 

启动ActiveMQ,再在本地执行上述发送方和接收方代码,运行结果如下:

发送方console:

 

ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false} ActiveMQMapMessage{ theTable = {NAME=guweiqiang3, LANGUAGE=java} }

 

 

接收方console:

NAME:guweiqiang3	 LANGUAGE:java

 

 

 

分享到:
评论

相关推荐

    用C#实现的ActiveMQ发布/订阅消息传送

    在本场景中,我们关注的是如何使用C#编程语言结合ActiveMQ来实现发布/订阅模式的消息传送。ActiveMQ是Apache软件基金会开发的一个开源消息传递平台,支持多种协议,包括NMS(.NET Messaging Service),它是专门为...

    ActiveMQ的点对点与发布/订阅模式小demo

    在ActiveMQ中,我们可以通过创建一个Topic,然后使用`MessageProducer`发布消息,`MessageSubscriber`订阅主题来实现这种模式。 3. **ActiveMQ的使用**: 在提供的压缩包中,包含了Windows版本的ActiveMQ,这通常...

    Apache ActiveMQ学习笔记【原创:mq的方式有两种:点到点和发布/订阅】

    发布/订阅模型是一种更为灵活的消息传递方式。在这种模型中,生产者不会将消息直接发送给特定的消费者。相反,消息会被发送到一个主题,任何订阅了该主题的消费者都会接收到这条消息。这意味着所有订阅了同一主题的...

    ActiveMQ实战——实现一个简易版的聊天室

    在本篇《ActiveMQ实战——实现一个简易版的聊天室》中,我们将深入探讨如何利用Apache ActiveMQ构建一个简单的在线聊天应用。ActiveMQ是Apache软件基金会的一个开源项目,它是一款功能强大的消息中间件,用于在...

    JMS之ActiveMQ 点对点+发布/订阅

    **ActiveMQ**是Apache软件基金会开发的一款开源消息中间件,实现了JMS标准,提供了对点对点和发布/订阅模型的支持。ActiveMQ具备以下特性: 1. **跨语言支持**:除了Java,还支持其他编程语言,如C++、Python等。 2....

    activeMq点对点和发布/订阅模式demo

    在`activeMQ_demo`这个压缩包中,可能包含了一些示例代码,用于演示如何使用ActiveMQ实现点对点和发布/订阅模式。这些示例可能包括了以下内容: 1. 生产者(Producer):创建和发送消息到队列或主题的代码,展示了...

    Spring整合ActiveMQ实现点对点与主题发布订阅通信

    在本文中,我们将深入探讨如何使用Spring框架与Apache ActiveMQ集成,实现点对点通信(Point-to-Point)和发布/订阅(Publish/Subscribe)模式的通信。ActiveMQ是流行的开源消息中间件,它允许应用程序之间异步传输...

    java springboot整合activemq工程

    java springboot整合activemq工程 #activemq配置 #默认情况下activemq提供的是queue模式 true是可以使用topic,false是仅使用queue模式 spring.jms.pub-sub-domain: true # 设置连接的activemq服务器 spring....

    ActiveMQ 集群——JDBC Master Slave + Broker Cluster

    Broker Cluster 模式是 ActiveMQ 集群中的另一种实现方式。在这个模式下,我们可以将多个 Broker 服务器组合在一起,以提高系统的可扩展性和可靠性。 要配置 Broker Cluster 模式,我们需要配置每个 Broker 服务器...

    ActiveMQ通信方式点对点和订阅发布

    1. 基本概念:与P2P不同,发布/订阅模型中,消息从一个生产者发送到一个主题(Topic),多个订阅者可以订阅同一个主题并接收消息。一条消息可以被多个订阅者同时接收,实现广播效果。 2. 主题:主题是Pub/Sub的核心...

    linux 下apache-activemq.zip

    wget http://apache.mirrors.ionfish.org/activemq/5.x/apache-activemq-5.x.x-bin.zip ``` 解压文件: ``` unzip apache-activemq-5.x.x-bin.zip ``` 将解压后的目录移动到`/opt`或自定义的目录: ``` ...

    使用WebSocket协议接收ActiveMQ消息

    ActiveMQ是Apache软件基金会开发的消息队列产品,它遵循开放标准,如JMS(Java Message Service)和AMQP(Advanced Message Queuing Protocol),提供跨语言的API和协议支持,可以处理各种消息传递模式,如点对点、...

    ActiveMQ-Topic订阅发布模式Demo

    在实际应用中,理解和掌握这些知识点将有助于开发人员有效地利用ActiveMQ的发布/订阅模式来实现灵活、可靠的分布式通信。通过阅读提供的博客和实践提供的Demo,你可以深入理解ActiveMQ的Topic订阅发布模式,并将其...

    Spring整合ActiveMQ实现队列和主题发布订阅通信

    Spring框架作为Java企业级应用的事实标准,提供了与ActiveMQ集成的便利,使得开发者可以轻松地在Spring应用中实现消息队列和发布/订阅模式的通信。 本DEMO将展示如何通过Spring整合ActiveMQ来实现队列(Queue)和...

    ActiveMq发布和订阅消息的实现源码

    本篇文章将深入探讨ActiveMQ的发布/订阅模型(Publish/Subscribe)的实现源码,以及如何与Spring框架进行集成。 首先,我们需要理解ActiveMQ中的发布/订阅模式。在这个模型中,生产者(Publisher)发送消息到一个...

    activeMQ三种收发消息方式

    在这个小例子中,我们将探讨ActiveMQ的三种主要的消息收发方式:点对点、发布/订阅和事务处理模式。 1. **点对点(Point-to-Point)模式**: 在点对点模式下,消息从一个生产者发送到一个队列,然后由一个或多个...

    activemq jms

    标题中的"ActiveMQ JMS"指的是Apache ActiveMQ,它是一个开源的消息中间件,实现了Java消息服务(Java Message Service,简称JMS)。ActiveMQ是Apache软件基金会的一个项目,它提供了多种协议的支持,包括JMS、AMQP...

    ActiveMQ订阅模式持久化实现

    **ActiveMQ订阅模式持久化实现** ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它遵循JMS(Java Message Service)规范,提供了多种消息传递模式,包括发布/订阅(Publish/Subscribe)模式。在发布/订阅...

    activemq activeMq笔记

    当一个消息被发布时,所有订阅了该主题的消费者都会收到该消息。 ### 安装与配置 #### 安装ActiveMQ 1. **下载与安装**:首先从官方网站下载最新版本的 ActiveMQ。将其解压到期望的位置,并进入 `bin` 目录运行 `...

    Spring 实现远程访问详解——jms和activemq

    本章我将通过spring jms和activemq实现单Web项目服务器间异步访问和多Web项目服务器间异步访问。 一. 简介 1. 什么是Apache ActiveMq Apache ActiveMq是最流行和最强大的开源消息和集成服务器。同时Apache ActiveMq...

Global site tag (gtag.js) - Google Analytics