直接上代码:
发送:
public static void main(String[] args) { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS // Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // MessageProducer:消息发送者 MessageProducer producer; // TextMessage message; // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "failover:(tcp://localhost:61616,tcp://10.1.1.123:61616)"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 destination = session.createQueue("FirstQueue"); // 得到消息生成者【发送者】 producer = session.createProducer(destination); // 设置不持久化,此处学习,实际根据项目决定 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 构造消息,此处写死,项目就是参数,或者方法获取 sendMessage(session, producer); session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } } public static void sendMessage(Session session, MessageProducer producer) throws Exception { for (int i = 1; i <= SEND_NUMBER; i++) { TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i); // 发送消息到目的地方 System.out.println("发送消息:" + "ActiveMq 发送的消息" + i); producer.send(message); } }
接收:
public static void main(String[] args) { // ConnectionFactory :连接工厂,JMS 用它创建连接 ConnectionFactory connectionFactory; // Connection :JMS 客户端到JMS Provider 的连接 Connection connection = null; // Session: 一个发送或接收消息的线程 Session session; // Destination :消息的目的地;消息发送给谁. Destination destination; // 消费者,消息接收者 MessageConsumer consumer; connectionFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "failover:(tcp://localhost:61616,tcp://10.1.1.123:61616)"); try { // 构造从工厂得到连接对象 connection = connectionFactory.createConnection(); // 启动 connection.start(); // 获取操作连接 session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置 destination = session.createQueue("FirstQueue"); consumer = session.createConsumer(destination); while (true) { // 设置接收者接收消息的时间,为了便于测试,这里谁定为100s TextMessage message = (TextMessage) consumer.receive(100000); if (null != message) { System.out.println("收到消息" + message.getText()); } else { break; } } } catch (Exception e) { e.printStackTrace(); } finally { try { if (null != connection) connection.close(); } catch (Throwable ignore) { } } }
基于Spring的ActiveMQ:
XML文件:
<bean id="topicListenConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://127.0.0.1:61616" /> <property name="clientID" value="clientId_007" /> </bean> <bean id="deviceListener" class="net.coc.activemq.listener.DeviceMessageListener"> <property name="deviceAdapter" ref="deviceAdapter"></property> </bean> <bean id="deviceTopic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="Assets.Device.Topic" /> </bean> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="topicListenConnectionFactory" /> <property name="pubSubDomain" value="true" /> <property name="receiveTimeout" value="10000" /> <property name="destination" ref="deviceTopic" /> <property name="subscriptionDurable" value="true" /> <property name="clientId" value="deviceListenerclientId_007" /> <property name="durableSubscriptionName" value="deviceListenerclientId_007" /> <property name="messageListener" ref="deviceListener" /> </bean>
Java:
public class DeviceMessageListener implements MessageListener { private DeviceAdapter deviceAdapter; public DeviceAdapter getDeviceAdapter() { return deviceAdapter; } public void setDeviceAdapter(DeviceAdapter deviceAdapter) { this.deviceAdapter = deviceAdapter; } @Override public void onMessage(Message message) { TextMessage textMsg = (TextMessage) message; try { JSONObject msg = JSONObject.fromObject(textMsg.getText()); deviceAdapter.execute(msg); deviceAdapter.finish(); } catch (JMSException e) { e.printStackTrace(); throw new RuntimeException(e); } catch (Exception e) { e.printStackTrace(); throw new RuntimeException(e); } } }
deviceAdapter 是具体解析消息的处理类,这个需要根据各自的需求自己定制;
相关推荐
通过这个"ActiveMQ初学使用demo",你将能够掌握ActiveMQ的基本操作,理解消息队列和主题的工作原理,以及如何在实际项目中使用它们。这将为你的分布式系统设计打下坚实的基础,帮助你有效地解耦系统组件,提高系统的...
本教程旨在帮助activeMQ初学者入门,通过本示例,能完全理解activeMQ的基本概念,为分布式应用打下基础。 本示例中,使用maven管理,完美解决各种依赖问题,不需要自行配置,导入项目等待eclipse自行下载jar包后即可...
基于springboot构建消息队列通信demo,针对kafka、activemq初学者,安装部署好activemq和kafka后,修改application.yml 。启动应用即可测试,可帮助快速了解kafka、activemq 两者在 Queue topic producer consumer ...
此文档专门针对ActiveMQ初学者,内容清晰易懂,适合所有开发人员用
这篇博客"jms之activeMQ 队列和广播模式例子"主要面向初学者,旨在提供ActiveMQ入门级的知识,通过实例解释队列(Queue)和主题(Topic)这两种基本的消息模式。 首先,我们要理解JMS中的队列和主题的区别。队列...
无论是对于初学者还是有经验的开发者来说,理解这些概念都是非常重要的。掌握了 CMS 的基本原理和 ActiveMQ-CPP 的使用方法后,开发者就能够更高效地构建复杂的应用系统,并利用消息队列的强大功能来优化应用程序的...
对于初学者,理解这些基本元素是掌握ActiveMQ配置的关键。配合提供的文档,如《activeMQ in Action.doc》和《ActiveMQ测试报告.pdf》,可以更深入地学习ActiveMQ的工作原理和最佳实践。对于与数据库的集成,如`...
这种简易性使得初学者能够快速上手,深入理解消息中间件的基本概念和功能。 在实际的ActiveMQ demo中,通常会包含以下关键知识点: 1. **安装与配置**:首先,你需要下载并安装ActiveMQ,然后启动其内置的Broker...
这是一本实践导向的入门指南,适合初学者,通常会包含实例代码和逐步教程,帮助读者快速掌握ActiveMQ的基本用法。 8. **《Java消息服务(第2版)》.pdf**: 这是一本详细介绍JMS规范的书籍,对于理解ActiveMQ的...
根据给定的文件信息,以下是从...无论是对于初学者还是有经验的开发者来说,这本书都提供了从理论到实践的全面指导,帮助读者掌握ActiveMQ的核心概念和实际操作技能,从而在分布式系统设计和开发中发挥关键作用。
本 Demo 将向你展示如何配置 ActiveMQ 以使用 SSL 加密文件,帮助初学者快速理解并实践这一关键安全措施。 首先,了解 SSL/TLS 协议是至关重要的。SSL(Secure Sockets Layer)与 TLS(Transport Layer Security)...
**ActiveMQ实战(英文版)** ...总之,《ActiveMQ实战(英文版)》是一本全面介绍ActiveMQ的实用指南,无论你是初学者还是有经验的开发者,都能从中获得宝贵的知识,提升你在分布式系统中的消息处理能力。
总的来说,《ActiveMQ In Action》及其源码是学习和掌握ActiveMQ不可或缺的参考资料,无论你是初学者还是经验丰富的开发者,都能从中受益匪浅。通过学习和实践,你可以更好地利用ActiveMQ构建可靠、高效的分布式系统...
本视频学习资料《一头扎进JMS之ActiveMQ》第一讲,旨在帮助初学者快速理解ActiveMQ的基本概念、功能以及如何在实际项目中应用。 JMS(Java Message Service)是Java平台中用于企业级应用之间进行异步通信的标准接口...
Apache ActiveMQ 是一款高度可扩展且功能强大的消息中间件,它是Apache软件基金会的一部分,完全开源且免费...无论你是初学者还是经验丰富的开发者,深入理解并熟练掌握ActiveMQ都将极大地提升你的系统设计和实现能力。
【描述】:“这个示例是关于ActiveMQ的实践应用,它结合了Spring框架,以单元测试的形式呈现了各种使用场景,对于初学者理解ActiveMQ的功能和用法非常有帮助。” 【标签】:“activemq”表明这是与ActiveMQ相关的...
ActiveMQ是一种基于JMS的开源消息代理,广泛应用于企业消息传递解决方案中。...对于初学者来说,熟悉这些基本操作是学习ActiveMQ消息传递和消息队列管理的起点,也是将ActiveMQ集成进各种企业应用架构的基础。
最后,对于初学者,可以参考ActiveMQ的官方文档和社区资源,那里有许多示例和教程可以帮助理解和实现插件。同时,理解JMS规范以及消息传递模式(如点对点和发布/订阅)也是使用ActiveMQ插件的基础。 总的来说,...
此外,官方文档提供了详尽的安装指导,包括但不限于下载、配置、测试等步骤,帮助初学者快速上手。 #### 使用场景 在实际应用中,ActiveMQ广泛应用于以下几种典型场景: 1. **消息路由与分发**:ActiveMQ作为消息...