- 浏览: 124628 次
- 性别:
- 来自: 北京
最新评论
-
javatozhang:
写的不错嘛
activemq+spring 持久化发送消息 -
sunaijia:
难得的好文章
activemq+spring 持久化发送消息
最近的一个项目里,访问压力过大,需要异步处理一些不需要即使处理的请求。
于是考虑用JMS,开始考察了几个jms服务,显示看了openJMS,实验了一下,发现他和spring的结合不是很好,而且只支持JMS 1.02,已经很长时间没有更新了,就不考虑使用了。
后来转到了activemq。
在网上找了好几天资料,最后整理一下,还算比较全面的,大家可以参考一下。
1、先是下载activeMq。
2、由于默认的activemq是日志文件的持久订阅,需要修改activemq的配置文件才能持久化到数据库里,在activemq的conf目录下的activemq.xml,找到
<persistenceAdapter> <kahaDB directory="${activemq.base}/data/kahadb"/> </persistenceAdapter>
把它注释掉换成
<persistenceAdapter> <jdbcPersistenceAdapter dataSource="#oracle-ds"/> </persistenceAdapter>
还要在activemq.xml的<bean></beans>里面增加数据库的配置,并且把oracle的驱动jar包,复制到activemq下的lib目录里,由于我使用的是oracle我的配置如下:
<bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/> <property name="url" value="jdbc:oracle:thin:@10.60.30.31:1521:orcl"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean>
到此你的activemq发送消息就可以持久化了(注:前提是你在spring里配置activemq时要配置消息的持久化)
3、接下来是spring+activemq 使用的jar包,都在activemq目录下的lib里
4、下来是具体一些配置文件和类文件
发送消息类
/** * message sender * @description <p></p> * @author quzishen * @project NormandyPositionII * @class MessageSender.java * @version 1.0 * @time 2011-1-11 */ public class MessageSender { // ~~~ jmsTemplate public JmsTemplate jmsTemplate; /** * send message */ public void sendMessage(){ jmsTemplate.convertAndSend("hello jms!"); } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } }
接收消息类
/** * message receiver * @description <p></p> * @author quzishen * @project NormandyPositionII * @class MessageReceiver.java * @version 1.0 * @time 2011-1-11 */ public class MessageReceiver implements MessageListener { /* (non-Javadoc) * @see javax.jms.MessageListener#onMessage(javax.jms.Message) */ public void onMessage(Message m) { System.out.println("[receive message]"); ObjectMessage om = (ObjectMessage) m; try { String key1 = om.getStringProperty("key1"); System.out.println(key1); System.out.println("model:"+om.getJMSDeliveryMode()); System.out.println("destination:"+om.getJMSDestination()); System.out.println("type:"+om.getJMSType()); System.out.println("messageId:"+om.getJMSMessageID()); System.out.println("time:"+om.getJMSTimestamp()); System.out.println("expiredTime:"+om.getJMSExpiration()); System.out.println("priority:"+om.getJMSPriority()); } catch (JMSException e) { e.printStackTrace(); } } }
在发送消息和接收消息前可以做一些自定的处理,就是这个类
/** * message converter * @description <p></p> * @author quzishen * @project NormandyPositionII * @class MessageConvertForSys.java * @version 1.0 * @time 2011-1-11 */ public class MessageConvertForSys implements MessageConverter { /* (non-Javadoc) * @see org.springframework.jms.support.converter.MessageConverter#toMessage(java.lang.Object, javax.jms.Session) */ public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException { System.out.println("[toMessage]"); ObjectMessage objectMessage = session.createObjectMessage(); objectMessage.setJMSExpiration(1000); objectMessage.setStringProperty("key1", object+"_add"); return objectMessage; } /* (non-Javadoc) * @see org.springframework.jms.support.converter.MessageConverter#fromMessage(javax.jms.Message) */ public Object fromMessage(Message message) throws JMSException, MessageConversionException { System.out.println("[fromMessage]"); ObjectMessage objectMessage = (ObjectMessage) message; return objectMessage.getObjectProperty("key1"); } }
第一种,PTP方式的配置:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd " default-autowire="byName"> <!-- JMS PTP MODEL --> <!-- PTP链接工厂 --> <bean id="queueConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://127.0.0.1:61616" /> <!-- <property name="brokerURL" value="vm://normandy.notify" /> --> <property name="useAsyncSend" value="true" /> </bean> <!-- 定义消息队列 --> <bean id="dest" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="queueDest" /> </bean> <!-- PTP jms模板 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="queueConnectionFactory"></property> <property name="defaultDestination" ref="dest" /> <property name="messageConverter" ref="messageConvertForSys" /> <property name="pubSubDomain" value="false" /> </bean> <!-- 消息转换器 --> <bean id="messageConvertForSys" class="com.normandy.tech.test.MessageConvertForSys"></bean> <!-- 消息发送方 --> <bean id="messageSender" class="com.normandy.tech.test.MessageSender"></bean> <!-- 消息接收方 --> <bean id="messageReceiver" class="com.normandy.tech.test.MessageReceiver"></bean> <!-- 消息监听容器 --> <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="queueConnectionFactory" /> <property name="destination" ref="dest" /> <property name="messageListener" ref="messageReceiver" /> </bean> </beans>
第二种:PUB/SUB方式的配置
我们配置两个消息订阅者,分别订阅不同的消息,这样用于判断是否成功执行了消息的发布和消息的订阅
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd " default-autowire="byName"> <!-- JMS TOPIC MODEL --> <!-- TOPIC链接工厂 --> <bean id="topicSendConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://127.0.0.1:61616" /> <property name="useAsyncSend" value="true" /> </bean> <bean id="topicListenConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://127.0.0.1:61616" /> </bean> <!-- 定义主题 --> <bean id="myTopic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="normandy.topic"/> </bean> <bean id="myTopic2" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="normandy.topic2"/> </bean> <!-- 消息转换器 --> <bean id="messageConvertForSys" class="com.normandy.tech.test.MessageConvertForSys"></bean> <!-- TOPIC send jms模板 --> <bean id="topicSendJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="topicSendConnectionFactory"></property> <property name="defaultDestination" ref="myTopic" /> <property name="messageConverter" ref="messageConvertForSys" /> <!-- 开启订阅模式 --> <property name="pubSubDomain" value="true"/> </bean> <!-- 消息发送方 --> <bean id="topicMessageSender" class="com.normandy.tech.test.MessageSender"> <property name="jmsTemplate" ref="topicSendJmsTemplate"></property> </bean> <!-- 消息接收方 --> <bean id="topicMessageReceiver" class="com.normandy.tech.test.MessageReceiver"> </bean> <!-- 主题消息监听容器 --> <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="topicListenConnectionFactory" /> <property name="pubSubDomain" value="true"/><!-- default is false --> <property name="destination" ref="myTopic" /> <!-- listen topic: myTopic --> <property name="subscriptionDurable" value="true"/> <property name="clientId" value="clientId_001"/><!---这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,知道被这个ID的客户端消费掉--> <property name="messageListener" ref="topicMessageReceiver" /> </bean> <!-- 主题消息监听容器2 --> <bean id="listenerContainer2" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="topicListenConnectionFactory" /> <property name="pubSubDomain" value="true"/><!-- default is false --> <property name="destination" ref="myTopic2" /> <!-- listen topic: myTopic2 --> <property name="subscriptionDurable" value="true"/> <property name="clientId" value="clientId_002"/>!---这里是设置接收客户端的ID,在持久化时,但这个客户端不在线时,消息就存在数据库里,知道被这个ID的客户端消费掉--> <property name="messageListener" ref="topicMessageReceiver" /> </bean> </beans>
测试一下是否能发送和接收消息,我是在main方法里测试的
public static void main(String[] args) throws HttpException, IOException { System.out.println("初始化spring!准备开始接收!"); ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext-jms-topic-receiver.xml"); MessageSender t=(MessageSender) context.getBean("topicMessageSender"); t.sendMessage(); }
Activemq支持两种消息传送模式:PERSISTENT (持久消息)和 NON_PERSISTENT(非持久消息)
从字面意思就可以了解,这是两种正好相反的模式。
1、PERSISTENT 持久消息
是activemq默认的传送方式,此方式下的消息在配合activemq.xml中配置的消息存储方式,会被存储在特定的地方,直到有消费者将消息消费或者消息过期进入DLQ队列,消息生命周期才会结束。
此模式下可以保证消息只会被成功传送一次和成功使用一次,消息具有可靠性。在消息传递到目标消费者,在消费者没有成功应答前,消息不会丢失。所以很自然的,需要一个地方来持久性存储。
如果消息消费者在进行消费过程发生失败,则消息会被再次投递。
2、NON_PERSISTENT 非持久消息
非持久的消息适用于不重要的,可以接受消息丢失的哪一类消息,这种消息只会被投递一次,消息不会在持久性存储中存储,也不会保证消息丢失后的重新投递。
在spring提供的JmsTemplate中,同样提供了针对于当前功能的配置选项:
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="cachingConnectionFactory"></property> <property name="defaultDestination" ref="dest" /> <property name="messageConverter" ref="messageConverter" /> <property name="pubSubDomain" value="false" /> <property name="explicitQosEnabled" value="true" /> <!-- deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false--> <property name="deliveryMode" value="1" /> <!-- 发送模式 DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久--> </bean>
消息的签收模式:
客户端成功接收一条消息的标志是一条消息被签收,成功应答。
消息的签收情形分两种:
1、带事务的session
如果session带有事务,并且事务成功提交,则消息被自动签收。如果事务回滚,则消息会被再次传送。
2、不带事务的session
不带事务的session的签收方式,取决于session的配置。
Activemq支持一下三种模式:
Session.AUTO_ACKNOWLEDGE 消息自动签收
Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge方法手动签收
Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送。在第二次重新传递消息的时候,消息头的JmsDelivered会被置为true标示当前消息已经传送过一次,客户端需要进行消息的重复处理控制。
spring提供的JmsTemplate中的配置方式:
<!-- PTP jms模板 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="cachingConnectionFactory"></property> <property name="defaultDestination" ref="dest" /> <property name="messageConverter" ref="messageConverter" /> <property name="pubSubDomain" value="false" /> <property name="sessionAcknowledgeMode" value="1" /> <!-- 消息应答方式 Session.AUTO_ACKNOWLEDGE 消息自动签收 Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge方法手动签收 Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送 --> </bean>
最后补充一下,在复制代码的时候,就是xml代码,一定要保证你项目的其他spring的xml的命名空间和你复制的新建的xml(我的新建spring配置activemq的名是:applicationContext-jms-topic.xml)命名空间保持一致,不然会出问题。我就在这挣扎了两天。希望大家注意。
就是这个地方:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
以上资料来自:
http://blog.csdn.net/quzishen/article/details/6128781
http://blog.csdn.net/quzishen/article/details/6131222
相关推荐
在实际项目中,可以根据需求调整配置,例如设置消息持久化、添加消息确认机制、处理事务等,以满足更复杂的业务场景。理解并熟练掌握ActiveMQ与Spring的整合,将有助于构建高效、稳定且可扩展的分布式系统。
ActiveMQ提供了高可用性、可伸缩性和消息持久化,确保即使在服务故障时也能保持数据完整。 **Apache Camel**: Camel是一个企业级集成库,提供了一种声明式的方式来定义和执行复杂的消息路由规则。它拥有丰富的组件...
通过这样的整合,我们可以构建出一个高效、可扩展的企业级应用,利用ActiveMQ进行消息传递,Spring管理业务逻辑,而MyBatis则负责数据持久化。这种架构在保证系统性能的同时,还能提高代码的可维护性和可测试性。在...
在IT行业中,消息队列(Message Queue)是分布式系统中常用的一种组件,它能有效地解耦各个服务,提高系统的响应速度和并发...在实际应用中,可以根据需求调整配置,如设置消息持久化、事务管理、错误处理等高级特性。
通过`ActiveMQ`,我们还可以实现消息的持久化、优先级、事务等高级功能,确保消息的可靠传递。 总结来说,`ActiveMQ`和`Spring`的结合使用为企业级应用带来了高效、灵活和可扩展的消息处理能力。通过`Spring`的容器...
- **持久化**: 可以配置消息持久化,即使服务器重启,消息也不会丢失。 - **事务**: 支持JMS事务,确保消息的可靠投递。 - **优先级**: 消息可以设置优先级,高优先级的消息优先被消费。 - **消息选择器**: 消费...
4. **创建消息生产者**:在Spring应用中,我们可以通过定义一个Bean来创建消息生产者,使用JmsTemplate类发送消息到目的地。JmsTemplate提供了简便的方法来发送各种类型的消息,如文本、对象等。 5. **设置消息消费...
7. **测试与调试**:通过发送消息并观察接收端是否正确处理,可以验证ActiveMQ和Spring的集成是否正常工作。 在实际应用中,ActiveMQ+Spring的组合可以用于各种场景,例如: - **异步处理**:通过消息队列,可以将...
**ActiveMQ5.1+Spring2.5 Demo详解** ActiveMQ是Apache软件基金会下的一个开源项目,它是一款...在实践中,你可以根据自己的需求调整配置,例如改变ActiveMQ服务器的地址、设置消息队列的大小、添加消息的持久化等。
spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:...
在这些代码中,可以看到如何使用ActiveMQ的API来设置消息的持久化属性,并且可能包含了连接到ActiveMQ服务器以及发送和接收消息的逻辑。 通过分析这些代码,你可以更深入地理解ActiveMQ如何与MySQL交互,以及如何在...
2. **消息持久化**:ActiveMQ支持消息持久化,即使服务器重启,未被消费的消息也不会丢失。 3. **消息选择器**:允许消费者根据特定条件选择接收消息,提高消息处理效率。 4. **Spring Boot集成**:在Spring Boot...
ActiveMQ具有高可靠性、高性能和丰富的特性,如支持多种协议、支持消息持久化、支持事务和集群等,广泛应用于企业级消息传递场景。 Tomcat是一款轻量级的Java应用服务器,主要用于部署Servlet和JSP应用。它符合Java...
6. **MyBatis**:MyBatis是一个优秀的持久层框架,它支持定制化SQL、存储过程以及高级映射。MyBatis避免了几乎所有的JDBC代码和手动设置参数以及获取结果集。MyBatis可以使用简单的XML或注解进行配置和原始映射,将...
4. 发送与接收消息:使用`JmsTemplate`的`convertAndSend()`方法可以方便地发送消息,而`receive()`或`receiveAndConvert()`则用于接收消息。这些方法极大地简化了消息交互的过程。 四、示例项目结构 在提供的...
10. **性能优化**:涉及如何配置ActiveMQ以提高性能,如消息批量发送、使用持久化策略等。 11. **安全配置**:介绍如何设置用户认证和权限,保护ActiveMQ服务器和消息资源。 在实际的项目中,集成Spring-JMS可以极...
**ActiveMQ 概述** ActiveMQ 是一个开源的消息中间件,它是 Java Message Service(JMS)规范的一个实现,...在实际项目中,根据具体需求,还可以进一步探索 ActiveMQ 的高级特性,如事务消息、持久化、消息优先级等。
"spring+springmvc+mybatis+mongodb+ActiveMQ+CXF"就是一个典型的技术栈,它涵盖了后端开发、数据库管理、消息队列以及服务接口等多个关键领域。下面我们将详细探讨这些技术及其相互作用。 首先,Spring框架是Java...
除了基础整合,还可以探索更高级的主题,如ActiveMQ的持久化机制、消息确认策略、事务处理、消息优先级等。同时,了解Spring的`@JmsListener`注解,可以更简洁地编写消息监听器。 总结,本资源为初学者提供了一条...
5. **消息的持久化**:ActiveMQ允许配置消息的持久性,即使服务器重启,未被消费的消息也不会丢失。在Spring配置中,可以通过设置`JmsTemplate`的`deliveryPersistent`属性为`true`来实现。 6. **事务管理**:...