`

activeMQ完整的demo,值得你拥有

阅读更多

               

                      最近项目里面要求实时的分析数据,唉,storm学习成本太高,所以就想到了activeMQ. Apache ActiveMQ是最流行的功能强大的开源即时通讯和集成模式的服务器。Apache ActiveMQ的速度快,支持多语言和跨客户协议,带有易于在充分支持JMS 1.1和1.4使用J2EE企业集成模式和许多先进的功能。废话不多说,直接上demo了。

===============================     action       start  =========================

 一:必备jar包

<dependency>

    <groupId>org.apache.activemq</groupId>

    <artifactId>activemq-all</artifactId>

    <version>5.9.0</version>

</dependency>

<dependency>

<groupId>org.apache.activemq</groupId>

<artifactId>activemq-pool</artifactId>

<version>5.9.0</version>

</dependency>

二:服务端

            服务端包含4部分:

                    a.消息转换器

                    b.消息发布者

                    c.消息发送

                    d.activemq.xml配置

 a.消息转换器

package cn.innosoft.jt809.activemq.converter;

 

import java.math.BigDecimal;

import java.text.ParseException;

import java.text.SimpleDateFormat;

 

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.Session;

import javax.jms.TextMessage;

 

import org.springframework.jms.support.converter.MessageConversionException;

import org.springframework.jms.support.converter.MessageConverter;

 

import cn.innosoft.jt809.biz.dynamic.model.TGnssGpsHis;

/**

 * jms消息转换器

 * @author gaoq

 * @date 2015-3-27 下午2:57:23

 */

public class MsgConverter implements MessageConverter{

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

@Override

public Message toMessage(Object object, Session session) throws JMSException, MessageConversionException {

 if (!(object instanceof GpsVehicleReceive)) {  

           throw new MessageConversionException("obj is not MsgPojo");  

       }  

  GpsVehicleReceive msgPojo = (GpsVehicleReceive) object;  

       TextMessage textMessage = session.createTextMessage();  

       String msg = getGpsMsgString(msgPojo);

       textMessage.setText(msg);  

       return  textMessage;  

}

 

@Override

public Object fromMessage(Message message) throws JMSException, MessageConversionException {

if (!(message instanceof TextMessage)) {  

           throw new MessageConversionException("Message is not TextMessage");  

       }  

       TextMessage textMessage = (TextMessage) message;  

       TGnssGpsHis msg = new TGnssGpsHis();  

       String[] texts=textMessage.getText().split(",");  

         msg.setX(1);

} catch (ParseException e) {

e.printStackTrace();

}

       return msg;  

}

/**

* 获取对象转换后的字符串.

* @param msgPojo TGnssGpsHis

* @return String

*/

private String getGpsMsgString(GpsVehicleReceive msgPojo) {

 

String msg = msgPojo.getX()+","+msgPojo.getY();

return msg;

}

 

}

b.消息发布者

package cn.innosoft.jt809.activemq.service;

import javax.jms.Destination;

import org.springframework.jms.core.JmsTemplate;

import cn.innosoft.jt809.activemq.converter.GpsVehicleReceive;

/**

 * 消息发布者.

 * @author gaoq

 * @date 2015-3-19 上午11:12:00

 */

public class TopicPublisherService {

JmsTemplate jmsTemplate;

Destination destination;

//  public void send(final String msg) {  

//         MessageCreator messageCreator = new MessageCreator() {  

//             public Message createMessage(Session session) throws JMSException {  

//                 TextMessage message = session.createTextMessage();  

//                 message.setText(msg);  

//                 return message;  

//             }  

//         };  

//         jmsTemplate.send(this.destination, messageCreator);

//     }   

public void convertAndSend(GpsVehicleReceive msgPojo){  

        jmsTemplate.convertAndSend(this.destination, msgPojo);  

    }  

public void setJmsTemplate(JmsTemplate jmsTemplate) {

this.jmsTemplate = jmsTemplate;

}

public void setDestination(Destination destination) {

this.destination = destination;

}

 

}

 c.消息发送(在应用类中,获取数据并发送)

private static void sendMsg(TGnssGpsHis gpsHis){

GpsVehicleReceive r = new GpsVehicleReceive();

r.setX(1);

r.setY(2); 

topicPublisherService.convertAndSend(r);//发送

r = null;

 

}

d.activemq.xml配置

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"

xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans 

           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

           http://www.springframework.org/schema/context

           http://www.springframework.org/schema/context/spring-context-3.0.xsd  

           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd

           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

 

<bean id="connectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">  

   <property name="connectionFactory">  

      <bean class="org.apache.activemq.ActiveMQConnectionFactory">  

          <property name="brokerURL" value="tcp://127.0.0.1:9220" />  

      </bean>  

  </property>  

</bean>  

 

    <!-- 发送消息的目的地(主题) -->  

    <bean id="topicSubscriberMessageListenerDest" class="org.apache.activemq.command.ActiveMQTopic">  

        <constructor-arg index="0" value="myMessageListenerTopic" />  

    </bean>

    

    <bean id="msgConverter" class="cn.innosoft.jt809.activemq.converter.MsgConverter"></bean>

    <!-- 配置TopicJms模板  -->  

    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">  

        <property name="connectionFactory" ref="connectionFactory" />  

        <property name="defaultDestination" ref="topicSubscriberMessageListenerDest" />  

        <!-- 配置是否为发布订阅者模式,默认为false -->  

        <property name="pubSubDomain" value="true"/>  

<!-- 转换器 -->

        <property name="messageConverter" ref="msgConverter"></property>

    <!--<property name="receiveTimeout" value="10000" />  -->  

    </bean>  

    

    <bean id="topicPublisherService" class="cn.innosoft.jt809.activemq.service.TopicPublisherService">  

       <property name="jmsTemplate" ref="jmsTopicTemplate"/>  

        <property name="destination" ref="topicSubscriberMessageListenerDest" />

    </bean>

</beans>

 三:客户端使用

                客户端使用包含2部分:

                     a.监听类

                     b.activeMq.xml

 a.监听类

package cn.innosoft.exceptionAnalyse.activemq.service;

 

import java.text.SimpleDateFormat;

 

import javax.jms.Message;

import javax.jms.MessageListener;

import javax.jms.TextMessage;

 

import cn.innosoft.exceptionAnalyse.biz.AnalyseMsgMng;

import cn.innosoft.exceptionAnalyse.biz.cache.model.GpsVehicleReceive;

 

 

/**

 * 经度异常接收数据监听类.

 * @author gaoq

 * @date 2015-3-27 下午4:32:30

 */

public class TopicColorExceptionService implements MessageListener{

SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public void onMessage(Message message) {  

       if(message instanceof TextMessage){  

        TextMessage textMessage = (TextMessage) message; 

           try {  

            GpsVehicleReceive msg = bindProperties(textMessage);

               // AnalyseMsgMng.getInstance().colorQueue.put(msg);

           } catch (Exception e) {  

               e.printStackTrace();  

           }  

       }  

   }

 

/**

 * 绑定参数到对象上.

 * @param textMessage 消息.

 * @return GpsVehicleReceive

 */

private GpsVehicleReceive bindProperties(TextMessage textMessage) throws Exception{

GpsVehicleReceive msg = new GpsVehicleReceive();  

String[] texts=textMessage.getText().split(",");  

    msg.setX1(texts[0]);

              msg.setX2(texts[1]);

                    msg.setX3(texts[2]);

return msg;

 

 

}

 b.activeMq.xml

    <?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:aop="http://www.springframework.org/schema/aop"

xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="http://www.springframework.org/schema/beans 

           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

           http://www.springframework.org/schema/context

           http://www.springframework.org/schema/context/spring-context-3.0.xsd  

           http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd

           http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

 

 

<bean id="connectionFactory"  

        class="org.apache.activemq.ActiveMQConnectionFactory">  

        <property name="brokerURL" value="tcp://127.0.0.1:9220" />  

    </bean>  

      

    <!-- 发送消息的目的地(主题) -->  

     <bean id="topicSubscriberMessageListenerDest" class="org.apache.activemq.command.ActiveMQTopic">  

        <constructor-arg index="0" value="myMessageListenerTopic" />  

    </bean>  

    

<!--     <bean id="msgConverter" class="cn.innosoft.freightAnalyse.activemq.converter.MsgConverter"></bean> -->

    <!-- 配置TopicJms模板  -->  

    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">  

        <property name="connectionFactory" ref="connectionFactory" />  

        <property name="defaultDestination" ref="topicSubscriberMessageListenerDest" />  

        <!-- 配置是否为发布订阅者模式,默认为false -->  

        <property name="pubSubDomain" value="true"/>  

    </bean>  

    

 

    <bean id="topicColorException" class="cn.innosoft.exceptionAnalyse.activemq.service.TopicColorExceptionService"></bean> 

    <bean id="myMsgTopiclistenerContainerColor"  

        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  

        <property name="connectionFactory" ref="connectionFactory" />  

        <property name="destination" ref="topicSubscriberMessageListenerDest" />  

        <property name="messageListener" ref="topicColorException" />  

        <property name="pubSubDomain" value="true" />  

    </bean> 

</beans>   

=============================    action   end           ==========================

综上所述:一个完整的activeMQ的使用方式介绍完了。

接下来干啥呢。。。。。

想起来了。。。。activeMQ的服务还要启动,那么如何启动activeMQ的服务呢?

https://repository.apache.org/content/repositories/releases/org/apache/activemq/

下载了activeMQ之后

apache-activemq-5.9.0-bin.zip

                      解压后会发现:apache-activemq-5.9.0\bin目录下有win32,win64,那这个时候就要看服务器是什么系统了,我的是64位,所以就进去apache-activemq-5.9.0\bin\win64目录双击activemq.bat服务如果启动无异常就好了,然后在浏览器中访问:http://localhost:8161/admin,用户名:admin,密码:admin (这个应该是固定的,如果想改,也是可以得到自己去配置文件中找到位置改掉就ok了)

 

大功告成。。。。。。。。。。。。。。。

 

 

 

 

分享到:
评论

相关推荐

    activemq-demo

    通过学习和研究这个"activemq-demo"项目,你可以深入理解如何在Spring环境中集成和使用ActiveMQ,这对于构建大规模、高并发的分布式系统至关重要。此外,这也能帮助你掌握消息中间件在解决系统解耦、异步处理和容错...

    activemq_demo,activeMQ的简单demo

    通过这个简单的ActiveMQ demo,你可以深入学习消息中间件的核心概念,了解如何在实际项目中运用ActiveMQ进行高效的消息传递,提升系统的并发处理能力和解耦能力。同时,这个过程也有助于掌握JMS规范,为后续更复杂的...

    spring-boot-activemq-demo

    通过这个示例,你可以了解到如何在Spring Boot应用中集成ActiveMQ,使用Queue和Topic进行消息传递,并了解连接池和消息确认机制。这些技术在分布式系统、微服务架构中广泛用于实现异步通信和解耦。

    ActiveMQ Demo (C#)

    在本Demo中,开发者使用C#语言构建了一个与ActiveMQ交互的应用程序,这为理解和实践如何在.NET环境中使用ActiveMQ提供了便利。 1. **ActiveMQ基本概念** - **消息中间件**:ActiveMQ是Apache软件基金会开发的一个...

    ActiveMQ实例Demo

    **ActiveMQ实例Demo详解** Apache ActiveMQ是一款开源的消息中间件,它是Java消息服务(JMS)的实现,广泛应用于分布式系统中的异步通信。ActiveMQ以其高性能、高可靠性以及丰富的特性,成为许多企业级应用首选的...

    自己写的ActiveMQ的Demo例子

    这个 ActiveMQ Demo 展示了如何使用 ActiveMQ 进行消息传递,包括生产者和消费者的基本操作,以及如何配置 ActiveMQ 的持久化机制。理解这些概念和实践对于构建基于消息驱动的分布式系统至关重要,因为它们有助于...

    ActiveMQ简单demo

    在实际运行这个"ActiveMQ简单demo"时,你需要确保已经安装了ActiveMQ,并且服务器正在运行。你可以通过访问`http://localhost:8161/admin/`来查看服务器状态和管理队列。 为了进一步学习和理解ActiveMQ,你可能还...

    activeMQ初学使用demo

    本初学使用DEMO将带你走进ActiveMQ的世界,通过队列(Queue)和主题(Topic)两种消息模型来了解其基本用法。 1. **ActiveMQ简介**: - ActiveMQ 是Apache软件基金会的一个项目,它提供了一个跨语言、跨平台的消息...

    ActiveMQ-demo

    ActiveMQ-demo实例代码: /* *生产者启动程序,并发送消息给amq服务器(broker) */ public class Producer { /** * @param args * @throws Exception */ public static void main(String[] args) throws ...

    ActiveMQ Demo(C#)

    ActiveMQ Demo程序,包括发送和接收程序,WinForm开发 关于ActiveMQ的介绍,请看我的这篇文章:http://blog.csdn.net/bodybo/article/details/5647968

    springboot集成activemq实现消息接收demo

    本教程将详细介绍如何在Spring Boot项目中集成ActiveMQ,实现消息接收的Demo。 首先,我们需要在Spring Boot项目中引入ActiveMQ的相关依赖。在`pom.xml`文件中添加以下Maven依赖: ```xml &lt;groupId&gt;org.spring...

    ActiveMQ简单Demo案例

    通过这个简单的Demo,你可以进一步学习如何配置ActiveMQ服务器,管理队列和主题,以及处理更复杂的消息传递场景,如事务性消息、持久化消息、消息优先级等。 总的来说,ActiveMQ作为一个强大的消息中间件,可以帮助...

    activeMQ实战demo

    在这个“ActiveMQ实战demo”中,我们将深入探讨如何使用ActiveMQ进行消息发送和接收,并了解其工作原理。 首先,让我们了解一下JMS。JMS是Java平台上的一个标准接口,定义了生产、消费、管理和消息队列的标准API。...

    ActiveMQ+Spring+Maven Demo

    使用spring jmstemplate写的activemq小demo,浅显易懂。工程下载导入可用(maven 工程) activemq 可直接apache官网下载 传送门http://activemq.apache.org/download.html

    activeMQ demo

    《SpringBoot整合ActiveMQ的完整示例解析及MQ界面配置指南》 在现代分布式系统中,消息队列(Message Queue,简称MQ)扮演着至关重要的角色,它能够有效地解耦系统组件,提高系统的扩展性和容错性。本文将深入探讨...

    activemq生产消费的Demo

    java整合activemq的demo,生产者和消费者两个方法。结合自带的工具http://192.168.1.106:8161。来查看消息传递情况

    activemq-demo.zip

    【标题】"activemq-demo.zip" 是一个包含Apache ...通过深入学习和实践"activemq-demo.zip"中的内容,你可以更好地理解ActiveMQ的工作原理,以及如何在实际项目中有效地使用它来构建可靠的、高并发的消息传递系统。

    spring activeMQ-demo 配置

    Spring框架提供了一套完整的JMS(Java Message Service)支持,可以方便地与各种消息队列进行整合,ActiveMQ作为JMS的实现,能够帮助我们构建高可靠、高性能的消息系统。 **一、环境准备** 1. **安装ActiveMQ**: ...

Global site tag (gtag.js) - Google Analytics