ActiveMQ入门示例
ActiveMQ有两种模式,点对点和发布/订阅模式,点对点中消息只能被一个消费者消费,而发布订阅中,消息可以被一群消费者消费,很好理解。下面的例子是点对点的
安装ActiveMQ很简单就不说了,客户端使用API只需添加以下依赖:
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.1</version> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>1.2.17</version> </dependency>
代码:
Sender.java:
package cc.lixiaohui.test.jms.activemq.p2p; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Logger; import cc.lixiaohui.test.jms.activemq.Constants; public class Sender { //发送间隔 private long interval = 1 * 1000; private ConnectionFactory factory; private Connection conn; private Destination dest; private Session session; private MessageProducer producer; // 单线程池负责发送 private ExecutorService worker = Executors.newSingleThreadExecutor(); private volatile boolean stop = false; private static final Logger logger = Logger.getLogger(Sender.class); public Sender(String brokenURL, String user, String passwd, String queueName) throws JMSException { factory = new ActiveMQConnectionFactory(user, passwd, brokenURL); conn = factory.createConnection(); conn.start(); session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); dest = session.createQueue(queueName); producer = session.createProducer(dest); } public void setInterval(long l) { interval = l; } public void start() { worker.submit(new SendTask()); } public synchronized void stop() { stop = true; worker.shutdown(); } private TextMessage randomMsg() { String uuid = UUID.randomUUID().toString(); TextMessage msg = null; try { msg = session.createTextMessage(uuid);//把uuid作为消息 msg.setJMSCorrelationID(uuid); } catch (JMSException e) { e.printStackTrace(); } return msg; } private class SendTask implements Runnable { public void run() { logger.info("Send task begin..."); while (!stop) { try { // 发送 TextMessage msg = randomMsg(); producer.send(msg); session.commit(); // commit后消息才会发送到服务端 logger.info("Send text message : " + msg.getText()); // 间隔 Thread.sleep(interval); } catch (Exception e) { e.printStackTrace(); break; } } logger.info("Send task finished..."); } } public static void main(String[] args) throws JMSException { Sender sender = new Sender(Constants.URL, Constants.USER, Constants.PASSWD, Constants.DEFAULT_QUEUE); sender.start(); } }
Reciever.java:
package cc.lixiaohui.test.jms.activemq.p2p; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Logger; import cc.lixiaohui.test.jms.activemq.Constants; public class Reciever { public static final int RECIEVE_MODE_SYNC = 0; public static final int RECIEVE_MODE_ASYNC = 1; private ConnectionFactory factory; private Connection conn; private Destination dest; private Session session; private MessageConsumer consumer; private ExecutorService worker = Executors.newSingleThreadExecutor(); private volatile boolean stop = false; private long interval = 3 * 1000; private static final Logger logger = Logger.getLogger(Reciever.class); // 同步/异步接收模式,默认同步 private int mode = RECIEVE_MODE_SYNC; public Reciever(String brokenURL, String user, String passwd, String queueName) throws JMSException { factory = new ActiveMQConnectionFactory(user, passwd, brokenURL); conn = factory.createConnection(); conn.start(); session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); dest = session.createQueue(queueName); consumer = session.createConsumer(dest); } public void setInterval(long l) { interval = l; } public void setMode(int mode) { this.mode = mode; } public void start() throws JMSException { if (mode == RECIEVE_MODE_ASYNC) {// 异步 logger.info("Recieved task begin in async mode..."); // 由activemq组件回调 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { handleRecievedMessage(message); try { session.commit(); } catch (JMSException e) { e.printStackTrace(); stop = true; } } }); } else if (mode == RECIEVE_MODE_SYNC) {// 同步, 由于另起线程, 这里也不阻塞 worker.submit(new RecieveTask()); } } private void handleRecievedMessage(Message recievedMsg) { if (recievedMsg instanceof TextMessage) { TextMessage msg = (TextMessage) recievedMsg; try { logger.info("Recieved message : " + msg.getText()); } catch (JMSException e) { e.printStackTrace(); } } } public void stop() { stop = true; } private class RecieveTask implements Runnable { public void run() { logger.info("Recieved task begin in sync mode..."); while (!stop) { try { Message msg = consumer.receive(); handleRecievedMessage(msg); session.commit(); Thread.sleep(interval); } catch (Exception e) { e.printStackTrace(); break; } } logger.info("Recieve task finished..."); } } public static void main(String[] args) throws JMSException { Reciever reciever = new Reciever(Constants.URL, Constants.USER, Constants.PASSWD, Constants.DEFAULT_QUEUE); reciever.setMode(RECIEVE_MODE_ASYNC); reciever.start(); } }
测试结果:
1.Reciever在ASYNC模式,注意看日志时间,可以看到Sender一旦了消息,Reciever就会接受到消息(忽略网络)
2.Reciever在ASYNC模式下,Sender每秒发一个消息,而接收者每3秒接收一个消息:可以看到Reciever接受的时间是和Sender发送的时间是无联系的。
相关推荐
SpringActiveMQ入门示例是关于如何在Java环境中利用Spring框架与Apache ActiveMQ集成的一个实践教程。这个示例主要适用于开发者想要了解如何在Spring应用中使用消息队列进行异步通信和解耦。在这个项目中,开发环境...
**ActiveMQ 入门示例代码详解** ActiveMQ 是 Apache 开源组织开发的一款高效、可靠的开源消息中间件,它遵循 JMS(Java Message Service)规范,支持多种协议,如 AMQP、STOMP、OpenWire 等,广泛应用于分布式系统...
**ActiveMQ入门示例** Apache ActiveMQ是一款开源的消息中间件,它是Java消息服务(JMS)的实现,广泛应用于分布式系统中的异步通信。在这个入门示例中,我们将探讨如何使用ActiveMQ实现点对点(Point-to-Point)的...
根据提供的文件信息:“activeMQ入门到精通”,我们可以深入探讨ActiveMQ的相关知识点,包括其基本概念、安装配置步骤、核心功能特性以及应用场景等。 ### ActiveMQ简介 ActiveMQ是一款开源的消息中间件,它支持...
本教程将引导你通过一个简单的入门案例了解如何使用ActiveMQ实现生产者与消费者的模式。 首先,我们需要了解ActiveMQ的基本概念。在消息队列中,生产者是发送消息的实体,而消费者则是接收和处理这些消息的实体。...
在压缩包"ActiveMQ-5.1"中,可能包含了示例代码和配置文件,你可以根据这些资料动手实践,通过运行例子来加深对ActiveMQ的理解。这些例子涵盖了基本的发送和接收消息,以及一些高级特性,如消息选择器、事务管理等。...
**JMS与ActiveMQ入门实例详解** Java消息服务(Java Message Service,简称JMS)是Java平台中用于创建、发送、接收和阅读消息的应用程序接口。它为应用程序提供了标准的接口,可以跨越多种消息中间件产品进行通信。...
总之,这个“activeMQ消息中间件入门示例”是学习如何使用ActiveMQ实现消息传递的一个良好起点。通过理解生产者和消费者的交互方式,以及如何配置和连接到ActiveMQ服务器,开发者可以进一步探索和利用ActiveMQ在...
在“HETF-ActiveMQ入门手册.doc”中,读者可能会找到如何安装和配置ActiveMQ,创建和管理队列与主题,设置安全策略,以及如何在实际项目中集成和使用ActiveMQ的详细步骤和示例。此外,文档可能还会涵盖性能调优技巧...
### ActiveMQ 入门知识点详解 #### 一、ActiveMQ 概述 **ActiveMQ** 是由 Apache 软件基金会开发的一款免费且开源的消息中间件。与重量级且需付费的 IBM MQ 相比,ActiveMQ 更适合初学者及预算有限的项目使用。 *...
在本文中,我们将深入探讨如何通过Apache ActiveMQ 5.8版本进行入门,以及如何构建一个简单的Master环境。 首先,我们要了解消息队列(Message Queue)的基本概念。消息队列是一种异步通信机制,它允许应用程序之间...
《ActiveMQ入门实例详解》 在信息技术领域,消息队列(Message Queue)作为一种重要的中间件技术,被广泛应用于系统解耦、异步处理以及负载均衡等场景。Apache ActiveMQ是Apache软件基金会开发的一款开源消息代理,...
描述中的"memcached 和 activeMQ 的入门级示例代码,JAVA eclipse工程"告诉我们这个项目是为初学者设计的,它包含了在Eclipse开发环境中运行的Java代码。Eclipse是一款广泛使用的Java集成开发环境(IDE),使得...
以下是一个简单的示例: ```java package com.abc.demo.activemq.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org....
本教程旨在帮助activeMQ初学者入门,通过本示例,能完全理解activeMQ的基本概念,为分布式应用打下基础。 本示例中,使用maven管理,完美解决各种依赖问题,不需要自行配置,导入项目等待eclipse自行下载jar包后即可...
005-集群部署1;006-集群部署2;007-集群部署3;activemq集群配置文档.pdf;ActiveMQ(中文)参考手册.doc;ActiveMQ集群:网络连接模式(network connector)详解.docx;ActiveMQ集群:网络连接模式(network connector)...
**ActiveMQ入门详解** ActiveMQ是Apache组织开发的一款开源的消息中间件,它是Java Message Service (JMS) 的实现,主要用于处理应用间的异步通信。在分布式系统中,ActiveMQ作为一个消息代理,允许应用程序通过...