发送方代码:
package com.mycom.activemq; import java.util.HashMap; import java.util.Map; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueSession; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * ActiviteMQ方式2:Queue方式 * 消息发送方(生产者) * * @author guweiqiang */ public class QueueSender { /** * 发送消息 */ public static void sendMessage(String brokerUrl, Map<String, Object> map) { // QueueConnectionFactory : 连接工厂 QueueConnectionFactory queueConnectionFactory; // QueueConnection : JMS客户端到JMS Provider的连接 QueueConnection queueConnection = null; // QueueSession : 发送/接收消息的会话 QueueSession queueSession = null; // Queue : 消息队列 Queue queue; // QueueSender : 消息发送者 javax.jms.QueueSender queueSender; try { // 创建一个连接工厂QueueConnectionFactory实例 queueConnectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, brokerUrl); // 创建一个QueueConnection queueConnection = queueConnectionFactory.createQueueConnection(); // 启动连接 queueConnection.start(); // 创建发送/接收消息的会话 QueueSession queueSession = queueConnection.createQueueSession(Boolean.TRUE, QueueSession.AUTO_ACKNOWLEDGE); // 创建消息队列 Queue queue = queueSession.createQueue("SecondQueue"); // 创建消息发送者 QueueSender queueSender = queueSession.createSender(queue); queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 准备工作已完成,可以开始发送消息了 // 发送消息 MapMessage mapMsg = queueSession.createMapMessage(); mapMsg.setInt("ID", (Integer) map.get("id")); mapMsg.setString("NAME", (String) map.get("name")); mapMsg.setInt("AGE", (Integer) map.get("age")); System.out.println(mapMsg); queueSender.send(mapMsg); // 提交会话 queueSession.commit(); } catch (Exception ex) { ex.printStackTrace(); } finally { // 释放资源 if (queueSession != null) { try { queueSession.close(); } catch (JMSException e) { e.printStackTrace(); } } if (queueConnection != null) { try { queueConnection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 测试 */ public static void main(String[] args) { String brokerUrl = "tcp://127.0.0.1:61616"; Map<String, Object> map = new HashMap<String, Object>(); map.put("id", 1002); map.put("name", "guweiqiang2"); map.put("age", 22); QueueSender.sendMessage(brokerUrl, map); } }
接收方代码:
package com.mycom.activemq; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.QueueSession; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * ActiviteMQ方式2:Queue方式 * 消息接收方(消费者) * * @author guweiqiang */ public class QueueReceiver { /** * 接收消息 */ public static void receiveMessage(String brokerUrl) { // QueueConnectionFactory : 连接工厂 QueueConnectionFactory queueConnectionFactory; // QueueConnection : JMS客户端到JMS Provider的连接 QueueConnection queueConnection = null; // QueueSession : 发送/接收消息的会话 QueueSession queueSession = null; // Queue : 消息队列 Queue queue; // QueueReceiver : 消息接收者 javax.jms.QueueReceiver queueReceiver; try { // 创建一个连接工厂QueueConnectionFactory实例 queueConnectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, brokerUrl); // 创建一个QueueConnection queueConnection = queueConnectionFactory.createQueueConnection(); // 启动连接 queueConnection.start(); // 创建发送/接收消息的会话 QueueSession queueSession = queueConnection.createQueueSession(Boolean.TRUE, QueueSession.AUTO_ACKNOWLEDGE); // 创建消息队列 Queue queue = queueSession.createQueue("SecondQueue"); // 创建消息接收者 queueReceiver = queueSession.createReceiver(queue); // 准备工作已经完成,可以开始接收消息了(通过实现MessageListener的onMessage方法来接收消息) queueReceiver.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if (message != null) { MapMessage mapMsg = (MapMessage) message; try { System.out.println("ID:" + mapMsg.getInt("ID") + "\t NAME:" + mapMsg.getString("NAME") + "\t AGE:" + mapMsg.getInt("AGE")); } catch (JMSException e) { e.printStackTrace(); } } } }); Thread.sleep(1000 * 100); // 进程睡眠一段时间再关闭 // 提交会话 queueSession.commit(); } catch (Exception ex) { ex.printStackTrace(); } finally { // 释放资源 if (queueSession != null) { try { queueSession.close(); } catch (JMSException e) { e.printStackTrace(); } } if (queueConnection != null) { try { queueConnection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 测试 */ public static void main(String[] args) { String brokerUrl = "tcp://127.0.0.1:61616"; QueueReceiver.receiveMessage(brokerUrl); } }
启动ActiveMQ,再在本地执行上述发送方和接收方代码,运行结果如下:
发送方console:
ActiveMQMapMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false} ActiveMQMapMessage{ theTable = {NAME=guweiqiang2, AGE=22, ID=1002} }
接收方console:
ID:1002 NAME:guweiqiang2 AGE:22
相关推荐
在本篇《ActiveMQ实战——实现一个简易版的聊天室》中,我们将深入探讨如何利用Apache ActiveMQ构建一个简单的在线聊天应用。ActiveMQ是Apache软件基金会的一个开源项目,它是一款功能强大的消息中间件,用于在...
Broker Cluster 模式是 ActiveMQ 集群中的另一种实现方式。在这个模式下,我们可以将多个 Broker 服务器组合在一起,以提高系统的可扩展性和可靠性。 要配置 Broker Cluster 模式,我们需要配置每个 Broker 服务器...
在这个小例子中,我们将探讨ActiveMQ的三种主要的消息收发方式:点对点、发布/订阅和事务处理模式。 1. **点对点(Point-to-Point)模式**: 在点对点模式下,消息从一个生产者发送到一个队列,然后由一个或多个...
2. 查找MBean:在MBean浏览器中找到ActiveMQ相关的MBeans,如`org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=myQueue`。 3. 操作MBean:通过MBean的操作接口,可以...
- 或者通过命令行方式,在 JVM 中启动 ActiveMQ:`cd example` 后运行 `ant embedBroker`。 4. **管理界面**:通过浏览器访问 `http://localhost:8161/admin` 进入 ActiveMQ 的管理后台系统,可以查看和管理 ...
- **ActiveMQ**:Apache ActiveMQ 是一个开源的消息中间件,它实现了 Java Message Service (JMS) 规范,提供可靠的消息传递和队列管理。 - **JMS**:Java Message Service 是一个标准接口,用于在分布式环境中交换...
本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和获取,但是没有自定义监听(当前项目不需要),本文档只有功能实现类 即业务层。若要调用和推送 则需要自己根据需求编写。...
activemq: broker-url: tcp://localhost:61616 user: admin password: admin ``` 配置完成后,我们创建一个`MessageReceiver`类来接收消息。这个类通常会实现`MessageListener`接口,这样可以监听消息队列中的...
### 深入掌握JMS——ActiveMQ 十一章 #### JMS基本概念与重要知识点解析 **JMS(Java Message Service)简介** JMS(Java消息服务)是一种广泛应用于企业级应用中的消息中间件协议,它为应用程序提供了一种高效、...
本章我将通过spring jms和activemq实现单Web项目服务器间异步访问和多Web项目服务器间异步访问。 一. 简介 1. 什么是Apache ActiveMq Apache ActiveMq是最流行和最强大的开源消息和集成服务器。同时Apache ActiveMq...
#默认情况下activemq提供的是queue模式 true是可以使用topic,false是仅使用queue模式 spring.jms.pub-sub-domain: true # 设置连接的activemq服务器 spring.activemq.broker-url=failover:(tcp://10.0.1.227:61616,...
在本项目中,我们涵盖了ActiveMQ的基本安装和使用,以及如何通过代码与之交互。 首先,让我们了解什么是ActiveMQ。ActiveMQ是一个高性能、可靠的、完全支持JMS 1.1规范的消息中间件。它提供了多种协议的支持,包括...
3. **存储策略**:ActiveMQ支持两种存储方式——内存存储和文件存储(默认是KahaDB)。内存存储速度快但不持久,文件存储持久但相对较慢。可以通过配置`<persistenceAdapter>`元素来切换。 4. **网络连接**:...
### ActiveMQ+zookeeper实现高可用和负载均衡 #### 一、背景与目标 在现代分布式系统中,消息中间件如ActiveMQ扮演着重要的角色,它能够帮助应用之间进行可靠的消息传递。为了保证系统的稳定性和可靠性,通常需要...
本资源提供的内容是关于ActiveMQ的连接池实现,分为两部分:一是作者自己实现的ActiveMQ连接池,二是新版本ActiveMQ自带的连接池。连接池是一种资源管理技术,通过复用已建立的数据库连接或网络连接,减少创建和销毁...
java中使用消息中间件ActiveMQ的MQTT协议发布消息使用fusesource,fusesource提供三种方式实现发布消息的方式,分别是阻塞式(BlockingConnection)、回调式(CallbackConnection)和Future样式(FutureConnection)
【标题】:“实验三 消息中间件应用开发:ActiveMQ实现单线程多队列” 在IT领域,消息中间件是一种重要的软件架构组件,它主要用于应用程序之间的异步通信,提高系统的可扩展性和解耦性。本实验主要关注的是如何...
本文将深入探讨ActiveMQ中的队列(Queue)模式,包括事务、应答、转发以及MessageConsumer的receive阻塞消息处理方式。 ### 1. ActiveMQ队列(Queue)模式 在ActiveMQ中,队列是一种点对点的消息传递模型,每个...
这通常包括`spring-boot-starter-activemq`和`org.apache.activemq:activemq-client`,以及可能的MQTT客户端库,如`org.eclipse.paho:org.eclipse.paho.client.mqttv3`。 2. 配置ActiveMQ:在`application....