`
QING____
  • 浏览: 2250700 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

ActiveMQ编程实例

    博客分类:
  • JMS
 
阅读更多

    本文主要展示如何使用activeMQ进行程序设计,可以作为代码实例参考;此后会继续补充有关JMS 或ActiveMQ的优化和架构部分。

    本实例主要展示如何使用Queue。

 

一.pom.xml

 

<dependencies>
    	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-context</artifactId>
		<version>3.2.3.RELEASE</version>
	</dependency>
	<dependency>
		<groupId>org.springframework</groupId>
		<artifactId>spring-jms</artifactId>
		<version>3.2.3.RELEASE</version>
	</dependency>
	<dependency>
		<groupId>commons-lang</groupId>
		<artifactId>commons-lang</artifactId>
		<version>2.4</version>
	</dependency>
	<dependency>
		<groupId>javax.jms</groupId>
		<artifactId>jms</artifactId>
		<version>1.1</version>
	</dependency>
	<dependency>
		<groupId>org.apache.qpid</groupId>
		<artifactId>proton-jms</artifactId>
		<version>0.3</version>
	</dependency>
           
	<dependency>
		<groupId>org.apache.activemq</groupId>
		<artifactId>activemq-all</artifactId>
		<version>5.8.0</version>
	</dependency>
</dependencies>
<build>
	<finalName>test-jms-1.0</finalName>
	<resources>
	    <resource>
		<directory>src/main/resources</directory>
		<filtering>true</filtering>
	    </resource>
	</resources>
</build>
    其中“proton-jms”是需要声明的,否则可能无法正常运行。

 

 

二.Java实例(非spring环境)

    对JMS的程序部分,推荐使用JNDI + 异步listener方式;所以接下来的例子将采取此方式。

    1) jndi.properties----[/src/main/resources/jndi.properties]

 

###contextFactory
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
###brokerUrl,any protocol
java.naming.provider.url = tcp://localhost:61616
##username
##java.naming.security.principal=
##password
##java.naming.security.credentials=
##connectionFactory,for building sessions
connectionFactoryNames = QueueCF,TopicCF
##topic.<topicName> = <physicalName-of-topic>
##your application should use <topicName>,such as:
## context.lookup("topic1");
##It can be more than once
topic.topic1 = jms.topic1
##queue.<topicName> = <physicalName-of-queue>
queue.queue1 = jms.queue1
   
    2) QueueMessageListener.java(消息异步监听器)

 

public class QueueMessageListener implements MessageListener{

	public void onMessage(Message message) {
		if(message == null){
			return;
		}
		try{
			if(message instanceof TextMessage){
				String text = ((TextMessage)message).getText();
				System.out.println("-----JMS Message header-----");
				//message的关联id,可以在发送消息时指定,用于描述消息的关联性
				System.out.println("CorrelationID :" + message.getJMSCorrelationID());
				//消息的“传送模式”,1:非持久,2:持久
				System.out.println("DeliveryMode :" + message.getJMSDeliveryMode());
				//消息的过期时间,毫秒数;如果在发送消息时指定了timeToLive,此值为timestap + timeToLive
				System.out.println("Expiration :" + message.getJMSExpiration());
				//消息ID,全局唯一
				System.out.println("MessageID :" + message.getJMSMessageID());
				//消息权重,参考属性
				System.out.println("Priority :" + message.getJMSPriority());
				//是否为“重发”;当一个消息发送给消费者之后,未能收到“确认”,将会导致消息重发
				System.out.println("Redelivered :" +message.getJMSRedelivered());
				//消息创建的时间戳,当消息发送时被赋值。
				System.out.println("Timestamp :" + message.getJMSTimestamp());
				//消息的类型
				System.out.println("Type :" + message.getJMSType());
				System.out.println("-----Message Properties-----");
				Enumeration<String> names = message.getPropertyNames();
				if(names != null){
					while(names.hasMoreElements()){
						String key = names.nextElement();
						System.out.println(key + ":" + message.getStringProperty(key));
					}
				}
				System.out.println(">>>>" + text);
			}
		}catch(Exception e){
			e.printStackTrace();
		}
	}
}
  
    3) SimpleConsumer.java(消息消费者)

     因为我们已经使用了“MessageListener”来异步接受消息,事实上JMS的实现中已经开启了单独的线程用来从网络中接受消息,并逐次调用onMessage方法;此处我们没有必要再次额外的创建线程。

 

public class SimpleConsumer {

	private Connection connection;
	private Session session;
	private MessageConsumer consumer;
	
	private boolean isStarted;
	
	public SimpleConsumer(MessageListener listener) throws Exception{
		Context context = new InitialContext();
		ConnectionFactory connectionFactory = (QueueConnectionFactory)context.lookup("QueueCF");
		connection = connectionFactory.createConnection();
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Destination queue = (Queue)context.lookup("queue1");
		consumer = session.createConsumer(queue);
		consumer.setMessageListener(listener);
	}
	
	
	public synchronized boolean start(){
		if(isStarted){
			return true;
		}
		try{
			connection.start();//very important
			isStarted = true;
			return true;
		}catch(Exception e){
			return false;
		}
	}
	
	public synchronized void close(){
		isStarted = false;
		try{
			session.close();
			connection.close();
		}catch(Exception e){
			//
		}
	}
}
  
    4) SimpleProductor.java(消息生产者)

 

public class SimpleProductor {

	private MessageProducer producer;
	private Session session;
	private Connection connection;
	private boolean isOpen = true;
	
	public SimpleProductor() throws Exception{
		Context context = new InitialContext();
		ConnectionFactory connectionFactory = (QueueConnectionFactory)context.lookup("QueueCF");
		connection = connectionFactory.createConnection();
		session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		Destination queue = (Queue)context.lookup("queue1");
		producer = session.createProducer(queue);
		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
		
	}
	
	
	public boolean send(String message) {
		if(!isOpen){
			throw new RuntimeException("session has been closed!");
		}
		try{
			producer.send(session.createTextMessage(message));
			return true;
		}catch(Exception e){
			return false;
		}
	}
	
	public synchronized void close(){
		try{
			if(isOpen){
				isOpen = false;
			}
			session.close();
			connection.close();
		}catch (Exception e) {
			//
		}
	}
	
}

 

    上面的程序,基本上可以完成简单的消息发送和接受,此外,还有一种不常用的方式:

ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = session.createQueue(QUEUE);
MessageProducer producer = session.createProducer(queue);
//MessageConsumer consumer = session.createConsumer(queue)
//同步接收消息
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
while(true){
	Message message = consumer.receive(10000);
	if(message == null){
		continue;
	}
	if(message instanceof TextMessage){
		//...
	}
}

 

    5) 测试方法:

SimpleProductor productor = new SimpleProductor();
productor.start();
for(int i=0; i<10; i++){
	productor.send("message content:" + i);
}
productor.close();
SimpleConsumer consumer = new SimpleConsumer(new QueueMessageListener());
consumer.start();

//consumer.close();

 

三.Spring-jms实例

 

    1) 配置文件:

<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
	<property name="brokerURL" value="tcp://localhost:61616"></property>
</bean>
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
	<constructor-arg index="0" value="queue1"></constructor-arg>
</bean>
<!-- productor -->
<bean id="queueTemplate" class="org.springframework.jms.core.JmsTemplate">
	<property name="connectionFactory" ref="connectionFactory"></property>
	<property name="defaultDestination" ref="queueDestination"></property>
	<!-- 是否在message中开启timestamp属性 -->
	<property name="messageTimestampEnabled" value="true"></property>
	<!-- 是否开启deliveryMode,priority,timeToLive消息附属属性 ,否则上述3个属性将采用默认值-->
	<property name="explicitQosEnabled" value="true"></property>
	<!-- NON_PERSISTENT = 1,PERSISTENT = 2,默认值为2-->
	<property name="deliveryMode" value="2"></property>
	<!-- pubSubNoLocal,对于topic而言,还需要注意此选项:是否接受本地消息,当消费者和生产者公用一个connection时 -->
</bean>
<bean id="productor" class="com.test.jms.spring.impl.ProductorImpl">
	<property name="jmsTemplate" ref="queueTemplate"></property>
</bean>
<!-- MDB -->
<bean id="queueMessageListener" class="com.test.jms.object.QueueMessageListener"/>
<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"></property>
	<property name="destination" ref="queueDestination"></property>
	<property name="messageListener" ref="queueMessageListener"></property>
	<!-- 如果消息的接收速率,大于消息处理的速率时,可以采取线程池方式 
	<property name="taskExecutor" ref="queueMessageExecutor"></property>
	-->
	 <!--  -->
	<property name="concurrentConsumers" value="1"></property>
	<!-- [concurrentConsumers]-[maxConcurrentConsumers] -->
	<!--  
	<property name="concurrency" value="1-5"></property>
	-->
	
</bean>
<bean id="queueMessageExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
	<property name="corePoolSize" value="2" />
	<property name="maxPoolSize" value="5" />
	<property name="daemon" value="true" />
	<property name="keepAliveSeconds" value="120" />
</bean>

    我们采用了spring推荐的方式:消息生产者基于jmsTemplate,消息消费者基于MDB(pojo的消息驱动bean);为了提升消息的接收者的吞吐能力,我们可以采取多线程的方式,其中有几个重要的参考配置:

    A) taskExecutor:消息接受端使用线程池的方式处理消息;当消息的网络输入速率大于消息的处理速率时,可以考虑采用此方式;在消息消费者的与JMS-Server的网络链接中,每收到一条消息,将会立即交付给线程池中的线程去执行,执行时仍然调用messageListener的方法;此处需要注意,线程池中的所有线程仍然共享一个messageListener实例,在采用线程池模式中,请注意线程安全问题。

    B) concurrentConsumers:并发运行的消费者个数,在默认情况下为一个“消息消费者”;事实上,一个consumer即为一个Session,多个consumer即为多个Session;不过它们底层仍然共享一个“tcp链接”;此配置项仍然是适用于“消息的网络输入速率大于消息的处理速率”的场景;每一个consumer都将会在单独的线程中运行,但是它们仍然共享一个messageListener实例;在此场景下,你无法绝对的保证,原本“有序”的消息在交付给多个consumer时被实际执行的顺序也是严格的。

    taskExecutor是一种额外的优化策略,concurrentConsumers则是采用了JMS原生的特性;在实际场景中,我们选择一种即可。如下为Spring-JMS是如何使用线程池处理消息的原理(基于封装的思想):

if (this.taskExecutor != null) {
	consumer.setMessageListener(new MessageListener() {
		public void onMessage(final Message message) {
			taskExecutor.execute(new Runnable() {
				public void run() {
					processMessage(message, session);
				}
			});
		}
	});
}
else {
	consumer.setMessageListener(new MessageListener() {
		public void onMessage(Message message) {
			processMessage(message, session);
		}
	});
}

 

    2) ProductorImpl.java

public class ProductorImpl implements Productor {

	private JmsTemplate jmsTemplate;
	
	public void setJmsTemplate(JmsTemplate jmsTemplate) {
		this.jmsTemplate = jmsTemplate;
	}

	public void send(final String message) {
		if(message == null){
			return;
		}
		jmsTemplate.send(new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(message);
			}
		});
	}

	public void send(final Map<String, String> message) {
		if(message == null || message.isEmpty()){
			return;
		}
		jmsTemplate.send(new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				MapMessage mm = session.createMapMessage();
				for(Entry<String, String> entry : message.entrySet()){
					mm.setString(entry.getKey(), entry.getValue());
				}
				return mm;
			}
		});
		
	}

	
}

    非常的简单,我们不用书写和consumer有关的代码。一切就结束了。[备注,本文中的实例命名为Productor,只是为了避免和JMS中producer混淆]

分享到:
评论

相关推荐

    ActiveMQ开发实例-1

    同时,理解 MFC 的事件驱动编程模型对于正确处理 ActiveMQ 的消息回调至关重要。 在开发实例中,你可能还会接触到 SSL/TLS 连接配置,以确保通信的安全性。此外,了解如何使用事务来保证消息的一致性也是重要的知识...

    ActiveMQ开发实例-6

    ### ActiveMQ开发实例核心知识点解析 #### 一、ActiveMQ简介 **ActiveMQ**是由Apache组织维护的一个开源项目,作为一款高性能的消息中间件,它在众多消息传递系统中脱颖而出。ActiveMQ不仅支持JMS 1.1和J2EE 1.4...

    ActiveMQ开发实例-5

    Apache ActiveMQ是业界广泛使用的开源消息代理,它遵循Java Message Service (JMS) 规范,提供可靠的消息传递服务,适用于多种编程语言,包括C++和MFC(Microsoft Foundation Classes)环境。在这个开发实例中,我们...

    ActiveMQ开发实例-2

    即使这些应用程序可能运行在不同的操作系统或编程语言环境下,如这里的C++与ActiveMQ的Java背景。通过ActiveMQ,C++开发者可以利用MFC库来创建客户端,实现与ActiveMQ服务器的通信。 首先,我们需要理解ActiveMQ的...

    activemq入门实例,有源代码

    4. **JMS编程模型**:在`activemq.doc`文档中,你将了解到如何使用JMS API创建生产者和消费者。生产者负责发送消息,消费者负责接收消息。这通常涉及到创建ConnectionFactory,Session,Destination(Queue或Topic)...

    Activemq入门实例.pdf

    通过文档中的这些内容,我们可以了解到ActiveMQ的基本操作流程,包括安装、配置、编程创建消息队列以及如何编写发送和接收消息的Java代码。ActiveMQ在分布式系统中充当消息传递中介,能够有效地实现异步通信、负载...

    Spring整合ActiveMQ简单实例

    **Spring 整合 ActiveMQ 简单实例** 在当今的分布式系统中,消息队列(Message Queue)作为异步处理、解耦组件的关键技术,被广泛应用。Spring 框架与 ActiveMQ 的整合,使得开发者能够轻松地在 Spring 应用程序中...

    ActiveMQ开发实例-4

    标签"ActiveMQ C++ MFC"暗示了这个实例着重于C++编程环境下的ActiveMQ使用,特别是与MFC应用的整合。开发者需要熟悉C++多线程编程,理解MFC的消息机制,以及如何在MFC控件中显示和处理ActiveMQ消息。 为了成功实现...

    消息队列-activemq入门实例.zip

    《ActiveMQ入门实例详解》 在信息技术领域,消息队列(Message Queue)作为一种重要的中间件技术,被广泛应用于系统解耦、异步处理以及负载均衡等场景。Apache ActiveMQ是Apache软件基金会开发的一款开源消息代理,...

    JMS-ActiveMQ入门实例

    **JMS与ActiveMQ入门实例详解** Java消息服务(Java Message Service,简称JMS)是Java平台中用于创建、发送、接收和读取消息的应用程序编程接口(API)。它为不同的应用程序提供了一种标准的方式来创建、发送、...

    ActiveMQ Topic 实例

    在ActiveMQ中,你可以通过管理界面或编程方式创建一个Topic。例如,创建一个名为"myTopic"的Topic,可以使用Java API如下: ```java ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://...

    Spring和ActiveMQ的整合实例源码

    在IT行业中,Spring框架是Java应用开发的基石,它提供了丰富的功能来简化应用程序的构建,包括依赖注入、AOP(面向切面编程)以及各种模块如数据访问、Web支持等。而ActiveMQ则是Apache软件基金会的一个开源项目,是...

    C# ActiveMQ 实例

    ActiveMQ是Apache软件基金会开发的一款开源消息代理,支持多种协议,包括AMQP、STOMP、MQTT、XMPP等,而且提供了丰富的客户端库,包括Java、C#、Python等多种编程语言。本实例将重点讲解如何在C#环境中使用ActiveMQ...

    activemq实例

    1. **创建队列**:在ActiveMQ中,你可以通过管理控制台或者编程方式创建队列。例如,通过Java API,我们可以使用`javax.jms.Queue`接口的实现来创建队列,并将其配置到ActiveMQ的Broker中。 2. **生产者代码**:...

    ActiveMQ实例

    ActiveMQ实例主要展示了如何在不同的应用场景中集成并使用Apache ActiveMQ,这是一个功能强大的开源消息代理,它遵循Java消息服务(JMS)规范,提供了消息传递的平台。在这个实例中,我们将深入探讨ActiveMQ如何与...

    Spring和ActiveMQ整合的完整实例

    在IT行业中,Spring框架是Java应用开发中的一个关键组件,它提供了一个全面的编程和配置模型,用于构建灵活、可维护的...这个完整的实例是一个很好的学习资源,可以帮助开发者更好地理解和实践Spring与ActiveMQ的整合。

    ActiveMQ与spring集成实例之使用Maven构建

    标题中的“ActiveMQ与Spring集成实例之使用Maven构建”是指在Java开发环境中,通过Maven构建工具将Apache ActiveMQ消息中间件与Spring框架整合在一起的实际操作案例。这个主题涵盖了几大关键知识点: 1. **Apache ...

    springboot集成activemq的实例代码

    1. 支持多种编程语言和应用协议,如OpenWire、STOMP、REST、XMPP、AMQP等。 2. 全面符合JMS 1.1和J2EE 1.4规范,支持持久化、XA消息和事务处理。 3. 与Spring框架紧密集成,方便内嵌到Spring应用中。 4. 可在常见的...

    ActiveMq-JMS好用实例详解

    除了Java之外,ActiveMQ 还提供了对C、C++、AJAX、ActionScript等多种编程语言的支持,这意味着开发者可以使用不同的语言来构建客户端应用程序,提高了跨平台的兼容性和互操作性。 4. **支持多种协议** 支持...

Global site tag (gtag.js) - Google Analytics