本文主要展示如何使用activeMQ进行程序设计,可以作为代码实例参考;此后会继续补充有关JMS 或ActiveMQ的优化和架构部分。
本实例主要展示如何使用Queue。
一.pom.xml
<dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>3.2.3.RELEASE</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>3.2.3.RELEASE</version> </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.4</version> </dependency> <dependency> <groupId>javax.jms</groupId> <artifactId>jms</artifactId> <version>1.1</version> </dependency> <dependency> <groupId>org.apache.qpid</groupId> <artifactId>proton-jms</artifactId> <version>0.3</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.8.0</version> </dependency> </dependencies> <build> <finalName>test-jms-1.0</finalName> <resources> <resource> <directory>src/main/resources</directory> <filtering>true</filtering> </resource> </resources> </build>其中“proton-jms”是需要声明的,否则可能无法正常运行。
二.Java实例(非spring环境)
对JMS的程序部分,推荐使用JNDI + 异步listener方式;所以接下来的例子将采取此方式。
1) jndi.properties----[/src/main/resources/jndi.properties]
###contextFactory java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory ###brokerUrl,any protocol java.naming.provider.url = tcp://localhost:61616 ##username ##java.naming.security.principal= ##password ##java.naming.security.credentials= ##connectionFactory,for building sessions connectionFactoryNames = QueueCF,TopicCF ##topic.<topicName> = <physicalName-of-topic> ##your application should use <topicName>,such as: ## context.lookup("topic1"); ##It can be more than once topic.topic1 = jms.topic1 ##queue.<topicName> = <physicalName-of-queue> queue.queue1 = jms.queue1
public class QueueMessageListener implements MessageListener{ public void onMessage(Message message) { if(message == null){ return; } try{ if(message instanceof TextMessage){ String text = ((TextMessage)message).getText(); System.out.println("-----JMS Message header-----"); //message的关联id,可以在发送消息时指定,用于描述消息的关联性 System.out.println("CorrelationID :" + message.getJMSCorrelationID()); //消息的“传送模式”,1:非持久,2:持久 System.out.println("DeliveryMode :" + message.getJMSDeliveryMode()); //消息的过期时间,毫秒数;如果在发送消息时指定了timeToLive,此值为timestap + timeToLive System.out.println("Expiration :" + message.getJMSExpiration()); //消息ID,全局唯一 System.out.println("MessageID :" + message.getJMSMessageID()); //消息权重,参考属性 System.out.println("Priority :" + message.getJMSPriority()); //是否为“重发”;当一个消息发送给消费者之后,未能收到“确认”,将会导致消息重发 System.out.println("Redelivered :" +message.getJMSRedelivered()); //消息创建的时间戳,当消息发送时被赋值。 System.out.println("Timestamp :" + message.getJMSTimestamp()); //消息的类型 System.out.println("Type :" + message.getJMSType()); System.out.println("-----Message Properties-----"); Enumeration<String> names = message.getPropertyNames(); if(names != null){ while(names.hasMoreElements()){ String key = names.nextElement(); System.out.println(key + ":" + message.getStringProperty(key)); } } System.out.println(">>>>" + text); } }catch(Exception e){ e.printStackTrace(); } } }
因为我们已经使用了“MessageListener”来异步接受消息,事实上JMS的实现中已经开启了单独的线程用来从网络中接受消息,并逐次调用onMessage方法;此处我们没有必要再次额外的创建线程。
public class SimpleConsumer { private Connection connection; private Session session; private MessageConsumer consumer; private boolean isStarted; public SimpleConsumer(MessageListener listener) throws Exception{ Context context = new InitialContext(); ConnectionFactory connectionFactory = (QueueConnectionFactory)context.lookup("QueueCF"); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination queue = (Queue)context.lookup("queue1"); consumer = session.createConsumer(queue); consumer.setMessageListener(listener); } public synchronized boolean start(){ if(isStarted){ return true; } try{ connection.start();//very important isStarted = true; return true; }catch(Exception e){ return false; } } public synchronized void close(){ isStarted = false; try{ session.close(); connection.close(); }catch(Exception e){ // } } }
public class SimpleProductor { private MessageProducer producer; private Session session; private Connection connection; private boolean isOpen = true; public SimpleProductor() throws Exception{ Context context = new InitialContext(); ConnectionFactory connectionFactory = (QueueConnectionFactory)context.lookup("QueueCF"); connection = connectionFactory.createConnection(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination queue = (Queue)context.lookup("queue1"); producer = session.createProducer(queue); producer.setDeliveryMode(DeliveryMode.PERSISTENT); } public boolean send(String message) { if(!isOpen){ throw new RuntimeException("session has been closed!"); } try{ producer.send(session.createTextMessage(message)); return true; }catch(Exception e){ return false; } } public synchronized void close(){ try{ if(isOpen){ isOpen = false; } session.close(); connection.close(); }catch (Exception e) { // } } }
上面的程序,基本上可以完成简单的消息发送和接受,此外,还有一种不常用的方式:
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination queue = session.createQueue(QUEUE); MessageProducer producer = session.createProducer(queue); //MessageConsumer consumer = session.createConsumer(queue)
//同步接收消息 MessageConsumer consumer = session.createConsumer(queue); connection.start(); while(true){ Message message = consumer.receive(10000); if(message == null){ continue; } if(message instanceof TextMessage){ //... } }
5) 测试方法:
SimpleProductor productor = new SimpleProductor(); productor.start(); for(int i=0; i<10; i++){ productor.send("message content:" + i); } productor.close(); SimpleConsumer consumer = new SimpleConsumer(new QueueMessageListener()); consumer.start(); //consumer.close();
三.Spring-jms实例
1) 配置文件:
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616"></property> </bean> <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0" value="queue1"></constructor-arg> </bean> <!-- productor --> <bean id="queueTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactory"></property> <property name="defaultDestination" ref="queueDestination"></property> <!-- 是否在message中开启timestamp属性 --> <property name="messageTimestampEnabled" value="true"></property> <!-- 是否开启deliveryMode,priority,timeToLive消息附属属性 ,否则上述3个属性将采用默认值--> <property name="explicitQosEnabled" value="true"></property> <!-- NON_PERSISTENT = 1,PERSISTENT = 2,默认值为2--> <property name="deliveryMode" value="2"></property> <!-- pubSubNoLocal,对于topic而言,还需要注意此选项:是否接受本地消息,当消费者和生产者公用一个connection时 --> </bean> <bean id="productor" class="com.test.jms.spring.impl.ProductorImpl"> <property name="jmsTemplate" ref="queueTemplate"></property> </bean> <!-- MDB --> <bean id="queueMessageListener" class="com.test.jms.object.QueueMessageListener"/> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"></property> <property name="destination" ref="queueDestination"></property> <property name="messageListener" ref="queueMessageListener"></property> <!-- 如果消息的接收速率,大于消息处理的速率时,可以采取线程池方式 <property name="taskExecutor" ref="queueMessageExecutor"></property> --> <!-- --> <property name="concurrentConsumers" value="1"></property> <!-- [concurrentConsumers]-[maxConcurrentConsumers] --> <!-- <property name="concurrency" value="1-5"></property> --> </bean> <bean id="queueMessageExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="2" /> <property name="maxPoolSize" value="5" /> <property name="daemon" value="true" /> <property name="keepAliveSeconds" value="120" /> </bean>
我们采用了spring推荐的方式:消息生产者基于jmsTemplate,消息消费者基于MDB(pojo的消息驱动bean);为了提升消息的接收者的吞吐能力,我们可以采取多线程的方式,其中有几个重要的参考配置:
A) taskExecutor:消息接受端使用线程池的方式处理消息;当消息的网络输入速率大于消息的处理速率时,可以考虑采用此方式;在消息消费者的与JMS-Server的网络链接中,每收到一条消息,将会立即交付给线程池中的线程去执行,执行时仍然调用messageListener的方法;此处需要注意,线程池中的所有线程仍然共享一个messageListener实例,在采用线程池模式中,请注意线程安全问题。
B) concurrentConsumers:并发运行的消费者个数,在默认情况下为一个“消息消费者”;事实上,一个consumer即为一个Session,多个consumer即为多个Session;不过它们底层仍然共享一个“tcp链接”;此配置项仍然是适用于“消息的网络输入速率大于消息的处理速率”的场景;每一个consumer都将会在单独的线程中运行,但是它们仍然共享一个messageListener实例;在此场景下,你无法绝对的保证,原本“有序”的消息在交付给多个consumer时被实际执行的顺序也是严格的。
taskExecutor是一种额外的优化策略,concurrentConsumers则是采用了JMS原生的特性;在实际场景中,我们选择一种即可。如下为Spring-JMS是如何使用线程池处理消息的原理(基于封装的思想):
if (this.taskExecutor != null) { consumer.setMessageListener(new MessageListener() { public void onMessage(final Message message) { taskExecutor.execute(new Runnable() { public void run() { processMessage(message, session); } }); } }); } else { consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { processMessage(message, session); } }); }
2) ProductorImpl.java
public class ProductorImpl implements Productor { private JmsTemplate jmsTemplate; public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } public void send(final String message) { if(message == null){ return; } jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { return session.createTextMessage(message); } }); } public void send(final Map<String, String> message) { if(message == null || message.isEmpty()){ return; } jmsTemplate.send(new MessageCreator() { public Message createMessage(Session session) throws JMSException { MapMessage mm = session.createMapMessage(); for(Entry<String, String> entry : message.entrySet()){ mm.setString(entry.getKey(), entry.getValue()); } return mm; } }); } }
非常的简单,我们不用书写和consumer有关的代码。一切就结束了。[备注,本文中的实例命名为Productor,只是为了避免和JMS中producer混淆]
相关推荐
同时,理解 MFC 的事件驱动编程模型对于正确处理 ActiveMQ 的消息回调至关重要。 在开发实例中,你可能还会接触到 SSL/TLS 连接配置,以确保通信的安全性。此外,了解如何使用事务来保证消息的一致性也是重要的知识...
### ActiveMQ开发实例核心知识点解析 #### 一、ActiveMQ简介 **ActiveMQ**是由Apache组织维护的一个开源项目,作为一款高性能的消息中间件,它在众多消息传递系统中脱颖而出。ActiveMQ不仅支持JMS 1.1和J2EE 1.4...
Apache ActiveMQ是业界广泛使用的开源消息代理,它遵循Java Message Service (JMS) 规范,提供可靠的消息传递服务,适用于多种编程语言,包括C++和MFC(Microsoft Foundation Classes)环境。在这个开发实例中,我们...
即使这些应用程序可能运行在不同的操作系统或编程语言环境下,如这里的C++与ActiveMQ的Java背景。通过ActiveMQ,C++开发者可以利用MFC库来创建客户端,实现与ActiveMQ服务器的通信。 首先,我们需要理解ActiveMQ的...
4. **JMS编程模型**:在`activemq.doc`文档中,你将了解到如何使用JMS API创建生产者和消费者。生产者负责发送消息,消费者负责接收消息。这通常涉及到创建ConnectionFactory,Session,Destination(Queue或Topic)...
通过文档中的这些内容,我们可以了解到ActiveMQ的基本操作流程,包括安装、配置、编程创建消息队列以及如何编写发送和接收消息的Java代码。ActiveMQ在分布式系统中充当消息传递中介,能够有效地实现异步通信、负载...
**Spring 整合 ActiveMQ 简单实例** 在当今的分布式系统中,消息队列(Message Queue)作为异步处理、解耦组件的关键技术,被广泛应用。Spring 框架与 ActiveMQ 的整合,使得开发者能够轻松地在 Spring 应用程序中...
标签"ActiveMQ C++ MFC"暗示了这个实例着重于C++编程环境下的ActiveMQ使用,特别是与MFC应用的整合。开发者需要熟悉C++多线程编程,理解MFC的消息机制,以及如何在MFC控件中显示和处理ActiveMQ消息。 为了成功实现...
《ActiveMQ入门实例详解》 在信息技术领域,消息队列(Message Queue)作为一种重要的中间件技术,被广泛应用于系统解耦、异步处理以及负载均衡等场景。Apache ActiveMQ是Apache软件基金会开发的一款开源消息代理,...
**JMS与ActiveMQ入门实例详解** Java消息服务(Java Message Service,简称JMS)是Java平台中用于创建、发送、接收和读取消息的应用程序编程接口(API)。它为不同的应用程序提供了一种标准的方式来创建、发送、...
在ActiveMQ中,你可以通过管理界面或编程方式创建一个Topic。例如,创建一个名为"myTopic"的Topic,可以使用Java API如下: ```java ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://...
在IT行业中,Spring框架是Java应用开发的基石,它提供了丰富的功能来简化应用程序的构建,包括依赖注入、AOP(面向切面编程)以及各种模块如数据访问、Web支持等。而ActiveMQ则是Apache软件基金会的一个开源项目,是...
ActiveMQ是Apache软件基金会开发的一款开源消息代理,支持多种协议,包括AMQP、STOMP、MQTT、XMPP等,而且提供了丰富的客户端库,包括Java、C#、Python等多种编程语言。本实例将重点讲解如何在C#环境中使用ActiveMQ...
1. **创建队列**:在ActiveMQ中,你可以通过管理控制台或者编程方式创建队列。例如,通过Java API,我们可以使用`javax.jms.Queue`接口的实现来创建队列,并将其配置到ActiveMQ的Broker中。 2. **生产者代码**:...
ActiveMQ实例主要展示了如何在不同的应用场景中集成并使用Apache ActiveMQ,这是一个功能强大的开源消息代理,它遵循Java消息服务(JMS)规范,提供了消息传递的平台。在这个实例中,我们将深入探讨ActiveMQ如何与...
在IT行业中,Spring框架是Java应用开发中的一个关键组件,它提供了一个全面的编程和配置模型,用于构建灵活、可维护的...这个完整的实例是一个很好的学习资源,可以帮助开发者更好地理解和实践Spring与ActiveMQ的整合。
标题中的“ActiveMQ与Spring集成实例之使用Maven构建”是指在Java开发环境中,通过Maven构建工具将Apache ActiveMQ消息中间件与Spring框架整合在一起的实际操作案例。这个主题涵盖了几大关键知识点: 1. **Apache ...
1. 支持多种编程语言和应用协议,如OpenWire、STOMP、REST、XMPP、AMQP等。 2. 全面符合JMS 1.1和J2EE 1.4规范,支持持久化、XA消息和事务处理。 3. 与Spring框架紧密集成,方便内嵌到Spring应用中。 4. 可在常见的...
除了Java之外,ActiveMQ 还提供了对C、C++、AJAX、ActionScript等多种编程语言的支持,这意味着开发者可以使用不同的语言来构建客户端应用程序,提高了跨平台的兼容性和互操作性。 4. **支持多种协议** 支持...