`

ActiveMQ入门示例一

    博客分类:
  • JMS
 
阅读更多

ActiveMQ入门示例

 

ActiveMQ有两种模式,点对点和发布/订阅模式,点对点中消息只能被一个消费者消费,而发布订阅中,消息可以被一群消费者消费,很好理解。下面的例子是点对点的

 

   安装ActiveMQ很简单就不说了,客户端使用API只需添加以下依赖:

<dependency>
	<groupId>org.apache.activemq</groupId>
	<artifactId>activemq-all</artifactId>
	<version>5.9.1</version>
</dependency>
<dependency>
	<groupId>log4j</groupId>
	<artifactId>log4j</artifactId>
	<version>1.2.17</version>
</dependency>

 

 

代码:

Sender.java:

package cc.lixiaohui.test.jms.activemq.p2p;

import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

import cc.lixiaohui.test.jms.activemq.Constants;

public class Sender {
	//发送间隔
	private long interval = 1 * 1000;
	
	private ConnectionFactory factory;

	private Connection conn;
	
	private Destination dest;
	
	private Session session;

	private MessageProducer producer;
	// 单线程池负责发送
	private ExecutorService worker = Executors.newSingleThreadExecutor();

	private volatile boolean stop = false;
	
	private static final Logger logger = Logger.getLogger(Sender.class);
	
	public Sender(String brokenURL, String user, String passwd,
			String queueName) throws JMSException {
		factory = new ActiveMQConnectionFactory(user, passwd, brokenURL);
		conn = factory.createConnection();
		conn.start();
		session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
		dest = session.createQueue(queueName);
		producer = session.createProducer(dest);
	}
	
	public void setInterval(long l) {
		interval = l;
	}

	public void start() {
		worker.submit(new SendTask());
	}

	public synchronized void stop() {
		stop = true;
		worker.shutdown();
	}

	private TextMessage randomMsg() {
		String uuid = UUID.randomUUID().toString();
		TextMessage msg = null;
		try {
			msg = session.createTextMessage(uuid);//把uuid作为消息
			msg.setJMSCorrelationID(uuid);
		} catch (JMSException e) {
			e.printStackTrace();
		}
	
		return msg;
	}

	private class SendTask implements Runnable {

		public void run() {
			logger.info("Send task begin...");
			while (!stop) {
				try {
					// 发送
					TextMessage msg = randomMsg();
					producer.send(msg);
					session.commit(); // commit后消息才会发送到服务端
					logger.info("Send text message : " + msg.getText());
					// 间隔
					Thread.sleep(interval);
				} catch (Exception e) {
					e.printStackTrace();
					break;
				}
			}
			
			logger.info("Send task finished...");
		}
	}
	
	public static void main(String[] args) throws JMSException {
		Sender sender = new Sender(Constants.URL, Constants.USER, Constants.PASSWD, Constants.DEFAULT_QUEUE);
		sender.start();
	}
}

 

 

Reciever.java:

package cc.lixiaohui.test.jms.activemq.p2p;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

import cc.lixiaohui.test.jms.activemq.Constants;

public class Reciever {
	public static final int RECIEVE_MODE_SYNC = 0;
	public static final int RECIEVE_MODE_ASYNC = 1;

	private ConnectionFactory factory;

	private Connection conn;

	private Destination dest;

	private Session session;

	private MessageConsumer consumer;

	private ExecutorService worker = Executors.newSingleThreadExecutor();

	private volatile boolean stop = false;

	private long interval = 3 * 1000;

	private static final Logger logger = Logger.getLogger(Reciever.class);

	// 同步/异步接收模式,默认同步
	private int mode = RECIEVE_MODE_SYNC;

	public Reciever(String brokenURL, String user, String passwd,
			String queueName) throws JMSException {
		factory = new ActiveMQConnectionFactory(user, passwd, brokenURL);
		conn = factory.createConnection();
		conn.start();
		session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
		dest = session.createQueue(queueName);
		consumer = session.createConsumer(dest);
	}

	public void setInterval(long l) {
		interval = l;
	}

	public void setMode(int mode) {
		this.mode = mode;
	}

	public void start() throws JMSException {
		
		if (mode == RECIEVE_MODE_ASYNC) {// 异步
			logger.info("Recieved task begin in async mode...");
			// 由activemq组件回调
			consumer.setMessageListener(new MessageListener() {

				public void onMessage(Message message) {
					handleRecievedMessage(message);
					try {
						session.commit();
					} catch (JMSException e) {
						e.printStackTrace();
						stop = true;
					}
				}

			});
		} else if (mode == RECIEVE_MODE_SYNC) {// 同步, 由于另起线程, 这里也不阻塞
			worker.submit(new RecieveTask());
		}
	}

	private void handleRecievedMessage(Message recievedMsg) {
		if (recievedMsg instanceof TextMessage) {
			TextMessage msg = (TextMessage) recievedMsg;
			try {
				logger.info("Recieved message : " + msg.getText());
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}

	public void stop() {
		stop = true;
	}

	private class RecieveTask implements Runnable {

		public void run() {
			logger.info("Recieved task begin in sync mode...");
			while (!stop) {
				try {
					Message msg = consumer.receive();
					handleRecievedMessage(msg);
					session.commit();
					Thread.sleep(interval);
				} catch (Exception e) {
					e.printStackTrace();
					break;
				}
			}
			logger.info("Recieve task finished...");
		}

	}

	public static void main(String[] args) throws JMSException {
		Reciever reciever = new Reciever(Constants.URL, Constants.USER,
				Constants.PASSWD, Constants.DEFAULT_QUEUE);
		reciever.setMode(RECIEVE_MODE_ASYNC);
		reciever.start();
	}

}

 

 

测试结果:

1.Reciever在ASYNC模式,注意看日志时间,可以看到Sender一旦了消息,Reciever就会接受到消息(忽略网络)



 



 

2.Reciever在ASYNC模式下,Sender每秒发一个消息,而接收者每3秒接收一个消息:可以看到Reciever接受的时间是和Sender发送的时间是无联系的。



 

 

 

  • 大小: 31.9 KB
  • 大小: 32.3 KB
  • 大小: 38.5 KB
  • 大小: 37.8 KB
分享到:
评论

相关推荐

    SpringActiveMQ入门示例

    SpringActiveMQ入门示例是关于如何在Java环境中利用Spring框架与Apache ActiveMQ集成的一个实践教程。这个示例主要适用于开发者想要了解如何在Spring应用中使用消息队列进行异步通信和解耦。在这个项目中,开发环境...

    activemq 入门示例代码

    **ActiveMQ 入门示例代码详解** ActiveMQ 是 Apache 开源组织开发的一款高效、可靠的开源消息中间件,它遵循 JMS(Java Message Service)规范,支持多种协议,如 AMQP、STOMP、OpenWire 等,广泛应用于分布式系统...

    ActiveMQ入门示例

    **ActiveMQ入门示例** Apache ActiveMQ是一款开源的消息中间件,它是Java消息服务(JMS)的实现,广泛应用于分布式系统中的异步通信。在这个入门示例中,我们将探讨如何使用ActiveMQ实现点对点(Point-to-Point)的...

    activeMQ入门到精通.txt

    根据提供的文件信息:“activeMQ入门到精通”,我们可以深入探讨ActiveMQ的相关知识点,包括其基本概念、安装配置步骤、核心功能特性以及应用场景等。 ### ActiveMQ简介 ActiveMQ是一款开源的消息中间件,它支持...

    activeMQ简单入门案例

    本教程将引导你通过一个简单的入门案例了解如何使用ActiveMQ实现生产者与消费者的模式。 首先,我们需要了解ActiveMQ的基本概念。在消息队列中,生产者是发送消息的实体,而消费者则是接收和处理这些消息的实体。...

    ActiveMQ入门及深入使用的例子

    在压缩包"ActiveMQ-5.1"中,可能包含了示例代码和配置文件,你可以根据这些资料动手实践,通过运行例子来加深对ActiveMQ的理解。这些例子涵盖了基本的发送和接收消息,以及一些高级特性,如消息选择器、事务管理等。...

    JMS-ActiveMQ入门实例

    **JMS与ActiveMQ入门实例详解** Java消息服务(Java Message Service,简称JMS)是Java平台中用于创建、发送、接收和阅读消息的应用程序接口。它为应用程序提供了标准的接口,可以跨越多种消息中间件产品进行通信。...

    activeMQ消息中间件入门示例,行行注释

    总之,这个“activeMQ消息中间件入门示例”是学习如何使用ActiveMQ实现消息传递的一个良好起点。通过理解生产者和消费者的交互方式,以及如何配置和连接到ActiveMQ服务器,开发者可以进一步探索和利用ActiveMQ在...

    HETF-ActiveMQ入门手册.zip

    在“HETF-ActiveMQ入门手册.doc”中,读者可能会找到如何安装和配置ActiveMQ,创建和管理队列与主题,设置安全策略,以及如何在实际项目中集成和使用ActiveMQ的详细步骤和示例。此外,文档可能还会涵盖性能调优技巧...

    Apache ActiveMQ 入门最简单例子

    在本文中,我们将深入探讨如何通过Apache ActiveMQ 5.8版本进行入门,以及如何构建一个简单的Master环境。 首先,我们要了解消息队列(Message Queue)的基本概念。消息队列是一种异步通信机制,它允许应用程序之间...

    消息队列-activemq入门实例.zip

    《ActiveMQ入门实例详解》 在信息技术领域,消息队列(Message Queue)作为一种重要的中间件技术,被广泛应用于系统解耦、异步处理以及负载均衡等场景。Apache ActiveMQ是Apache软件基金会开发的一款开源消息代理,...

    memcached和activeMQ的JAVA示例代码

    描述中的"memcached 和 activeMQ 的入门级示例代码,JAVA eclipse工程"告诉我们这个项目是为初学者设计的,它包含了在Eclipse开发环境中运行的Java代码。Eclipse是一款广泛使用的Java集成开发环境(IDE),使得...

    ActiveMQ 入门实战(3)--SpringBoot 整合 ActiveMQ(csdn)————程序.pdf

    以下是一个简单的示例: ```java package com.abc.demo.activemq.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org....

    activeMQ示例 activeMQ demo,java分布式技术

    本教程旨在帮助activeMQ初学者入门,通过本示例,能完全理解activeMQ的基本概念,为分布式应用打下基础。 本示例中,使用maven管理,完美解决各种依赖问题,不需要自行配置,导入项目等待eclipse自行下载jar包后即可...

    消息队列中间件ActiveMQ入门到精通视频教程及资料

    005-集群部署1;006-集群部署2;007-集群部署3;activemq集群配置文档.pdf;ActiveMQ(中文)参考手册.doc;ActiveMQ集群:网络连接模式(network connector)详解.docx;ActiveMQ集群:网络连接模式(network connector)...

    ActiveMQ的入门例子

    **ActiveMQ入门详解** ActiveMQ是Apache组织开发的一款开源的消息中间件,它是Java Message Service (JMS) 的实现,主要用于处理应用间的异步通信。在分布式系统中,ActiveMQ作为一个消息代理,允许应用程序通过...

Global site tag (gtag.js) - Google Analytics