最近项目里面要求实时的分析数据,唉,storm学习成本太高,所以就想到了activeMQ. Apache ActiveMQ是最流行的功能强大的开源即时通讯和集成模式的服务器。Apache ActiveMQ的速度快,支持多语言和跨客户协议,带有易于在充分支持JMS 1.1和1.4使用J2EE企业集成模式和许多先进的功能。废话不多说,直接上demo了。
=============================== action start =========================
一:必备jar包
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.9.0</version>
</dependency>
二:服务端
服务端包含4部分:
a.消息转换器
b.消息发布者
c.消息发送
d.activemq.xml配置
a.消息转换器
package cn.innosoft.jt809.activemq.converter;
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.springframework.jms.support.converter.MessageConversionException;
import org.springframework.jms.support.converter.MessageConverter;
import cn.innosoft.jt809.biz.dynamic.model.TGnssGpsHis;
/**
* jms消息转换器
* @author gaoq
* @date 2015-3-27 下午2:57:23
*/
public class MsgConverter implements MessageConverter{
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Override
public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {
if (!(object instanceof GpsVehicleReceive)) {
throw new MessageConversionException("obj is not MsgPojo");
}
GpsVehicleReceive msgPojo = (GpsVehicleReceive) object;
TextMessage textMessage = session.createTextMessage();
String msg = getGpsMsgString(msgPojo);
textMessage.setText(msg);
return textMessage;
}
@Override
public Object fromMessage(Message message) throws JMSException, MessageConversionException {
if (!(message instanceof TextMessage)) {
throw new MessageConversionException("Message is not TextMessage");
}
TextMessage textMessage = (TextMessage) message;
TGnssGpsHis msg = new TGnssGpsHis();
String[] texts=textMessage.getText().split(",");
msg.setX(1);
} catch (ParseException e) {
e.printStackTrace();
}
return msg;
}
/**
* 获取对象转换后的字符串.
* @param msgPojo TGnssGpsHis
* @return String
*/
private String getGpsMsgString(GpsVehicleReceive msgPojo) {
String msg = msgPojo.getX()+","+msgPojo.getY();
return msg;
}
}
b.消息发布者
package cn.innosoft.jt809.activemq.service;
import javax.jms.Destination;
import org.springframework.jms.core.JmsTemplate;
import cn.innosoft.jt809.activemq.converter.GpsVehicleReceive;
/**
* 消息发布者.
* @author gaoq
* @date 2015-3-19 上午11:12:00
*/
public class TopicPublisherService {
JmsTemplate jmsTemplate;
Destination destination;
// public void send(final String msg) {
// MessageCreator messageCreator = new MessageCreator() {
// public Message createMessage(Session session) throws JMSException {
// TextMessage message = session.createTextMessage();
// message.setText(msg);
// return message;
// }
// };
// jmsTemplate.send(this.destination, messageCreator);
// }
public void convertAndSend(GpsVehicleReceive msgPojo){
jmsTemplate.convertAndSend(this.destination, msgPojo);
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void setDestination(Destination destination) {
this.destination = destination;
}
}
c.消息发送(在应用类中,获取数据并发送)
private static void sendMsg(TGnssGpsHis gpsHis){
GpsVehicleReceive r = new GpsVehicleReceive();
r.setX(1);
r.setY(2);
topicPublisherService.convertAndSend(r);//发送
r = null;
}
d.activemq.xml配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:9220" />
</bean>
</property>
</bean>
<!-- 发送消息的目的地(主题) -->
<bean id="topicSubscriberMessageListenerDest" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="myMessageListenerTopic" />
</bean>
<bean id="msgConverter" class="cn.innosoft.jt809.activemq.converter.MsgConverter"></bean>
<!-- 配置TopicJms模板 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="topicSubscriberMessageListenerDest" />
<!-- 配置是否为发布订阅者模式,默认为false -->
<property name="pubSubDomain" value="true"/>
<!-- 转换器 -->
<property name="messageConverter" ref="msgConverter"></property>
<!--<property name="receiveTimeout" value="10000" /> -->
</bean>
<bean id="topicPublisherService" class="cn.innosoft.jt809.activemq.service.TopicPublisherService">
<property name="jmsTemplate" ref="jmsTopicTemplate"/>
<property name="destination" ref="topicSubscriberMessageListenerDest" />
</bean>
</beans>
三:客户端使用
客户端使用包含2部分:
a.监听类
b.activeMq.xml
a.监听类
package cn.innosoft.exceptionAnalyse.activemq.service;
import java.text.SimpleDateFormat;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import cn.innosoft.exceptionAnalyse.biz.AnalyseMsgMng;
import cn.innosoft.exceptionAnalyse.biz.cache.model.GpsVehicleReceive;
/**
* 经度异常接收数据监听类.
* @author gaoq
* @date 2015-3-27 下午4:32:30
*/
public class TopicColorExceptionService implements MessageListener{
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public void onMessage(Message message) {
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
GpsVehicleReceive msg = bindProperties(textMessage);
// AnalyseMsgMng.getInstance().colorQueue.put(msg);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 绑定参数到对象上.
* @param textMessage 消息.
* @return GpsVehicleReceive
*/
private GpsVehicleReceive bindProperties(TextMessage textMessage) throws Exception{
GpsVehicleReceive msg = new GpsVehicleReceive();
String[] texts=textMessage.getText().split(",");
msg.setX1(texts[0]);
msg.setX2(texts[1]);
msg.setX3(texts[2]);
return msg;
}
}
b.activeMq.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">
<bean id="connectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://127.0.0.1:9220" />
</bean>
<!-- 发送消息的目的地(主题) -->
<bean id="topicSubscriberMessageListenerDest" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg index="0" value="myMessageListenerTopic" />
</bean>
<!-- <bean id="msgConverter" class="cn.innosoft.freightAnalyse.activemq.converter.MsgConverter"></bean> -->
<!-- 配置TopicJms模板 -->
<bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory" />
<property name="defaultDestination" ref="topicSubscriberMessageListenerDest" />
<!-- 配置是否为发布订阅者模式,默认为false -->
<property name="pubSubDomain" value="true"/>
</bean>
<bean id="topicColorException" class="cn.innosoft.exceptionAnalyse.activemq.service.TopicColorExceptionService"></bean>
<bean id="myMsgTopiclistenerContainerColor"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="destination" ref="topicSubscriberMessageListenerDest" />
<property name="messageListener" ref="topicColorException" />
<property name="pubSubDomain" value="true" />
</bean>
</beans>
============================= action end ==========================
综上所述:一个完整的activeMQ的使用方式介绍完了。
接下来干啥呢。。。。。
想起来了。。。。activeMQ的服务还要启动,那么如何启动activeMQ的服务呢?
https://repository.apache.org/content/repositories/releases/org/apache/activemq/
下载了activeMQ之后
apache-activemq-5.9.0-bin.zip
解压后会发现:apache-activemq-5.9.0\bin目录下有win32,win64,那这个时候就要看服务器是什么系统了,我的是64位,所以就进去apache-activemq-5.9.0\bin\win64目录双击activemq.bat服务如果启动无异常就好了,然后在浏览器中访问:http://localhost:8161/admin,用户名:admin,密码:admin (这个应该是固定的,如果想改,也是可以得到自己去配置文件中找到位置改掉就ok了)
大功告成。。。。。。。。。。。。。。。
相关推荐
通过学习和研究这个"activemq-demo"项目,你可以深入理解如何在Spring环境中集成和使用ActiveMQ,这对于构建大规模、高并发的分布式系统至关重要。此外,这也能帮助你掌握消息中间件在解决系统解耦、异步处理和容错...
通过这个简单的ActiveMQ demo,你可以深入学习消息中间件的核心概念,了解如何在实际项目中运用ActiveMQ进行高效的消息传递,提升系统的并发处理能力和解耦能力。同时,这个过程也有助于掌握JMS规范,为后续更复杂的...
通过这个示例,你可以了解到如何在Spring Boot应用中集成ActiveMQ,使用Queue和Topic进行消息传递,并了解连接池和消息确认机制。这些技术在分布式系统、微服务架构中广泛用于实现异步通信和解耦。
在本Demo中,开发者使用C#语言构建了一个与ActiveMQ交互的应用程序,这为理解和实践如何在.NET环境中使用ActiveMQ提供了便利。 1. **ActiveMQ基本概念** - **消息中间件**:ActiveMQ是Apache软件基金会开发的一个...
**ActiveMQ实例Demo详解** Apache ActiveMQ是一款开源的消息中间件,它是Java消息服务(JMS)的实现,广泛应用于分布式系统中的异步通信。ActiveMQ以其高性能、高可靠性以及丰富的特性,成为许多企业级应用首选的...
这个 ActiveMQ Demo 展示了如何使用 ActiveMQ 进行消息传递,包括生产者和消费者的基本操作,以及如何配置 ActiveMQ 的持久化机制。理解这些概念和实践对于构建基于消息驱动的分布式系统至关重要,因为它们有助于...
在实际运行这个"ActiveMQ简单demo"时,你需要确保已经安装了ActiveMQ,并且服务器正在运行。你可以通过访问`http://localhost:8161/admin/`来查看服务器状态和管理队列。 为了进一步学习和理解ActiveMQ,你可能还...
本初学使用DEMO将带你走进ActiveMQ的世界,通过队列(Queue)和主题(Topic)两种消息模型来了解其基本用法。 1. **ActiveMQ简介**: - ActiveMQ 是Apache软件基金会的一个项目,它提供了一个跨语言、跨平台的消息...
ActiveMQ-demo实例代码: /* *生产者启动程序,并发送消息给amq服务器(broker) */ public class Producer { /** * @param args * @throws Exception */ public static void main(String[] args) throws ...
ActiveMQ Demo程序,包括发送和接收程序,WinForm开发 关于ActiveMQ的介绍,请看我的这篇文章:http://blog.csdn.net/bodybo/article/details/5647968
本教程将详细介绍如何在Spring Boot项目中集成ActiveMQ,实现消息接收的Demo。 首先,我们需要在Spring Boot项目中引入ActiveMQ的相关依赖。在`pom.xml`文件中添加以下Maven依赖: ```xml <groupId>org.spring...
通过这个简单的Demo,你可以进一步学习如何配置ActiveMQ服务器,管理队列和主题,以及处理更复杂的消息传递场景,如事务性消息、持久化消息、消息优先级等。 总的来说,ActiveMQ作为一个强大的消息中间件,可以帮助...
在这个“ActiveMQ实战demo”中,我们将深入探讨如何使用ActiveMQ进行消息发送和接收,并了解其工作原理。 首先,让我们了解一下JMS。JMS是Java平台上的一个标准接口,定义了生产、消费、管理和消息队列的标准API。...
使用spring jmstemplate写的activemq小demo,浅显易懂。工程下载导入可用(maven 工程) activemq 可直接apache官网下载 传送门http://activemq.apache.org/download.html
《SpringBoot整合ActiveMQ的完整示例解析及MQ界面配置指南》 在现代分布式系统中,消息队列(Message Queue,简称MQ)扮演着至关重要的角色,它能够有效地解耦系统组件,提高系统的扩展性和容错性。本文将深入探讨...
java整合activemq的demo,生产者和消费者两个方法。结合自带的工具http://192.168.1.106:8161。来查看消息传递情况
【标题】"activemq-demo.zip" 是一个包含Apache ...通过深入学习和实践"activemq-demo.zip"中的内容,你可以更好地理解ActiveMQ的工作原理,以及如何在实际项目中有效地使用它来构建可靠的、高并发的消息传递系统。
Spring框架提供了一套完整的JMS(Java Message Service)支持,可以方便地与各种消息队列进行整合,ActiveMQ作为JMS的实现,能够帮助我们构建高可靠、高性能的消息系统。 **一、环境准备** 1. **安装ActiveMQ**: ...