- 浏览: 984889 次
文章分类
- 全部博客 (428)
- Hadoop (2)
- HBase (1)
- ELK (1)
- ActiveMQ (13)
- Kafka (5)
- Redis (14)
- Dubbo (1)
- Memcached (5)
- Netty (56)
- Mina (34)
- NIO (51)
- JUC (53)
- Spring (13)
- Mybatis (17)
- MySQL (21)
- JDBC (12)
- C3P0 (5)
- Tomcat (13)
- SLF4J-log4j (9)
- P6Spy (4)
- Quartz (12)
- Zabbix (7)
- JAVA (9)
- Linux (15)
- HTML (9)
- Lucene (0)
- JS (2)
- WebService (1)
- Maven (4)
- Oracle&MSSQL (14)
- iText (11)
- Development Tools (8)
- UTILS (4)
- LIFE (8)
最新评论
-
Donald_Draper:
Donald_Draper 写道刘落落cici 写道能给我发一 ...
DatagramChannelImpl 解析三(多播) -
Donald_Draper:
刘落落cici 写道能给我发一份这个类的源码吗Datagram ...
DatagramChannelImpl 解析三(多播) -
lyfyouyun:
请问楼主,执行消息发送的时候,报错:Transport sch ...
ActiveMQ连接工厂、连接详解 -
ezlhq:
关于 PollArrayWrapper 状态含义猜测:参考 S ...
WindowsSelectorImpl解析一(FdMap,PollArrayWrapper) -
flyfeifei66:
打算使用xmemcache作为memcache的客户端,由于x ...
Memcached分布式客户端(Xmemcached)
JMS与MQ详解:http://www.fx114.net/qa-48-91234.aspx
JMS(ActiveMQ) PTP和PUB/SUB模式实例:http://donald-draper.iteye.com/blog/2347445
ActiveMQ连接工厂、连接详解:http://donald-draper.iteye.com/blog/2348070
ActiveMQ会话初始化:http://donald-draper.iteye.com/blog/2348341
ActiveMQ生产者:http://donald-draper.iteye.com/blog/2348381
ActiveMQ消费者:http://donald-draper.iteye.com/blog/2348389
ActiveMQ启动过程详解:http://donald-draper.iteye.com/blog/2348399
ActiveMQ Broker发送消息给消费者过程详解:http://donald-draper.iteye.com/blog/2348440
Spring与ActiveMQ的集成:http://donald-draper.iteye.com/blog/2347638
Spring与ActiveMQ的集成详解一:http://donald-draper.iteye.com/blog/2348449
Spring与ActiveMQ的集成详解二:http://donald-draper.iteye.com/blog/2348461
在前文中我们讲过ActiveMQ的PTP和PUB/SUB模式实例,今天我们来看一下ActiveMQ与Spring的集成,Spring为4.0.4,ActiveMQ为5.12.1,在做下面的测试之间,要先添加ActiveMQ的用户:
如下:
修改ActiveMQ安装目录下的配置文件夹下的jetty.xml
保证authenticate属性为true;
再添加用户信息,修改jetty-realm.properties文件,我的如下:
重启ActiveMQ,即可;
需要引入的jar包,
注意不要直接引入activemq-all-5.12.1,这种方式容易产生包冲突;
1.我们先来测试JMSTemplate,操纵ActiveMQ的方式
ActiveMQ属性配置文件activemq.properties
引入属性文件:
ActiveMQ配置文件activemq-context.xml
引用配置文件:
新建订单实体类:
队列消息生产者:
队列消息消费者:
订阅主题消息发布者:
订阅主题消息订阅者:
消息转化器:
测试主类:
启动应用测试,我们用RestClient测试:
访问http://localhost:8080/test/activemq/qsend
RestClient ReponseBody 显示:
queueProducerService send message ok!
控制台输出:
[ INFO] 2016-12-27 16:51:11 QueueProducerServiceImp:42 :========向ActiveMq testQueue发送信息
访问http://localhost:8080/test/activemq/qrecevie
RestClient ReponseBody 显示:
QueueProducerService发送消息:Tue Dec 27 16:51:11 CST 2016
控制台输出:
[ INFO] 2016-12-27 16:51:31 QueueConsumerServiceImp:32 :======QueueConsumerService收到消息:QueueProducerService发送消息:Tue Dec 27 16:51:11 CST 2016
访问http://localhost:8080/test/activemq/qsendConver
RestClient ReponseBody 显示:
queueProducerService convertAndSend message ok!
控制台输出:
[ INFO] 2016-12-27 16:53:50 MsgConverterHelper:50 :=====转换订单信息为JSON字符串======
访问http://localhost:8080/test/activemq/qrecevieConver
RestClient ReponseBody 显示:
{"id":1,"amount":150.62,"goodsId":15,"goodsAmount":2,"shopId":5656}
控制台输出:
[ INFO] 2016-12-27 16:55:42 MsgConverterHelper:34 :=====转换JSON字符串为订单信息=======
访问http://localhost:8080/test/activemq/tsend
RestClient ReponseBody 显示:
topicPublisherService send order info ok!
控制台输出:
[ INFO] 2016-12-27 16:56:46 TopicPublisherServiceImp:48 :========向ActiveMq testTopic发送订单信息
访问http://localhost:8080/test/activemq/trecevie
RestClient ReponseBody 显示:
{"id":1,"amount":150.62,"goodsId":15,"goodsAmount":2,"shopId":5656}
控制台输出:
[ INFO] 2016-12-27 16:58:30 TopicSubscriberServiceImp:27 :==========TopicSubscriberService收到订单信息:订单id:1,金额(元):150.62,商品id:5656,商品数量:2,店铺id:5656
访问http://localhost:8080/test/activemq/tsendConver
RestClient ReponseBody 显示:
topicPublisherService convertAndSend order info ok!
控制台输出:
[ INFO] 2016-12-27 17:06:37 MsgConverterHelper:50 :=====转换订单信息为JSON字符串======
访问http://localhost:8080/test/activemq/trecevieConver
RestClient ReponseBody 显示:
{"id":1,"amount":150.62,"goodsId":15,"goodsAmount":2,"shopId":5656}
控制台输出:
[ INFO] 2016-12-27 17:09:41 MsgConverterHelper:34 :=====转换JSON字符串为订单信息=======
上面的测试只能,进行手动接受队列和订阅主题消息,如何自动接受队列和订阅主题消息,这就要用到MessageListener、org.springframework.jms.listener.DefaultMessageListenerContainer,
实现MessageListener并配置DefaultMessageListenerContainer的消息监听器messageListener即可
2.消息监听器
队列消息监听器:
订阅主题消息监听器:
启动应用,测试消息监听器:
访问http://localhost:8080/test/activemq/tsend
RestClient ReponseBody 显示:
topicPublisherService send order info ok!
控制台输出:
[ INFO] 2016-12-27 17:20:25 TopicSubscriberMessageListener:26 :--订阅者 MessageListener收到订单信息:订单id:1,金额(元):150.62,商品id:5656,商品数量:2,店铺id:5656
[ INFO] 2016-12-27 17:20:25 TopicPublisherServiceImp:48 :========向ActiveMq testTopic发送订单信息
访问http://localhost:8080/test/activemq/qsend
RestClient ReponseBody 显示:
queueProducerService send message ok!
控制台输出:
[ INFO] 2016-12-27 16:51:11 QueueProducerServiceImp:42 :========向ActiveMq testQueue发送信息
[ INFO] 2016-12-27 17:20:41 QueueConsumerMessageListener:23 :--队列 MessageListener收到信息:QueueProducerService发送消息:Tue Dec 27 17:20:41 CST 2016
[ INFO] 2016-12-27 17:20:41 QueueProducerServiceImp:42 :========向ActiveMq testQueue发送信息
JMS(ActiveMQ) PTP和PUB/SUB模式实例:http://donald-draper.iteye.com/blog/2347445
ActiveMQ连接工厂、连接详解:http://donald-draper.iteye.com/blog/2348070
ActiveMQ会话初始化:http://donald-draper.iteye.com/blog/2348341
ActiveMQ生产者:http://donald-draper.iteye.com/blog/2348381
ActiveMQ消费者:http://donald-draper.iteye.com/blog/2348389
ActiveMQ启动过程详解:http://donald-draper.iteye.com/blog/2348399
ActiveMQ Broker发送消息给消费者过程详解:http://donald-draper.iteye.com/blog/2348440
Spring与ActiveMQ的集成:http://donald-draper.iteye.com/blog/2347638
Spring与ActiveMQ的集成详解一:http://donald-draper.iteye.com/blog/2348449
Spring与ActiveMQ的集成详解二:http://donald-draper.iteye.com/blog/2348461
在前文中我们讲过ActiveMQ的PTP和PUB/SUB模式实例,今天我们来看一下ActiveMQ与Spring的集成,Spring为4.0.4,ActiveMQ为5.12.1,在做下面的测试之间,要先添加ActiveMQ的用户:
如下:
修改ActiveMQ安装目录下的配置文件夹下的jetty.xml
[root@zabbix conf]# vim jetty.xml <bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint"> <property name="name" value="BASIC" /> <property name="roles" value="user,admin" /> <!-- set authenticate=false to disable login --> <property name="authenticate" value="true" /> </bean> <bean id="adminSecurityConstraint" class="org.eclipse.jetty.util.security.Constraint"> <property name="name" value="BASIC" /> <property name="roles" value="admin" /> <!-- set authenticate=false to disable login --> <property name="authenticate" value="true" /> </bean>
保证authenticate属性为true;
再添加用户信息,修改jetty-realm.properties文件,我的如下:
[root@zabbix conf]# more jetty-realm.properties ## --------------------------------------------------------------------------- ## Licensed to the Apache Software Foundation (ASF) under one or more ## contributor license agreements. See the NOTICE file distributed with ## this work for additional information regarding copyright ownership. ## The ASF licenses this file to You under the Apache License, Version 2.0 ## (the "License"); you may not use this file except in compliance with ## the License. You may obtain a copy of the License at ## ## http://www.apache.org/licenses/LICENSE-2.0 ## ## Unless required by applicable law or agreed to in writing, software ## distributed under the License is distributed on an "AS IS" BASIS, ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ## See the License for the specific language governing permissions and ## limitations under the License. ## --------------------------------------------------------------------------- # Defines users that can access the web (console, demo, etc.) # username: password [,rolename ...] #用户名:密码,角色 admin: admin, admin user: 123456, user [root@zabbix conf]#
重启ActiveMQ,即可;
需要引入的jar包,
注意不要直接引入activemq-all-5.12.1,这种方式容易产生包冲突;
1.我们先来测试JMSTemplate,操纵ActiveMQ的方式
ActiveMQ属性配置文件activemq.properties
# ActiveMQ settings activemq.brokerURL=tcp://192.168.126.128:61616 activemq.userName=user activemq.password=123456 activemq.queueName=testQueue activemq.topicName=testTopic
引入属性文件:
<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <list> <value>/WEB-INF/classes/jdbc.properties</value> <value>/WEB-INF/classes/redis.properties</value> <value>/WEB-INF/classes/activemq.properties</value> </list> </property> </bean>
ActiveMQ配置文件activemq-context.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd "> <!-- 配置JMS连接工厂 --> <bean id="connectionFactoryMQ" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${activemq.brokerURL}" /> <property name="userName" value="${activemq.userName}" /> <property name="password" value="${activemq.password}" /> </bean> <!-- 消息类型转换 --> <bean id="msgConverter" class="com.activemq.help.MsgConverterHelper"/> <!-- 配置Jms模板 --> <!-- 发送消息的目的地(队列) --> <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 设置消息队列的名字 --> <constructor-arg index="0" value="${activemq.queueName}" /> </bean> <!-- 配置Jms模板 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactoryMQ" /> <property name="defaultDestination" ref="testQueue" /> <!-- 接收消息时的超时时间 --> <!--<property name="receiveTimeout" value="10000" /> --> <!-- 消息类型转换 --> <property name="messageConverter" ref="msgConverter"></property> </bean> <!-- 发送消息的目的地(主题) --> <bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 设置消息队列的名字 --> <constructor-arg index="0" value="${activemq.topicName}" /> </bean> <!-- 配置TopicJms模板 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactoryMQ" /> <property name="defaultDestination" ref="testTopic" /> <!-- 配置是否为发布订阅者模式,默认为false --> <property name="pubSubDomain" value="true"/> <!-- 接收消息时的超时时间 --> <!--<property name="receiveTimeout" value="10000" /> --> <!-- 消息类型转换 --> <property name="messageConverter" ref="msgConverter"></property> </bean> </beans>
引用配置文件:
<import resource="activemq-context.xml" />
新建订单实体类:
package com.enity; import java.io.Serializable; public class Order implements Serializable{ /** * */ private static final long serialVersionUID = -343247274477730446L; private Integer id;//订单id private Double amount; private Integer goodsId;//商品id private Integer goodsAmount;//商品数量 private Integer shopId;//店铺名 public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public Double getAmount() { return amount; } public void setAmount(Double amount) { this.amount = amount; } public Integer getGoodsId() { return goodsId; } public void setGoodsId(Integer goodsId) { this.goodsId = goodsId; } public Integer getGoodsAmount() { return goodsAmount; } public void setGoodsAmount(Integer goodsAmount) { this.goodsAmount = goodsAmount; } public Integer getShopId() { return shopId; } public void setShopId(Integer shopId) { this.shopId = shopId; } public String toString(){ return "订单id:"+this.id+","+"金额(元):"+this.amount+","+"商品id:"+ this.shopId+","+"商品数量:"+this.goodsAmount+","+"店铺id:"+this.shopId; } }
队列消息生产者:
package com.activemq.service.imp; import java.util.Date; import javax.annotation.Resource; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service; import com.activemq.service.QueueProducerService; import com.enity.Order; /** * 生产Queue * @author donald * @date 2016-12-27 * @time 上午10:19:27 */ @Service public class QueueProducerServiceImp implements QueueProducerService{ private static final Logger log = LoggerFactory.getLogger(QueueProducerServiceImp.class); @Resource(name="jmsQueueTemplate") JmsTemplate jmsTemplate; @Resource(name="testQueue") Destination testQueue; /** * 发送队列消息 */ public void send() { MessageCreator messageCreator = new MessageCreator() { public Message createMessage(Session session) throws JMSException { TextMessage message = session.createTextMessage(); message.setText("QueueProducerService发送消息:" + new Date()); return message; } }; jmsTemplate.send(this.testQueue, messageCreator); log.info("========向ActiveMq testQueue发送信息"); } /** * 发送并转换队列消息 */ @Override public void convertAndSend() { Order order = new Order(); order.setId(1); order.setAmount(150.62); order.setGoodsId(15); order.setGoodsAmount(2); order.setShopId(5656); jmsTemplate.convertAndSend(this.testQueue, order); } }
package com.activemq.service; public interface QueueProducerService { public void send(); public void convertAndSend(); }
队列消息消费者:
package com.activemq.service.imp; import javax.annotation.Resource; import javax.jms.JMSException; import javax.jms.TextMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service; import com.activemq.service.QueueConsumerService; import com.enity.Order; /** * Queue消费 * @author donald * @date 2016-12-27 * @time 上午10:16:58 */ @Service public class QueueConsumerServiceImp implements QueueConsumerService{ private static final Logger log = LoggerFactory.getLogger(QueueConsumerServiceImp.class); @Resource(name="jmsQueueTemplate") JmsTemplate jmsTemplate; /** * 接受队列消息 */ public String receive() { String result = null; TextMessage message = (TextMessage) jmsTemplate.receive(); try { log.info("======QueueConsumerService收到消息:" + message.getText()); result = message.getText(); } catch (JMSException e) { e.printStackTrace(); } return result; } /** * 接受并转换队列消息 */ @Override public Object receiveAndConvert() { Order order = (Order)jmsTemplate.receiveAndConvert(); return order; } }
package com.activemq.service; public interface QueueConsumerService { public String receive(); public Object receiveAndConvert(); }
订阅主题消息发布者:
package com.activemq.service.imp; import javax.annotation.Resource; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.Session; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Service; import com.activemq.service.TopicPublisherService; import com.enity.Order; @Service public class TopicPublisherServiceImp implements TopicPublisherService { private static final Logger log = LoggerFactory .getLogger(TopicPublisherServiceImp.class); @Resource(name = "jmsTopicTemplate") JmsTemplate jmsTemplate; @Resource(name = "testTopic") Destination testTopic; /** * 发送订阅主题消息 */ public void send() { MessageCreator messageCreator = new MessageCreator() { public Message createMessage(Session session) throws JMSException { ObjectMessage message = session.createObjectMessage(); Order order = new Order(); order.setId(1); order.setAmount(150.62); order.setGoodsId(15); order.setGoodsAmount(2); order.setShopId(5656); // 我们也可以将Object转换为Json String,作为TextMessage来传送, //在消费再反Json String 为Obejct message.setObject(order); return message; } }; jmsTemplate.send(this.testTopic, messageCreator); log.info("========向ActiveMq testTopic发送订单信息"); } /** * 发送并转换订阅主题消息 */ @Override public void convertAndSend() { Order order = new Order(); order.setId(1); order.setAmount(150.62); order.setGoodsId(15); order.setGoodsAmount(2); order.setShopId(5656); jmsTemplate.convertAndSend(this.testTopic, order); } }
package com.activemq.service; public interface TopicPublisherService { public void send(); public void convertAndSend(); }
订阅主题消息订阅者:
package com.activemq.service.imp; import javax.annotation.Resource; import javax.jms.JMSException; import javax.jms.ObjectMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service; import com.activemq.service.TopicSubscriberService; import com.enity.Order; @Service public class TopicSubscriberServiceImp implements TopicSubscriberService{ private static final Logger log = LoggerFactory.getLogger(TopicSubscriberServiceImp.class); @Resource(name="jmsTopicTemplate") JmsTemplate jmsTemplate; /** * 接受订阅主题消息 */ public Order receive() { ObjectMessage objMessage = (ObjectMessage) jmsTemplate.receive(); Order order = null; try { order = (Order)objMessage.getObject(); log.info("==========TopicSubscriberService收到订单信息:"+ order.toString()); } catch (JMSException e) { e.printStackTrace(); } return order; } /** * 接受并转换订阅主题消息 */ @Override public Object receiveAndConvert() { Order order = (Order)jmsTemplate.receiveAndConvert(); return order; } }
package com.activemq.service; import com.enity.Order; public interface TopicSubscriberService { public Order receive(); public Object receiveAndConvert(); }
消息转化器:
package com.activemq.help; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.support.converter.MessageConversionException; import org.springframework.jms.support.converter.MessageConverter; import util.JsonUtil; import com.enity.Order; /** * 消息转换器 * @author donald * @date 2016-12-27 * @time 上午11:46:06 */ public class MsgConverterHelper implements MessageConverter{ private static final Logger log = LoggerFactory.getLogger(MsgConverterHelper.class); @Override public Object fromMessage(Message mess) throws JMSException, MessageConversionException { Order order = null; if(!(mess instanceof TextMessage)) { throw new MessageConversionException("Message is not TextMessage"); } else{ TextMessage tMess = (TextMessage)mess; order = JsonUtil.fromJson(tMess.getText(), Order.class); log.info("=====转换JSON字符串为订单信息======="); } return order; } @Override public Message toMessage(Object obj, Session session) throws JMSException, MessageConversionException { TextMessage textMessage = null; if (!(obj instanceof Order)) { throw new MessageConversionException("Object is not Order"); } else { textMessage = session.createTextMessage(); Order order = (Order) obj; textMessage.setText(JsonUtil.toJson(order)); log.info("=====转换订单信息为JSON字符串======"); } return textMessage; } }
测试主类:
package com.controller; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.ResponseBody; import com.activemq.service.QueueConsumerService; import com.activemq.service.QueueProducerService; import com.activemq.service.TopicPublisherService; import com.activemq.service.TopicSubscriberService; import com.controller.base.BaseController; import com.enity.Order; import util.JsonUtil; /** * 测试PTP&PUB/SUB * @author donald * @date 2016-12-27 * @time 上午11:03:57 */ @Controller @RequestMapping(value="/activemq") public class ActiveMqController extends BaseController{ private static final Logger log = LoggerFactory.getLogger(ActiveMqController.class); @Autowired private QueueProducerService queueProducerService; @Autowired private QueueConsumerService queueConsumerService; @Autowired private TopicPublisherService topicPublisherService; @Autowired private TopicSubscriberService topicSubscriberService; /** * 发送队列消息 * @return */ @RequestMapping("/qsend") @ResponseBody public String queueSend(){ queueProducerService.send(); return "queueProducerService send message ok!"; } /** * 接受队列消息 * @return */ @RequestMapping("/qrecevie") @ResponseBody public String queueRecevie(){ String message = queueConsumerService.receive(); return message; } /** * 发送队列消息(消息转化器) * @return */ @RequestMapping("/qsendConver") @ResponseBody public String queueSendConver(){ queueProducerService.convertAndSend(); return "queueProducerService convertAndSend message ok!"; } /** * 接受队列消息(消息转化器) * @return */ @RequestMapping("/qrecevieConver") @ResponseBody public String queueRecevieConver(){ Order order = (Order) queueConsumerService.receiveAndConvert(); return JsonUtil.toJson(order); } /** * 发送订阅主题消息 * @return */ @RequestMapping("/tsend") @ResponseBody public String topicSend(){ topicPublisherService.send(); return "topicPublisherService send order info ok!"; } /** * 接受订阅主题的消息 * @return */ @RequestMapping("/trecevie") @ResponseBody public String topicRecevie(){ Order order = topicSubscriberService.receive(); return JsonUtil.toJson(order); } /** * 发送订阅主题消息(消息转化器) * @return */ @RequestMapping("/tsendConver") @ResponseBody public String topicSendConver(){ topicPublisherService.convertAndSend(); return "topicPublisherService convertAndSend order info ok!"; } /** * 接受订阅主题的消息(消息转化器) * @return */ @RequestMapping("/trecevieConver") @ResponseBody public String topicRecevieConver(){ Order order = (Order) topicSubscriberService.receiveAndConvert(); return JsonUtil.toJson(order); } }
package com.controller.base; public abstract class BaseController { }
启动应用测试,我们用RestClient测试:
访问http://localhost:8080/test/activemq/qsend
RestClient ReponseBody 显示:
queueProducerService send message ok!
控制台输出:
[ INFO] 2016-12-27 16:51:11 QueueProducerServiceImp:42 :========向ActiveMq testQueue发送信息
访问http://localhost:8080/test/activemq/qrecevie
RestClient ReponseBody 显示:
QueueProducerService发送消息:Tue Dec 27 16:51:11 CST 2016
控制台输出:
[ INFO] 2016-12-27 16:51:31 QueueConsumerServiceImp:32 :======QueueConsumerService收到消息:QueueProducerService发送消息:Tue Dec 27 16:51:11 CST 2016
访问http://localhost:8080/test/activemq/qsendConver
RestClient ReponseBody 显示:
queueProducerService convertAndSend message ok!
控制台输出:
[ INFO] 2016-12-27 16:53:50 MsgConverterHelper:50 :=====转换订单信息为JSON字符串======
访问http://localhost:8080/test/activemq/qrecevieConver
RestClient ReponseBody 显示:
{"id":1,"amount":150.62,"goodsId":15,"goodsAmount":2,"shopId":5656}
控制台输出:
[ INFO] 2016-12-27 16:55:42 MsgConverterHelper:34 :=====转换JSON字符串为订单信息=======
访问http://localhost:8080/test/activemq/tsend
RestClient ReponseBody 显示:
topicPublisherService send order info ok!
控制台输出:
[ INFO] 2016-12-27 16:56:46 TopicPublisherServiceImp:48 :========向ActiveMq testTopic发送订单信息
访问http://localhost:8080/test/activemq/trecevie
RestClient ReponseBody 显示:
{"id":1,"amount":150.62,"goodsId":15,"goodsAmount":2,"shopId":5656}
控制台输出:
[ INFO] 2016-12-27 16:58:30 TopicSubscriberServiceImp:27 :==========TopicSubscriberService收到订单信息:订单id:1,金额(元):150.62,商品id:5656,商品数量:2,店铺id:5656
访问http://localhost:8080/test/activemq/tsendConver
RestClient ReponseBody 显示:
topicPublisherService convertAndSend order info ok!
控制台输出:
[ INFO] 2016-12-27 17:06:37 MsgConverterHelper:50 :=====转换订单信息为JSON字符串======
访问http://localhost:8080/test/activemq/trecevieConver
RestClient ReponseBody 显示:
{"id":1,"amount":150.62,"goodsId":15,"goodsAmount":2,"shopId":5656}
控制台输出:
[ INFO] 2016-12-27 17:09:41 MsgConverterHelper:34 :=====转换JSON字符串为订单信息=======
上面的测试只能,进行手动接受队列和订阅主题消息,如何自动接受队列和订阅主题消息,这就要用到MessageListener、org.springframework.jms.listener.DefaultMessageListenerContainer,
实现MessageListener并配置DefaultMessageListenerContainer的消息监听器messageListener即可
2.消息监听器
队列消息监听器:
package com.activemq.listener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Queue监听器 * @author donald * @date 2016-12-27 * @time 上午10:15:57 */ public class QueueConsumerMessageListener implements MessageListener { private static final Logger log = LoggerFactory.getLogger(QueueConsumerMessageListener.class); public void onMessage(Message msg) { if (msg instanceof TextMessage) { TextMessage textMessage = (TextMessage) msg; try { log.info("--队列 MessageListener收到信息:"+ textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }
订阅主题消息监听器:
package com.activemq.listener; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.enity.Order; /** * Topic 监听器 * @author donald * @date 2016-12-27 * @time 上午10:16:02 */ public class TopicSubscriberMessageListener implements MessageListener { private static final Logger log = LoggerFactory.getLogger(TopicSubscriberMessageListener.class); public void onMessage(Message msg) { if (msg instanceof ObjectMessage) { ObjectMessage objMessage = (ObjectMessage) msg; try { Order order = (Order)objMessage.getObject(); log.info("--订阅者 MessageListener收到订单信息:"+ order.toString()); } catch (JMSException e) { e.printStackTrace(); } } } } ActiveMQ配置文件activemq-context.xml: <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd "> <!-- 配置JMS连接工厂 --> <bean id="connectionFactoryMQ" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${activemq.brokerURL}" /> <property name="userName" value="${activemq.userName}" /> <property name="password" value="${activemq.password}" /> </bean> <!-- 消息类型转换 --> <bean id="msgConverter" class="com.activemq.help.MsgConverterHelper"/> <!-- 发送消息的目的地(队列) --> <bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue"> <!-- 设置消息队列的名字 --> <constructor-arg index="0" value="${activemq.queueName}" /> </bean> <!-- 配置Jms模板 --> <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactoryMQ" /> <property name="defaultDestination" ref="testQueue" /> <!-- 接收消息时的超时时间 --> <!--<property name="receiveTimeout" value="10000" /> --> <!-- 消息类型转换 --> <property name="messageConverter" ref="msgConverter"></property> </bean> <!-- 消息监听方式 --> <bean id="queueConsumMesListener" class="com.activemq.listener.QueueConsumerMessageListener"/> <bean id="testMsgQueuelistenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactoryMQ" /> <property name="destination" ref="testQueue" /> <property name="messageListener" ref="queueConsumMesListener" /> <property name="receiveTimeout" value="10000" /> </bean> <!-- 发送消息的目的地(主题) --> <bean id="testTopic" class="org.apache.activemq.command.ActiveMQTopic"> <!-- 设置消息队列的名字 --> <constructor-arg index="0" value="${activemq.topicName}" /> </bean> <!-- 配置TopicJms模板 --> <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="connectionFactoryMQ" /> <property name="defaultDestination" ref="testTopic" /> <!-- 配置是否为发布订阅者模式,默认为false --> <property name="pubSubDomain" value="true"/> <!-- 接收消息时的超时时间 --> <!--<property name="receiveTimeout" value="10000" /> --> <!-- 消息类型转换 --> <property name="messageConverter" ref="msgConverter"></property> </bean> <!-- 消息监听方式 --> <bean id="topicSubMesListener" class="com.activemq.listener.TopicSubscriberMessageListener"/> <bean id="testMsgTopiclistenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactoryMQ" /> <property name="destination" ref="testTopic" /> <property name="messageListener" ref="topicSubMesListener" /> <property name="pubSubDomain" value="true" /> <property name="receiveTimeout" value="10000" /> </bean> </beans>
启动应用,测试消息监听器:
访问http://localhost:8080/test/activemq/tsend
RestClient ReponseBody 显示:
topicPublisherService send order info ok!
控制台输出:
[ INFO] 2016-12-27 17:20:25 TopicSubscriberMessageListener:26 :--订阅者 MessageListener收到订单信息:订单id:1,金额(元):150.62,商品id:5656,商品数量:2,店铺id:5656
[ INFO] 2016-12-27 17:20:25 TopicPublisherServiceImp:48 :========向ActiveMq testTopic发送订单信息
访问http://localhost:8080/test/activemq/qsend
RestClient ReponseBody 显示:
queueProducerService send message ok!
控制台输出:
[ INFO] 2016-12-27 16:51:11 QueueProducerServiceImp:42 :========向ActiveMq testQueue发送信息
[ INFO] 2016-12-27 17:20:41 QueueConsumerMessageListener:23 :--队列 MessageListener收到信息:QueueProducerService发送消息:Tue Dec 27 17:20:41 CST 2016
[ INFO] 2016-12-27 17:20:41 QueueProducerServiceImp:42 :========向ActiveMq testQueue发送信息
发表评论
-
Spring与ActiveMQ的集成详解二
2017-01-03 10:07 2084JMS(ActiveMQ) PTP和PUB/SUB模 ... -
Spring与ActiveMQ的集成详解一
2017-01-02 17:19 4592JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ Broker发送消息给消费者过程详解
2017-01-02 15:30 6283JMS(ActiveMQ) PTP和PUB/SUB模 ... -
ActiveMQ Server启动过程详解
2017-01-02 12:43 6515JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ消费者详解
2017-01-01 14:38 8667JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ生产者详解
2017-01-01 12:29 6867JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
ActiveMQ会话初始化详解
2016-12-31 20:26 4784JMS(ActiveMQ) PTP和PUB/SUB模 ... -
ActiveMQ连接工厂、连接详解
2016-12-29 16:09 12044JMS(ActiveMQ) PTP和PUB/SUB模式实例:h ... -
基于LevelDB的高可用ActiveMQ集群
2016-12-28 18:34 4284ActiveMQ实现负载均衡+高可用部署方案:http://w ... -
ActiveMQ 目录配置文件
2016-12-28 12:47 7795下载apache-activemq-5.12.1.tar.gz ... -
ActiveMQ PTP模式实例二
2016-12-27 14:45 698这篇主要是测试PTP模式下的回复消息,具体测试代码如下: 队列 ... -
JMS(ActiveMQ) PTP和PUB/SUB模式实例
2016-12-27 09:02 3125深入浅出JMS(一)——JMS简介 :http://blog. ...
相关推荐
而Spring框架,作为一个Java平台的全功能模块化解决方案,提供了与ActiveMQ集成的能力,让开发者能够轻松地在Spring应用中使用消息队列。本篇将深入探讨Spring与ActiveMQ的集成及其配置过程。 首先,理解Spring与...
本案例将详细讲解如何将Spring与ActiveMQ整合,以提升系统的可扩展性和解耦性。 1. **Spring框架**:Spring是一个全方位的开发框架,提供了依赖注入(Dependency Injection, DI)和面向切面编程(Aspect-Oriented ...
spring集成activeMQ框架 配置方式(内含三种常见的消息接受监听方式的配置)JMS 配置测试等等
以下将详细介绍如何进行Spring与ActiveMQ的集成,并提供一些关键知识点。 1. **安装配置ActiveMQ** - 首先,需要下载并安装ActiveMQ服务器。可以从官方网站(https://activemq.apache.org/)获取最新版本。 - 启动...
本文将深入探讨如何将Spring与ActiveMQ进行整合,以提升系统的可扩展性和解耦性。 一、Spring与ActiveMQ整合基础 1. **消息中间件概念**:消息中间件是一种软件,它在不同的应用之间传递消息,实现了应用之间的...
7. **事务支持**:Spring与ActiveMQ的集成还支持事务,确保消息要么全部发送成功,要么全部失败。这对于确保数据一致性至关重要。 8. **消息确认**:ActiveMQ允许客户端确认消息的接收,这样可以确保消息被正确处理...
首先,我们需要了解Spring与ActiveMQ集成的基本概念。Spring框架提供了一套完整的JMS(Java Message Service)支持,可以方便地与各种消息队列进行整合,ActiveMQ作为JMS的实现,能够帮助我们构建高可靠、高性能的...
接下来,我们将讨论Spring与ActiveMQ集成的关键步骤: 1. **引入依赖**:在Spring项目中,首先需要添加ActiveMQ的依赖库。这通常通过Maven或Gradle的配置文件来完成,例如在Maven的pom.xml中添加如下依赖: ```xml...
标题“spring+activemq”暗示了我们将探讨如何将Spring框架与ActiveMQ集成,以便利用JMS进行高效的消息传递。下面我们将深入讨论这个主题。 首先,**Spring对JMS的支持**:Spring框架提供了一套强大的JMS抽象层,...
Spring作为Java领域的主流框架,与ActiveMQ的集成使得这种消息中间件的使用更加便捷。本文将详细探讨Spring与ActiveMQ的整合过程,以及相关的知识点。 1. **Spring框架**:Spring是Java开发中的一个全功能框架,...
将Spring Boot与ActiveMQ集成可以提供强大的消息处理能力,使应用能够解耦组件,提高可扩展性和容错性。 本文将详细讲解如何使用Spring Boot与内置的ActiveMQ进行集成,以及如何通过代码实现这一过程。 首先,我们...
Spring 集成 ActiveMQ 配置是指将 Spring 框架与 ActiveMQ 消息队列集成,以实现基于 JMS(Java Message Service)的消息传递。ActiveMQ 是 Apache 软件基金会的一个开源的消息队列系统,提供了高性能、可靠的消息...
**二、Spring与ActiveMQ集成** 1. **引入依赖** 在Spring项目中,我们需要添加ActiveMQ的相关依赖,如`spring-jms`和`activemq-client`。在`pom.xml`或`build.gradle`文件中添加对应的Maven或Gradle依赖。 2. **...
总之,"spring配置activemq详解"是一个涵盖Spring与ActiveMQ整合的深度话题,涉及了从项目配置到消息生产和消费的全过程。通过理解和实践这些知识点,开发者能够构建出稳定、高效的分布式消息处理系统。
9. **Spring注解**:在提供的实例中,可能包含了使用注解的方式配置Spring与ActiveMQ的集成,如`@EnableJms`启动JMS支持,`@JmsListener`定义消息监听器等。 10. **Tomcat服务器**:Tomcat是一个流行的Java Web...
标题 "Spring-ActiveMQ-jar" 指的是一个与Spring框架和ActiveMQ集成相关的Java归档(JAR)文件。这个JAR文件通常包含了Spring框架支持ActiveMQ所需的所有类和资源,使得开发者能够利用Spring的IoC(Inversion of ...
Spring与ActiveMQ的整合能够帮助开发者实现高效、可靠的分布式消息传递。 首先,我们要理解Spring整合ActiveMQ的核心概念。在Spring框架中,我们可以通过使用Spring的JMS模块来配置ActiveMQ。这个过程涉及到以下几...