`
weitao1026
  • 浏览: 1056344 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

使用activemq

 
阅读更多

说说在项目里是怎么使用activemq(简称为amq)进行通信的。

有2个系统,面向不同的用户,简称为系统A和系统B。本文为了简单,只将系统A作为 队列A.CreateMessage的生产端,系统B作为 队列A.CreateMessage的消费端,传输的message可为一个设计好的类的对象,本文为了简单,传输的是一个String对象。

另外,系统A也可以作为另一队列QC的消费端,系统B作为队列QC的生产端。

 1.下载一个apache-activemq-5.10.2,根据系统类型(操作系统位数),选择启动bin目录下的win32或win64目录下的activemq.bat文件。启动后,打开浏览器,输入localhost:8161/admin/queues.jsp,

如果页面是下面这样的

     

输入用户名:admin,密码:admin就OK了。

          

2.amq也启动了,那么接下来是在系统A加上amq相关内容。

项目目录结构如下:

    

 

系统A的applicationContext-amq.xml文件:

复制代码
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
 http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.10.2.xsd">
    <!--
        使用spring的listenerContainer,消息用持久化保存,服务器重启不会丢失
    -->
     <!-- 连接外部的activeMQ-->
    <amq:connectionFactory id="jmsConnectionFactory" userName="admin" password="admin" brokerURL="tcp://localhost:61616"></amq:connectionFactory>

    <!--  Spring JmsTemplate config -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory">
            <!--  lets wrap in a pool to avoid creating a connection per send -->
            <bean class="org.springframework.jms.connection.SingleConnectionFactory">
                <property name="targetConnectionFactory" ref="jmsConnectionFactory" />
            </bean>
        </property>
        <!-- custom MessageConverter -->
        <property name="messageConverter" ref="defaultMessageConverter" />
    </bean>
    
    <!-- converter  -->
    <bean id="defaultMessageConverter" class="com.pack.app.amq.DefaultMessageConverter" />
    
    <!--  ActiveMQ destinations  -->
    <!--  使用Queue方式-->
    <amq:queue name="QUEUE" physicalName="TESTQ" />

    <bean id="queueMessageProducer" class="tools.amq.QueueMessageProducer">
        <property name="template" ref="jmsTemplate" />
        <property name="destination" ref="QUEUE" />
    </bean>
    
   
复制代码
  <!-- consumer for queue -->
    <bean id="queueConsumer" class="tools.amq.QueueConsumer" />

    <!-- Message Listener for queue -->
    <bean id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <constructor-arg ref="queueConsumer" />
        <!--  may be other method -->
        <property name="defaultListenerMethod" value="receive" />
        <!-- custom MessageConverter define -->
        <property name="messageConverter" ref="defaultMessageConverter" />
    </bean>

    <!--  listener container,MDP无需实现接口 -->
    <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsConnectionFactory" />
        <property name="destination" ref="QUEUE" />
        <property name="messageListener" ref="queueListener" />
    </bean>
复制代码
    <!-- 测试 向MQ发消息 -->
    <amq:queue name="CreateMessage" physicalName="A.CreateMessage" />
    <!-- 生产数据 -->
    <bean id="createMessageProducer" class="com.pack.app.amq.producer.CreateMessageProducer">
        <property name="template" ref="jmsTemplate" />
        <property name="destination" ref="CreateMessage" />
    </bean>
</beans>
复制代码

DefaultMessageConverter.java

复制代码
public class DefaultMessageConverter implements MessageConverter {
    /**
     * Logger for this class
     */
    private static final Log log = LogFactory.getLog(DefaultMessageConverter.class);

    public Message toMessage(Object obj, Session session) throws JMSException {
        if (log.isDebugEnabled()) {
            log.debug("toMessage(Object, Session) - start");
        }

        // check Type
        ActiveMQObjectMessage objMsg = (ActiveMQObjectMessage) session.createObjectMessage();
        HashMap<String, byte[]> map = new HashMap<String, byte[]>();
        try {
            // POJO must implements Seralizable
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            ObjectOutputStream oos = new ObjectOutputStream(bos);
            oos.writeObject(obj);
            map.put("POJO", bos.toByteArray());
            objMsg.setObjectProperty("Map", map);

        } catch (IOException e) {
            e.printStackTrace();
            log.error("toMessage(Object, Session)", e);
        }
        return objMsg;
    }

    public Object fromMessage(Message msg) throws JMSException {
        if (log.isDebugEnabled()) {
            log.debug("fromMessage(Message) - start");
        }

        if (msg instanceof ObjectMessage) {
            HashMap<String, byte[]> map = (HashMap<String, byte[]>) ((ObjectMessage) msg).getObjectProperty("Map");
            try {
                // POJO must implements Seralizable
                ByteArrayInputStream bis = new ByteArrayInputStream(map.get("POJO"));
                ObjectInputStream ois = new ObjectInputStream(bis);
                Object returnObject = ois.readObject();
                return returnObject;
            } catch (IOException e) {
                e.printStackTrace();
                log.error("fromMessage(Message)", e);

            } catch (ClassNotFoundException e) {
                e.printStackTrace();
                log.error("fromMessage(Message)", e);
            }

            return null;
        } else {
            throw new JMSException("Msg:[" + msg + "] is not Map");
        }
    }
}
复制代码

QueueMessageProducer.java
复制代码
import org.springframework.jms.core.JmsTemplate;

import javax.jms.Queue;

/**
 * Date: 2015-7-1
 * Time: 17:14:23
 */
public class QueueMessageProducer {

    private JmsTemplate template;

    private Queue destination;

    public void setTemplate(JmsTemplate template) {
        this.template = template;
    }

    public void setDestination(Queue destination) {
        this.destination = destination;
    }

    public void send(FooMessage message) {
        template.convertAndSend(this.destination, message);
    }

}
复制代码

CreateMessageProducer.java(消息生产者)

复制代码
import javax.jms.Queue;
import org.springframework.jms.core.JmsTemplate;

public class CreateMessageProducer {

     private JmsTemplate template;

     private Queue destination;
     
     public void setTemplate(JmsTemplate template) {
        this.template = template;
    }

    public void setDestination(Queue destination) {
        this.destination = destination;
    }

    public void send(String str) {
        template.convertAndSend(this.destination, str);
        System.out.println("system A send message to system B~~~~~~~~~~");
    }
}
复制代码

 

 3.在系统B加上amq相关内容。

applicationContext-amq.xml文件

复制代码
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
 http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.10.2.xsd">
    <!--
        使用spring的listenerContainer,消息用持久化保存,服务器重启不会丢失
    -->
     <!-- 连接外部的activeMQ-->
    <amq:connectionFactory id="jmsConnectionFactory" userName="admin" password="admin" brokerURL="tcp://localhost:61616"></amq:connectionFactory>

    <!--  ActiveMQ destinations  -->
    <!--  使用Queue方式-->
    <amq:queue name="QUEUE" physicalName="TESTQ" />

    <!--  Spring JmsTemplate config -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory">
            <!--  lets wrap in a pool to avoid creating a connection per send -->
            <bean class="org.springframework.jms.connection.SingleConnectionFactory">
                <property name="targetConnectionFactory" ref="jmsConnectionFactory" />
            </bean>
        </property>
        <!-- custom MessageConverter -->
        <property name="messageConverter" ref="defaultMessageConverter" />
    </bean>

    <!-- converter  -->
    <bean id="defaultMessageConverter" class="com.pack.app.amq.DefaultMessageConverter" />

    <bean id="queueMessageProducer" class="tools.amq.QueueMessageProducer">
        <property name="template" ref="jmsTemplate" />
        <property name="destination" ref="QUEUE" />
    </bean>

    <!-- consumer for queue -->
    <bean id="queueConsumer" class="tools.amq.QueueConsumer" />

    <!-- Message Listener for queue -->
    <bean id="queueListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <constructor-arg ref="queueConsumer" />
        <!--  may be other method -->
        <property name="defaultListenerMethod" value="receive" />
        <!-- custom MessageConverter define -->
        <property name="messageConverter" ref="defaultMessageConverter" />
    </bean>

    <!--  listener container,MDP无需实现接口 -->
    <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsConnectionFactory" />
        <property name="destination" ref="QUEUE" />
        <property name="messageListener" ref="queueListener" />
    </bean>
    
    
    <!-- 测试  接收消息 -->
    <amq:queue name="CreateMessage" physicalName="A.CreateMessage" />
    <!-- 接收数据 -->
    <bean id="createMessageConsumer" class="com.pack.app.amq.consumer.CreateMessageConsumer" />  
    <!-- 监听 -->
    <bean id="createMessageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
        <constructor-arg ref="createMessageConsumer" />
        <!--  may be other method -->
        <property name="defaultListenerMethod" value="process" />
        <!-- custom MessageConverter define -->
        <property name="messageConverter" ref="defaultMessageConverter" />
    </bean>

    <!--  listener container,MDP无需实现接口 -->
    <bean id="createMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsConnectionFactory" />
        <property name="destination" ref="CreateMessage" />
        <property name="messageListener" ref="createMessageListener" />
        
        <!-- 消费者个数  -->
        <!-- <property name="concurrentConsumers" value="4"></property> -->
    </bean>
</beans>
复制代码
DefaultMessageConverter.java、QueueMessageProducer.java、QueueConsumer.java与系统A一样。

CreateMessageConsumer.java
复制代码
public class CreateMessageConsumer {

    @Autowired
    public AgentService agentService;
    
    public void process(String str) {
        System.out.println("system B receive message from  system A ");
        
        agentService.agentPath(str);
    }
}
复制代码

4.启动系统A和系统B的应用,只要系统A往队列A.CreateMessage产生消息,系统B会自动接收到消息。

分享到:
评论

相关推荐

    JMS 使用 ActiveMQ 传送文件

    **标题:“JMS 使用 ActiveMQ 传送文件”** 在IT领域,Java消息服务(Java Message Service,简称JMS)是一种标准API,它允许应用程序创建、发送、接收和读取消息。这种技术常用于异步通信,尤其是在分布式系统中,...

    spring使用activeMQ实现消息发送

    本文将深入探讨如何在Spring环境中使用ActiveMQ来实现消息的发送与接收,以提高系统的可扩展性和解耦性。 首先,我们需要了解Spring对ActiveMQ的支持。Spring提供了`spring-jms`模块,它包含了一组丰富的API和配置...

    spring使用activemq

    本篇文章将详细探讨如何在Spring环境中整合并使用ActiveMQ,帮助开发者更好地理解和运用这两个强大的工具。 首先,让我们了解Spring与ActiveMQ整合的基础概念。Spring框架提供了对多种消息中间件的集成支持,包括...

    activeMq in action 使用activeMq开发JMS的简单讲述

    本篇文章将深入探讨如何使用ActiveMQ进行JMS开发,以及ActiveMQ的核心特性。 一、Java消息服务(JMS) JMS是一种为分布式环境设计的消息传递规范,它定义了生产、存储和消费消息的标准接口。通过JMS,应用程序可以...

    使用ActiveMQ 5.X

    - **版本5端口配置**:涉及到如何配置ActiveMQ使用的网络端口,这对于解决网络问题至关重要。 - **LDAP代理查找机制**:介绍如何通过LDAP服务来查找和管理ActiveMQ代理,这对于大型企业级应用非常有用。 - **...

    使用ActiveMQ让C++与C#通信

    ActiveMQ使用发布/订阅或点对点模型来处理消息传递,确保消息的可靠传输。 要实现C++与C#之间的通信,我们需要完成以下步骤: 1. **安装和配置ActiveMQ**: 下载并安装ActiveMQ,启动服务,通常会有一个Web管理界面...

    Spring 使用ActiveMQ 收发消息实例

    在本文中,我们将深入探讨如何在Java Spring框架中使用ActiveMQ进行消息传递。Spring与ActiveMQ的集成使得在分布式系统中实现可靠的消息通信变得简单高效。ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它...

    Linux安装和使用ActiveMQ教程

    在Linux中安装和使用ActiveMQ教程

    使用ActiveMQ入门消息中间件.zip

    使用ActiveMQ入门消息中间件

    activeMQ 与 quartz 事例

    描述中的"NULL"意味着没有提供额外的信息,但我们可以从标题推测,这篇文章或压缩包可能包含了一些关于如何在实际项目中使用ActiveMQ和Quartz的示例代码或教程。 **标签解析:** 1. **源码** - 暗示了压缩包可能...

    JMS使用ActiveMQ的支持包

    在Spring中安装ActiveMQ www.activemq.org apache-activemq-4.1.0-incubator.zip ,incubator.jar复制到lib中 启动ActiveMQ的脚本:运行刚下载的文件/bin/activemq.bat. 这样就可以使用ActiveMQ进行中介服务了。

    go语言实现使用activemq 收发消息

    在本文中,我们将深入探讨如何使用Go语言实现与ActiveMQ的通信,主要关注消息的收发功能。ActiveMQ是Apache软件基金会开发的一款开源消息中间件,支持多种协议,包括我们这里提到的STOMP(Simple Text Oriented ...

    Mule_ActiveMQ

    4. **Mule ActiveMQ 集成**:在 Mule 中,ActiveMQ 作为内置组件使用,无需额外安装。通过 Mule,可以方便地配置 ActiveMQ 队列和主题,实现消息的发布/订阅模型或者点对点通信模型。 5. **消息模型**: - **点对...

    ActiveMQ使用手册(中文版)

    ### ActiveMQ 使用手册知识点概述 #### 一、ActiveMQ 原理与基本构件 **1.1 连接工厂(Connection Factory):** - **定义:** 连接工厂是客户端用来创建连接的对象。在ActiveMQ中,`ActiveMQConnectionFactory` 类...

    使用ActiveMQ所有jar

    JMS--ActiveMQ开发需要的所有jar包,本版本为5.10.0版本

    spring使用activeMQ

    ActiveMQ 是 Apache 开源项目,提供了一个高性能、可伸缩、易于使用的消息代理服务,支持多种消息协议如 JMS(Java Message Service)。 在 Spring 中使用 ActiveMQ 主要涉及以下几个核心概念: 1. **JMS**:Java ...

    ActiveMQ的安装与使用

    配置主要涉及到端口设置,ActiveMQ使用特定端口进行管理和消息传输。管理控制台默认端口是8161,消息通讯端口默认是61616,这些可以在ActiveMQ安装目录下的jetty.xml文件中进行修改。如果需要在防火墙中打开这些端口...

    ActiveMQ实现Android端的消息推送,包含Android端和Server端的代码和使用说明

    Server端的简要使用说明如下: 1、解压安装文件,执行\apache-activemq-5.13.3-bin\apache-activemq-5.13.3\bin\win64\wrapper.exe. 2、测试发送消息,打开本地服务器地址http://localhost:8161,登录服务器,默认...

    qt activemq mqtt 动态库

    在本案例中,"qt activemq mqtt 动态库"指的是使用Qt库编译的ActiveMQ MQTT客户端的库文件(lib)和动态链接库文件(dll)。 这些库文件和dll是为开发REST协议客户端软件而准备的。REST(Representational State ...

    ActiveMQ开发规范及方案

    在使用ActiveMQ之前,需要定义好规则,例如连接池的使用、消费者监控等。连接池的使用可以提高系统的性能和可靠性,而消费者监控可以确保消息的消费正确性。 连接池使用 连接池是指在应用程序中预先创建的一组连接...

Global site tag (gtag.js) - Google Analytics