消息发送方代码:
package com.mycom.activemq; import java.util.HashMap; import java.util.Map; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSession; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * ActiviteMQ方式3:主题发布/订阅方式 * 消息发送方(生产者) * * @author guweiqiang */ public class TopicSender { /** * 发送消息 */ public static void sendMessage(String brokerUrl, Map<String, Object> map) { // TopicConnectionFatory : 连接工厂 TopicConnectionFactory topicConnectionFactory; // TopicConnection : JMS客户端到JMS Provider的连接 TopicConnection topicConnection = null; // TopicSession : 发送/接收消息的会话 TopicSession topicSession = null; // Topic : 发布/订阅队列 Topic topic; // TopicPublisher : 消息发布者 TopicPublisher publisher; try{ // 创建连接工厂TopicConnectionFactory实例 topicConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, brokerUrl); // 创建连接 TopicConnection topicConnection = topicConnectionFactory.createTopicConnection(); // 启动连接 topicConnection.start(); // 创建会话 TopicSession topicSession = topicConnection.createTopicSession(Boolean.TRUE, TopicSession.AUTO_ACKNOWLEDGE); // 创建发布/订阅队列 Topic topic = topicSession.createTopic("ThirdQueue"); // 创建消息发布者 TopicPublisher publisher = topicSession.createPublisher(topic); publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 设置持久化模式 // 准备工作已完成,可以开始发送消息了 // 发送消息 MapMessage mapMsg = topicSession.createMapMessage(); mapMsg.setString("NAME", (String) map.get("name")); mapMsg.setString("LANGUAGE", (String) map.get("language")); System.out.println(mapMsg); publisher.send(mapMsg); // 提交会话 topicSession.commit(); } catch(Exception ex){ ex.printStackTrace(); } finally { // 释放资源 if(topicSession != null){ try { topicSession.close(); } catch (JMSException e) { e.printStackTrace(); } } if(topicConnection != null){ try { topicConnection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 测试 */ public static void main(String[] args) { String brokerUrl = "tcp://127.0.0.1:61616"; Map<String, Object> map = new HashMap<String, Object>(); map.put("name", "guweiqiang3"); map.put("language", "java"); TopicSender.sendMessage(brokerUrl, map); } }
消息接收方代码:
package com.mycom.activemq; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicSession; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * ActiviteMQ方式3:主题发布/订阅方式 * 消息接收方(消费者) * * @author guweiqiang */ public class TopicReceiver { /** * 接收消息 */ public static void receiveMessage(String brokerUrl) { // TopicConnectionFatory : 连接工厂 TopicConnectionFactory topicConnectionFactory; // TopicConnection : JMS客户端到JMS Provider的连接 TopicConnection topicConnection = null; // TopicSession : 发送/接收消息的会话 TopicSession topicSession = null; // Topic : 发布/订阅队列 Topic topic; // TopicSubscriber : 消息订阅者 TopicSubscriber topicSubscriber; try { // 创建连接工厂TopicConnectionFactory实例 topicConnectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, brokerUrl); // 创建连接 TopicConnection topicConnection = topicConnectionFactory.createTopicConnection(); // 启动连接 topicConnection.start(); // 创建会话 TopicSession topicSession = topicConnection.createTopicSession(Boolean.TRUE, TopicSession.AUTO_ACKNOWLEDGE); // 创建发布/订阅队列 Topic topic = topicSession.createTopic("ThirdQueue"); // 创建消息订阅者 TopicSubscriber topicSubscriber = topicSession.createSubscriber(topic); // 准备工作已经完成,可以开始接收消息了(通过实现MessageListener的onMessage方法来接收消息) topicSubscriber.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if (message != null) { MapMessage mapMsg = (MapMessage) message; try { System.out.println("NAME:" + mapMsg.getString("NAME") + "\t LANGUAGE:" + mapMsg.getString("LANGUAGE")); } catch (JMSException e) { e.printStackTrace(); } } } }); Thread.sleep(1000 * 100); // 进程睡眠一段时间再关闭 // 提交会话 topicSession.commit(); } catch (Exception ex) { ex.printStackTrace(); } finally { // 释放资源 if (topicSession != null) { try { topicSession.close(); } catch (JMSException e) { e.printStackTrace(); } } if (topicConnection != null) { try { topicConnection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 测试 */ public static void main(String[] args) { String brokerUrl = "tcp://127.0.0.1:61616"; TopicReceiver.receiveMessage(brokerUrl); } }
启动ActiveMQ,再在本地执行上述发送方和接收方代码,运行结果如下:
发送方console:
ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false} ActiveMQMapMessage{ theTable = {NAME=guweiqiang3, LANGUAGE=java} }
接收方console:
NAME:guweiqiang3 LANGUAGE:java
相关推荐
在本场景中,我们关注的是如何使用C#编程语言结合ActiveMQ来实现发布/订阅模式的消息传送。ActiveMQ是Apache软件基金会开发的一个开源消息传递平台,支持多种协议,包括NMS(.NET Messaging Service),它是专门为...
在ActiveMQ中,我们可以通过创建一个Topic,然后使用`MessageProducer`发布消息,`MessageSubscriber`订阅主题来实现这种模式。 3. **ActiveMQ的使用**: 在提供的压缩包中,包含了Windows版本的ActiveMQ,这通常...
发布/订阅模型是一种更为灵活的消息传递方式。在这种模型中,生产者不会将消息直接发送给特定的消费者。相反,消息会被发送到一个主题,任何订阅了该主题的消费者都会接收到这条消息。这意味着所有订阅了同一主题的...
在本篇《ActiveMQ实战——实现一个简易版的聊天室》中,我们将深入探讨如何利用Apache ActiveMQ构建一个简单的在线聊天应用。ActiveMQ是Apache软件基金会的一个开源项目,它是一款功能强大的消息中间件,用于在...
**ActiveMQ**是Apache软件基金会开发的一款开源消息中间件,实现了JMS标准,提供了对点对点和发布/订阅模型的支持。ActiveMQ具备以下特性: 1. **跨语言支持**:除了Java,还支持其他编程语言,如C++、Python等。 2....
在`activeMQ_demo`这个压缩包中,可能包含了一些示例代码,用于演示如何使用ActiveMQ实现点对点和发布/订阅模式。这些示例可能包括了以下内容: 1. 生产者(Producer):创建和发送消息到队列或主题的代码,展示了...
在本文中,我们将深入探讨如何使用Spring框架与Apache ActiveMQ集成,实现点对点通信(Point-to-Point)和发布/订阅(Publish/Subscribe)模式的通信。ActiveMQ是流行的开源消息中间件,它允许应用程序之间异步传输...
java springboot整合activemq工程 #activemq配置 #默认情况下activemq提供的是queue模式 true是可以使用topic,false是仅使用queue模式 spring.jms.pub-sub-domain: true # 设置连接的activemq服务器 spring....
Broker Cluster 模式是 ActiveMQ 集群中的另一种实现方式。在这个模式下,我们可以将多个 Broker 服务器组合在一起,以提高系统的可扩展性和可靠性。 要配置 Broker Cluster 模式,我们需要配置每个 Broker 服务器...
1. 基本概念:与P2P不同,发布/订阅模型中,消息从一个生产者发送到一个主题(Topic),多个订阅者可以订阅同一个主题并接收消息。一条消息可以被多个订阅者同时接收,实现广播效果。 2. 主题:主题是Pub/Sub的核心...
wget http://apache.mirrors.ionfish.org/activemq/5.x/apache-activemq-5.x.x-bin.zip ``` 解压文件: ``` unzip apache-activemq-5.x.x-bin.zip ``` 将解压后的目录移动到`/opt`或自定义的目录: ``` ...
ActiveMQ是Apache软件基金会开发的消息队列产品,它遵循开放标准,如JMS(Java Message Service)和AMQP(Advanced Message Queuing Protocol),提供跨语言的API和协议支持,可以处理各种消息传递模式,如点对点、...
在实际应用中,理解和掌握这些知识点将有助于开发人员有效地利用ActiveMQ的发布/订阅模式来实现灵活、可靠的分布式通信。通过阅读提供的博客和实践提供的Demo,你可以深入理解ActiveMQ的Topic订阅发布模式,并将其...
Spring框架作为Java企业级应用的事实标准,提供了与ActiveMQ集成的便利,使得开发者可以轻松地在Spring应用中实现消息队列和发布/订阅模式的通信。 本DEMO将展示如何通过Spring整合ActiveMQ来实现队列(Queue)和...
本篇文章将深入探讨ActiveMQ的发布/订阅模型(Publish/Subscribe)的实现源码,以及如何与Spring框架进行集成。 首先,我们需要理解ActiveMQ中的发布/订阅模式。在这个模型中,生产者(Publisher)发送消息到一个...
在这个小例子中,我们将探讨ActiveMQ的三种主要的消息收发方式:点对点、发布/订阅和事务处理模式。 1. **点对点(Point-to-Point)模式**: 在点对点模式下,消息从一个生产者发送到一个队列,然后由一个或多个...
标题中的"ActiveMQ JMS"指的是Apache ActiveMQ,它是一个开源的消息中间件,实现了Java消息服务(Java Message Service,简称JMS)。ActiveMQ是Apache软件基金会的一个项目,它提供了多种协议的支持,包括JMS、AMQP...
**ActiveMQ订阅模式持久化实现** ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它遵循JMS(Java Message Service)规范,提供了多种消息传递模式,包括发布/订阅(Publish/Subscribe)模式。在发布/订阅...
当一个消息被发布时,所有订阅了该主题的消费者都会收到该消息。 ### 安装与配置 #### 安装ActiveMQ 1. **下载与安装**:首先从官方网站下载最新版本的 ActiveMQ。将其解压到期望的位置,并进入 `bin` 目录运行 `...
本章我将通过spring jms和activemq实现单Web项目服务器间异步访问和多Web项目服务器间异步访问。 一. 简介 1. 什么是Apache ActiveMq Apache ActiveMq是最流行和最强大的开源消息和集成服务器。同时Apache ActiveMq...