Spring 整合 ActiveMq
整合步骤如下:
- 添加依赖
- 连接 mq 消息服务器
- 定义生产者/消费者
- 发送/接收消息
添加依赖
<properties>
<spring_version>4.2.4.RELEASE</spring_version>
</properties>
<dependencies>
<!--Spring-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring_version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring_version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.2.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring_version}</version>
</dependency>
<!--ActiveMq-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.3</version>
</dependency>
<!--<dependency>-->
<!--<groupId>org.apache.commons</groupId>-->
<!--<artifactId>commons-pool2</artifactId>-->
<!--<version>2.5</version>-->
<!--</dependency>-->
<!--servlet-->
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>RELEASE</version>
<scope>provided</scope>
</dependency>
<!--Test-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!--fast Json-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
</dependencies>
连接服务器
定义一个类实现 ConnectionFactory 接口,类属性包括 brokerURL(服务器地址),userName,password,maxConnection(最大连接数, 连接池使用参数);
/**
* author: getthrough
* date: 2018/3/8
* description: 连接工厂的包装类
*/
public class ActiveMqConnectionFactoryDecoration implements ConnectionFactory {
/**apache 提供的连接池*/
// private PooledConnectionFactory pooledConnectionFactory;
private String brokerURL;
private String userName;
private String password;
private String maxConntection;
private ActiveMQConnectionFactory activeMQConnectionFactory;
public ActiveMqConnectionFactoryDecoration() {
}
public void run() throws JMSException {
activeMQConnectionFactory.setBrokerURL(brokerURL);
activeMQConnectionFactory.setUserName(userName);
activeMQConnectionFactory.setPassword(password);
activeMQConnectionFactory.createConnection();
// pooledConnectionFactory = new PooledConnectionFactory();
// pooledConnectionFactory.setMaxConnections(Integer.parseInt(maxConntection));
// pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory);
// pooledConnectionFactory.createConnection(userName, password);
}
public void stop() {
// if (null != pooledConnectionFactory) {
// pooledConnectionFactory.stop();
// }
}
public Connection createConnection() throws JMSException {
// return pooledConnectionFactory.createConnection();
return activeMQConnectionFactory.createConnection();
}
public Connection createConnection(String userName, String password) throws JMSException {
// return pooledConnectionFactory.createConnection(userName, password);
return activeMQConnectionFactory.createConnection(userName, password);
}
public void setActiveMQConnectionFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
this.activeMQConnectionFactory = activeMQConnectionFactory;
}
// ... 其他属性的getters&setters
在 spring 配置文件中定义这个 bean:
<bean id="activeMqConnectionFactoryDecoration " class="mq.ActiveMqConnectionFactoryDecoration ">
<property name="activeMQConnectionFactory">
<bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"/>
</property>
<property name="brokerURL" value="${brokerURL}"/>
<property name="userName" value="${userName}"/>
<property name="password" value="${password}"/>
<property name="maxConntection" value="${maxConntection}"/>
</bean>
创建生产者/消费者
创建一个消息发送类, 简单包装下发送消息流程
/**
* author: getthrough
* date: 2018/3/8
* description:
*/
public class ActiveMqSender {
private static JmsTemplate jmsTemplate;
private static Destination destination;
public static void sendMqMessage(final String content) {
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(content);
}
});
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void setDestination(Destination destination) {
this.destination = destination;
}
}
创建一个监听器, 实现 MessageListener 接口
/**
* author: getthrough
* date: 2018/3/8
* description:
*/
// 此处ParentMessageListener实现了 MessageListener接口
public class QueueMessageListener extends ParentMessageListener {
private Logger logger = LoggerFactory.getLogger(QueueMessageListener.class);
/**
* 消息前处理
*/
public void beforeHandling() {
// doSomething ...
logger.info("before hanlding queue msg ...");
}
/**
* 消息后处理
*/
public void afterHandling() {
// doSomething ...
logger.info("after hanlding queue msg ...");
}
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
logger.info("########### consumer1 has receive the message :" + textMessage.getText() + " ############");
} catch (JMSException e) {
logger.info("########## failed to get message text! ###########");
e.printStackTrace();
}
}
}
在配置文件中定义生产者和消费者
<!--Spring 提供的消息模板-->
<bean id="springJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="activeMqConnectionFactoryDecoration"/>
</bean>
<!--定义一个队列的地址-->
<bean id="queue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg index="0" value="com.getthrough.spring_activemq.queue"/>
</bean>
<!--消息生产者-->
<bean id="producer1" class="mq.ActiveMqSender" name="producer1">
<!--将引用的模板和队列地址注入到助手类的字段中-->
<property name="jmsTemplate" ref="springJmsTemplate"/>
<property name="destination" ref="queue"/>
</bean>
<!--消息监听器-->
<bean id="queueMessageListener" class="mq.listener.QueueMessageListener"/>
<!--消息消费者-->
<bean id="consumer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="activeMqConnectionFactoryDecoration"/>
<property name="destination" ref="queue"/>
<property name="messageListener" ref="queueMessageListener"/>
</bean>
发送消息
使用 servlet 在 tomcat 启动时发送消息(仅用于演示)
/**
* author: getthrough
* date: 2018/3/12
* description:
*/
public class StarterListener implements ServletContextListener {
private Logger logger = LoggerFactory.getLogger(StarterListener.class);
public void contextInitialized(ServletContextEvent sce) {
WebApplicationContext context = WebApplicationContextUtils.getWebApplicationContext(sce.getServletContext());
ActiveMqConnectionFactoryDecoration connectionFactory =
(ActiveMqConnectionFactoryDecoration) context.getBean("activeMqConnectionFactoryDecoration");
try {
// 连接 mq server
connectionFactory.run();
logger.info("###############");
ActiveMqSender.sendMqMessage("hey, now is" + new Date() + "!");
} catch (JMSException e) {
logger.info("####### failed to connect mq server! ########");
e.printStackTrace();
}
}
public void contextDestroyed(ServletContextEvent sce) {
// doSomething
}
}
接收消息
控制台打印了日志
########### consumer1 has receive the message :hey, now isTue Mar 20 23:48:55 CST 2018! ############
再发消息
在程序运行的情况下,创建一个测试类,用于发送一条消息
/**
* author: getthrough
* date: 2018/3/12
* description:
*/
public class ProducerTest {
@Test
public void testSendMq() throws JMSException {
ActiveMQConnectionFactory mqConnectionFactory = new ActiveMQConnectionFactory();
mqConnectionFactory.setBrokerURL("tcp://127.0.0.1:61616");
// 创建连接
Connection connection = mqConnectionFactory.createConnection();
// 开启连接
connection.start();
// arg1:是否开启事务; arg2:确认模式--自动确认
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 通过session 创建 队列
Queue queue = session.createQueue("com.getthrough.spring_activemq.queue");
// 通过 session 创建消费者
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 默认即持久化
producer.send(session.createTextMessage("It's the message from test producer !"));
// 关闭连接(会级联关闭session),释放资源
connection.close();
}
}
运行测试类, 主程序控制台输出
########### consumer1 has receive the message :It's the message from test producer ! ############
至此, 基础整合完毕.
优化:
可以看到, 目前对于消息的内容以及消费的方式并没有一个具体的格式, 无论是在不同模块之间还是异构的系统之间的消息传递, 定义一个具体的数据格式是十分重要的.
因此, 需要定义一个 mq 消息对象, 例如
/**
* author: getthrough
* date: 2018/3/21
* description:
*/
public class MessageContent {
private String mqName;// 该消息的名称,用来具体做什么处理
private String content;// 消息具体内容
public MessageContent(){}
public MessageContent(String mqName, String content) {
this.mqName = mqName;
this.content = content;
}
public String getMqMsgMethod() {
return mqName;
}
public void setMqMsgMethod(String mqName) {
this.mqName = mqName;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
针对 mq 消息实体中的 mq 方法, 可以做一个 方法----服务类的映射, 这种映射关系, 这里使用枚举实现:
/**
* author: getthrough
* date: 2018/3/21
* description:
*/
public enum MQMsgMethod {
USER_ADD("user.add","userService"),// 方法的名称, 和具体处理该方法的服务类
USER_DELETE("user.delete","userService"),
ORDER_UPDATE("order.update","orderService");// 此处user 和 order 应属于不同的队列
private String methodName;
private String serviceName;
// 该构造主要用于消息的消费
MQMsgMethod(String methodName, String serviceName) {
this.methodName = methodName;
this.serviceName = serviceName;
}
// 该构造主要用于消息的发送
MQMsgMethod(String methodName) {
this.methodName = methodName;
this.serviceName = null;
}
/**
* 根据方法名称获取服务类
* @param methodName
* @return
*/
public static MQMsgMethod getMQMsgMethodByMethodName(String methodName) {
MQMsgMethod[] mqMsgMethods = MQMsgMethod.values();
MQMsgMethod result = null;
if (mqMsgMethods.length > 0) {
for (MQMsgMethod mqMsgMethod : mqMsgMethods) {
if (mqMsgMethod.getMethodName().equals(methodName)) {
result = mqMsgMethod;
break;
}
}
}
return result;
}
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public String getServiceName() {
return serviceName;
}
public void setServiceName(String serviceName) {
this.serviceName = serviceName;
}
}
然后改写监听器中对消费消息的方式:
/**
* 这里假设这个 listener 是监听 user 队列
* @param message
*/
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
logger.info("########### consumer1 has receive the message :" + textMessage.getText() + " ############");
// 将接受到的消息转换为java对象
MessageContent messageContent = (MessageContent)JSONObject.parseObject(textMessage.getText(), MessageContent.class);
// 获取mq消息内容,并转化为java bean
User user = (User) JSONObject.parseObject(messageContent.getContent(), User.class);
// 获取mq方法
String mqMethod = messageContent.getMqMsgMethod();
// 根据mq方法获取处理该消息对应的服务类的名称
MQMsgMethod mqMsgMethod = MQMsgMethod.getMQMsgMethodByMethodName(mqMethod);
String serviceName = mqMsgMethod.getServiceName();
// 获得服务类
UserService userService = (UserService) MyApplicationContext.getBean(serviceName);
// 调用服务类方法
switch (mqMsgMethod) {
case USER_ADD :
userService.addUser(user);
break;
case USER_DELETE:
userService.deleteUser(user);
break;
default:
break;
}
} catch (JMSException e) {
logger.info("########## failed to get message text! ###########");
e.printStackTrace();
}
}
输出结果
########### consumer1 has receive the message :{"content":"{\"address\":\"zhejiang\",\"age\":18,\"email\":\"123@gmail.com\",\"username\":\"lina\"}","mqMsgMethod":"user.add"} ############
...// 省略
adding user......
至此演示完毕.
一个疑问
本人在使用 PooledConnectionFactory 创建连接时总是报一个错误,如下
严重: Exception sending context initialized event to listener instance of class mq.listener.StarterListener
java.lang.NoClassDefFoundError: org/apache/commons/pool2/KeyedPooledObjectFactory
at mq.ActiveMqConnectionFactoryDecoration.run(ActiveMqConnectionFactoryDecoration.java:33)
at mq.listener.StarterListener.contextInitialized(StarterListener.java:33)
at org.apache.catalina.core.StandardContext.listenerStart(StandardContext.java:5068)
at org.apache.catalina.core.StandardContext.startInternal(StandardContext.java:5584)
at org.apache.catalina.util.LifecycleBase.start(LifecycleBase.java:147)
at org.apache.catalina.core.ContainerBase.addChildInternal(ContainerBase.java:899)
at org.apache.catalina.core.ContainerBase.addChild(ContainerBase.java:875)
at org.apache.catalina.core.StandardHost.addChild(StandardHost.java:652)
at org.apache.catalina.startup.HostConfig.manageApp(HostConfig.java:1863)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.tomcat.util.modeler.BaseModelMBean.invoke(BaseModelMBean.java:301)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819)
at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801)
at org.apache.catalina.mbeans.MBeanFactory.createStandardContext(MBeanFactory.java:618)
at org.apache.catalina.mbeans.MBeanFactory.createStandardContext(MBeanFactory.java:565)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.tomcat.util.modeler.BaseModelMBean.invoke(BaseModelMBean.java:301)
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.invoke(DefaultMBeanServerInterceptor.java:819)
at com.sun.jmx.mbeanserver.JmxMBeanServer.invoke(JmxMBeanServer.java:801)
at javax.management.remote.rmi.RMIConnectionImpl.doOperation(RMIConnectionImpl.java:1468)
at javax.management.remote.rmi.RMIConnectionImpl.access$300(RMIConnectionImpl.java:76)
at javax.management.remote.rmi.RMIConnectionImpl$PrivilegedOperation.run(RMIConnectionImpl.java:1309)
at javax.management.remote.rmi.RMIConnectionImpl.doPrivilegedOperation(RMIConnectionImpl.java:1401)
at javax.management.remote.rmi.RMIConnectionImpl.invoke(RMIConnectionImpl.java:829)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at sun.rmi.server.UnicastServerRef.dispatch(UnicastServerRef.java:324)
at sun.rmi.transport.Transport$1.run(Transport.java:200)
at sun.rmi.transport.Transport$1.run(Transport.java:197)
at java.security.AccessController.doPrivileged(Native Method)
at sun.rmi.transport.Transport.serviceCall(Transport.java:196)
at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:568)
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:826)
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:683)
at java.security.AccessController.doPrivileged(Native Method)
at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:682)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.pool2.KeyedPooledObjectFactory
at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1858)
at org.apache.catalina.loader.WebappClassLoaderBase.loadClass(WebappClassLoaderBase.java:1701)
... 47 more
一开始怀疑时 jar 包存在冲突, 依赖树如下:
[INFO] ------------------------------------------------------------------------
[INFO] Building spring_activemq Maven Webapp 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ spring_activemq ---
[INFO] com.getthrough.spring_activemq:spring_activemq:war:1.0-SNAPSHOT
[INFO] +- org.springframework:spring-context:jar:4.2.4.RELEASE:compile
[INFO] | +- org.springframework:spring-aop:jar:4.2.4.RELEASE:compile
[INFO] | | \- aopalliance:aopalliance:jar:1.0:compile
[INFO] | +- org.springframework:spring-beans:jar:4.2.4.RELEASE:compile
[INFO] | +- org.springframework:spring-core:jar:4.2.4.RELEASE:compile
[INFO] | | \- commons-logging:commons-logging:jar:1.2:compile
[INFO] | \- org.springframework:spring-expression:jar:4.2.4.RELEASE:compile
[INFO] +- org.springframework:spring-web:jar:4.2.4.RELEASE:compile
[INFO] +- org.springframework:spring-jms:jar:4.2.4.RELEASE:compile
[INFO] | +- org.springframework:spring-messaging:jar:4.2.4.RELEASE:compile
[INFO] | \- org.springframework:spring-tx:jar:4.2.4.RELEASE:compile
[INFO] +- org.springframework:spring-test:jar:4.2.4.RELEASE:compile
[INFO] +- org.apache.activemq:activemq-all:jar:5.15.3:compile
[INFO] +- org.apache.commons:commons-pool2:jar:2.3:compile
[INFO] +- javax.servlet:servlet-api:jar:3.0-alpha-1:provided
[INFO] +- junit:junit:jar:4.12:test
[INFO] | \- org.hamcrest:hamcrest-core:jar:1.3:test
[INFO] \- com.alibaba:fastjson:jar:1.2.47:compile
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
似乎也没有.
更换过commons-pool2 版本为2.5问题还存在.
尝试将 commons-pool2 依赖删除(active-all 中存在该类库), 也无效.
有点蒙
Any advice will be helpful, thanks very much !
示例代码地址
https://github.com/Getthrough/spring_activemq
分享到:
相关推荐
《Spring整合ActiveMQ深度解析》 在现代企业级应用开发中,消息队列(Message Queue)扮演着重要的角色,它能够有效地实现系统间的解耦,提高系统的可扩展性和并发处理能力。Spring作为Java领域的主流框架,与...
百度spring整合activemq 发现几乎都只是在xml文件配置固定的消息队列而且太麻烦。并没有根据需求进行动态生成主题和队列。本文档参考了纯粹的activemq java代码和百度上的demo,很简洁的实现了动态消息队列的生成和...
**Spring 整合 ActiveMQ 简单实例** 在当今的分布式系统中,消息队列(Message Queue)作为异步处理、解耦组件的关键技术,被广泛应用。Spring 框架与 ActiveMQ 的整合,使得开发者能够轻松地在 Spring 应用程序中...
首先,我们要理解Spring整合ActiveMQ的核心概念。在Spring框架中,我们可以通过使用Spring的JMS模块来配置ActiveMQ。这个过程涉及到以下几个关键点: 1. **JMS配置**:在Spring的配置文件中,我们需要定义一个`...
将Spring与ActiveMQ整合,可以轻松地在Spring应用中实现消息队列的功能,提高系统的可扩展性和可靠性。 首先,让我们了解Spring框架如何支持消息传递。Spring提供了JmsTemplate类,这是一个模板类,用于简化发送和...
Spring整合ActiveMQ是Java消息服务(JMS)在Spring框架中的应用,用于实现生产者与消费者的解耦。在这个案例中,我们将深入探讨如何配置和使用这两个组件,以便于理解它们的工作原理。 首先,ActiveMQ是Apache软件...
Spring整合ActiveMQ是Java开发中常见的一种技术组合,主要用于实现应用程序间的异步消息通信。Spring框架提供了对ActiveMQ的高度集成,使得开发者能够轻松地在应用中加入消息队列功能,提高系统的可扩展性和可靠性。...
首先,让我们理解Spring整合ActiveMQ的基本步骤: 1. **引入依赖**:在项目中添加ActiveMQ和Spring相关的依赖库,通常在Maven或Gradle的配置文件中进行。 2. **配置ActiveMQ**:创建一个ActiveMQ服务器,可以通过...
本DEMO将展示如何通过Spring整合ActiveMQ来实现队列(Queue)和主题(Topic)两种不同的通信方式。队列遵循“先进先出”原则,每个消息只有一个消费者;而主题支持多播,允许多个消费者同时接收消息。 首先,你需要...
现在,我们将深入探讨如何将Spring与ActiveMQ整合,并使用Maven进行项目构建。 首先,我们需要理解Spring与ActiveMQ整合的基本概念。Spring通过其`spring-jms`模块提供了对JMS的支持,可以方便地与消息代理如...
本文将围绕"Spring整合ActiveMQ"这一主题,深入讲解如何在Spring框架中实现ActiveMQ的集成。 1. **Spring与ActiveMQ的整合基础** - Spring框架提供了一套完整的消息编程模型,包括Message、MessageListener、...
在这个实例中,我们将探讨如何利用Spring框架整合ActiveMQ(一个流行的开源消息代理)和Quartz(一个广泛使用的作业调度库)来实现JMS(Java消息服务)数据同步。这个方案尤其适用于大型分布式系统,它能够确保即使...
在本文中,我们将深入探讨如何使用Spring框架与Apache ActiveMQ集成,实现点对点通信(Point-to-Point)和发布/订阅(Publish/Subscribe)模式的通信。ActiveMQ是流行的开源消息中间件,它允许应用程序之间异步传输...
通过以上步骤,你应该能够成功地将Spring与ActiveMQ整合,并实现一个完整的消息传递系统。这个实例不仅涵盖了基本的配置和操作,还展示了如何在实际应用中使用这些组件。在实践中,你可以根据需要扩展和调整这些组件...
二、Spring整合ActiveMQ步骤 1. **添加依赖**:在项目中引入ActiveMQ的依赖库,通常通过Maven或Gradle的依赖管理工具进行。 2. **配置ActiveMQ服务器**:设置ActiveMQ服务器,可以是本地运行的实例,也可以是远程...
将ActiveMQ与Spring整合,可以方便地在Spring应用中使用消息队列,简化配置,并提供事务性消息支持。 这个实例代码是将ActiveMQ与Spring进行整合的一个实际项目,可以在Tomcat服务器上运行。由于提供了所有必要的...