`

ActiveMQ的简单使用

阅读更多

ActiveMQ的简单使用

ActiveMQ是一种开源的,实现了JMS规范的,面向消息(MOM)的中间件,为应用程序提供高效的、可扩展的、稳定的和安全的企业级消息通信。

相关文章:
范例项目: http://wosyingjun.iteye.com/blog/2312553 
ActiveMQ集群高可用方案:http://wosyingjun.iteye.com/blog/2314683

 
ActiveMQ组成:

 
ActiveMQ接发送消息流程图:

一. ActiveMQ的安装和配置

1、官网下载Linux版的ActiveMQ(最新版本为5.13.4)
https://activemq.apache.org/download.html
2、解压安装
tar -zxvf apache-activemq-5.13.4-bin.tar.gz
3、配置(这里采用默认配置,无需修改)
vim /usr/lical/activemq-1/conf/activemq.xml
4、启动
cd /usr/local/activemq-1/bin
./activemq start
5、打开管理界面(管理界面可以查看并管理所有队列及消息)
http://192.168.1.100:8161

二. Spring结合ActiveMQ使用

 1、pom文件引入依赖
		<!--active mq start-->
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-core</artifactId>
			<version>5.7.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-pool</artifactId>
			<version>5.13.3</version>
		</dependency>
		<!--active mq end-->
 
2、spring-mq配置文件
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
		http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
    <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <!-- ActiveMQ服务地址 -->
        <property name="brokerURL" value="${mq.brokerURL}"/>
        <property name="userName" value="${mq.userName}"></property>
        <property name="password" value="${mq.password}"></property>
        <!-- 这里定义重试策略,注意:只有持久化的才会重试-->
        <property name="redeliveryPolicyMap" ref="redeliveryPolicyMap"/>
    </bean>


    <!--这里设置各个消息队列的重发机制-->
    <bean id="redeliveryPolicyMap" class="org.apache.activemq.broker.region.policy.RedeliveryPolicyMap">
        <property name="redeliveryPolicyEntries">
            <list>
                <ref bean="bizRedeliveryPolicy"/>
                <ref bean="mailRedeliveryPolicy"/>
            </list>
        </property>
    </bean>
    <bean id="bizRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
        <!--重发次数 延时、延时系数、延时指数开关、目标(重发等待时间1s, 2s, 4s, 8s)-->
        <property name="maximumRedeliveries" value="3"/>
        <property name="redeliveryDelay" value="1000"/>
        <property name="backOffMultiplier" value="2"/>
        <property name="useExponentialBackOff" value="true"/>
        <property name="destination" ref="bizQueue"/>
    </bean>
    <bean id="mailRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
        <!--重发次数 延时、延时系数、延时指数开关-->
        <property name="maximumRedeliveries" value="2"/>
        <property name="redeliveryDelay" value="5000"/>
        <property name="destination" ref="mailQueue"/>
    </bean>

    <!--
    	ActiveMQ为我们提供了一个PooledConnectionFactory,通过往里面注入一个ActiveMQConnectionFactory
    	可以用来将Connection、Session和MessageProducer池化,这样可以大大的减少我们的资源消耗。
    	要依赖于 activemq-pool包
     -->
    <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
        <property name="connectionFactory" ref="targetConnectionFactory"/>
        <property name="maxConnections" value="${mq.pool.maxConnections}"/>
    </bean>

    <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
        <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
        <property name="targetConnectionFactory" ref="pooledConnectionFactory"/>
        <property name="reconnectOnException" value="true"/>
    </bean>

    <!-- 队列目的地-->
    <bean id="bizQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="${biz.queueName}"/>
    </bean>
    <bean id="mailQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="${mail.queueName}"/>
    </bean>


    <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
    <!-- 队列模板 这里配置2个,一个用于分布式业务,一个用于发送邮件-->
    <bean id="bizMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestination" ref="bizQueue"/>
        <!-- 使 deliveryMode, priority, timeToLive设置生效-->
        <property name="explicitQosEnabled" value="true" />
        <!-- 持久化 如果设置为非持久化MQ服务器重启后MQ中的数据会丢失-->
        <property name="deliveryPersistent" value="true"/>
        <!--这里注意:如果不开启事务,消息在异常的情况下是不会重试的-->
        <property name="sessionTransacted" value="false"/>
    </bean>

    <bean id="mailMqJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="defaultDestination" ref="mailQueue"/>
        <!-- 使 deliveryMode, priority, timeToLive设置生效-->
        <property name="explicitQosEnabled" value="true" />
        <!-- 持久化 如果设置为非持久化MQ服务器重启后MQ中的数据会丢失-->
        <property name="deliveryPersistent" value="true"/>
        <!--这里注意:如果不开启事务,消息在异常的情况下是不会重试的-->
        <property name="sessionTransacted" value="true"/>
    </bean>

    <!-- 消息监听实现方法一 -->
    <bean id="bizListener" class="com.yingjun.ssm.mq.listener.TransactionBizMessageListener"/>
    <bean id="mailListener" class="com.yingjun.ssm.mq.listener.MailMessageListener"/>


    <!-- 消息接收监听器用于异步接收消息-->
    <bean id="bizContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="bizQueue"/>
        <property name="messageListener" ref="bizListener"/>
        <!--这里注意:如果不开启事务,消息在异常的情况下是不会重试的-->
        <property name="sessionTransacted" value="true"/>
        <property name="concurrentConsumers" value="1"/>
    </bean>

    <bean id="mailContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="destination" ref="mailQueue"/>
        <property name="messageListener" ref="mailListener"/>
        <!--这里注意:如果不开启事务,消息在异常的情况下是不会重试的-->
        <property name="sessionTransacted" value="true"/>
        <property name="concurrentConsumers" value="1"/>
    </bean>

</beans>
 
3、重试机制以及死信的配置
<!--这里设置各个消息队列的重发机制-->
    <bean id="redeliveryPolicyMap" class="org.apache.activemq.broker.region.policy.RedeliveryPolicyMap">
        <property name="redeliveryPolicyEntries">
            <list>
                <ref bean="bizRedeliveryPolicy"/>
                <ref bean="mailRedeliveryPolicy"/>
            </list>
        </property>
    </bean>
    <bean id="bizRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
        <!--重发次数 延时、延时系数、延时指数开关、目标(重发等待时间1s, 2s, 4s, 8s)-->
        <property name="maximumRedeliveries" value="3"/>
        <property name="redeliveryDelay" value="1000"/>
        <property name="backOffMultiplier" value="2"/>
        <property name="useExponentialBackOff" value="true"/>
        <property name="destination" ref="bizQueue"/>
    </bean>
    <bean id="mailRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
        <!--重发次数 延时、延时系数、延时指数开关-->
        <property name="maximumRedeliveries" value="2"/>
        <property name="redeliveryDelay" value="5000"/>
        <property name="destination" ref="mailQueue"/>
    </bean>
 
4、发送端代码
package com.yingjun.ssm.biz;

import com.alibaba.fastjson.JSONObject;
import com.yingjun.ssm.common.model.BizOperator;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

/**
 * @author yingjun
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:application.xml")
public class Application {

    private final Logger log = LoggerFactory.getLogger(Application.class);

    @Autowired
    private JmsTemplate bizMqJmsTemplate;

    @Test
    public void mailSend() throws Exception {
        bizMqJmsTemplate.setSessionTransacted(true);
        for (int i = 0; i < 1; i++) {
            log.info("==>send message" + i);
            bizMqJmsTemplate.send(new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    log.info("getTransacted:" + session.getTransacted());
                    BizOperator operator = new BizOperator("testDistributedTransaction", 1001);
                    return session.createTextMessage(JSONObject.toJSONString(operator));
                }
            });
            log.info("==>finish send message"+ i);
        }
        while (true) {

        }
    }
}
 
5、接受端代码
package com.yingjun.ssm.mq.listener;

import com.alibaba.fastjson.JSONObject;
import com.yingjun.ssm.common.model.BizOperator;
import com.yingjun.ssm.mq.biz.TransactionBizService;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.listener.SessionAwareMessageListener;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

/**
 * 
 * @author yingjun
 */
@Component
public class TransactionBizMessageListener implements SessionAwareMessageListener<Message> {

    private static final Logger log = LoggerFactory.getLogger(TransactionBizMessageListener.class);
    private final String transactionBiz = "testDistributedTransaction";

    @Autowired
    private TransactionBizService transactionBizService;

    /**
     * @param message
     * @param session
     */
    public void onMessage(Message message, Session session) throws JMSException{
        //这里建议不要try catch,让异常抛出,通过redeliveryPolicy去重试,达到重试次数进入死信DLQ(Dead Letter Queue)
        ActiveMQTextMessage msg = (ActiveMQTextMessage) message;
        String ms = ms = msg.getText();
        log.info("==>receive message:" + ms);
        // 转换成相应的对象
        BizOperator operator = JSONObject.parseObject(ms, BizOperator.class);
        if (operator != null && transactionBiz.equals(operator.getOperator())) {
            transactionBizService.addScoreBySyn(100);
            //throw new RuntimeException("test redeliveryPolicy");
        } else {
            log.info("==>message:" + ms + " no about operator!");
        }
    }
}

 

如上所以就完成了Spring结合ActiveMQ的简单实现,完整代码可在文章最上头的范例项目中找到。

4
7
分享到:
评论

相关推荐

    ActiveMQ使用手册(中文版)

    ### ActiveMQ 使用手册知识点概述 #### 一、ActiveMQ 原理与基本构件 **1.1 连接工厂(Connection Factory):** - **定义:** 连接工厂是客户端用来创建连接的对象。在ActiveMQ中,`ActiveMQConnectionFactory` 类...

    activeMQ简单入门案例

    本教程将引导你通过一个简单的入门案例了解如何使用ActiveMQ实现生产者与消费者的模式。 首先,我们需要了解ActiveMQ的基本概念。在消息队列中,生产者是发送消息的实体,而消费者则是接收和处理这些消息的实体。...

    ActiveMQ简单Demo案例

    在这个"ActiveMQ简单Demo案例"中,我们将探讨如何使用ActiveMQ搭建服务器,并创建生成者(Producer)和消费者(Consumer)对象。 首先,我们需要理解JMS的概念。JMS是一个标准,定义了与消息传递系统交互的API,...

    ActiveMQ使用SSL加密文件Demo

    **ActiveMQ 使用 SSL 加密文件 Demo** ActiveMQ 是一个开源的消息代理服务器,它遵循 Java Message Service(JMS)规范,提供了可靠的消息传递功能。在实际的生产环境中,为了确保消息传输的安全性,我们通常会使用...

    activeMq in action 使用activeMq开发JMS的简单讲述

    本篇文章将深入探讨如何使用ActiveMQ进行JMS开发,以及ActiveMQ的核心特性。 一、Java消息服务(JMS) JMS是一种为分布式环境设计的消息传递规范,它定义了生产、存储和消费消息的标准接口。通过JMS,应用程序可以...

    ActiveMQ简单教程

    在 Windows 上安装 ActiveMQ 很简单,只需下载解压即可。最新版本为 5.11.1,可以从 Apache 官网获取。在 Windows 上,如果已经配置了全局的 Java 环境变量,可以直接运行;否则,需要在 `bin\win64\wrapper.conf` ...

    ActiveMQ使用入门.pdf

    【ActiveMQ使用入门】 ActiveMQ是一款基于Java的消息中间件,它是Apache基金会的开源项目,也是最早的JMS(Java消息服务)实现之一。JMS是一种标准,定义了在Java环境中访问消息中间件的接口,但并未具体实现。...

    ActiveMQ_使用failover模式进行连接切换时,线程断开

    在使用ActiveMQ消息中间件时,我们常常需要考虑如何在多个实例之间实现高可用性(HA)。其中一种常用的方式是采用**Failover(故障转移)**模式来确保即使一个节点出现问题,另一个节点也能接管服务,从而维持系统的...

    ActiveMQ安装和使用

    在`conf/activemq.xml`文件中添加简单的认证插件配置: ```xml ,admins"/&gt; ``` 这里定义了一个名为`wusc`的用户,密码为`wusc.123`,拥有`users`和`admins`角色。 ##### 2. 控制台登录配置 在`conf/...

    activemq_demo,activeMQ的简单demo

    【描述】中提到的"activeMQ的简单demo"是指一个基础的示例项目,旨在帮助开发者快速理解和使用ActiveMQ。通过导入此项目并运行,你可以直观地看到ActiveMQ如何工作,如何发送和接收消息,以及如何配置和管理消息队列...

    JMS 使用 ActiveMQ 传送文件

    XML常用于数据交换,因为它具有自我描述性和平台无关性,与JMS结合使用,可以使得消息的结构化和解析更为简单,从而在异构环境中实现文件的高效传输。 2. **JMS 使用 ActiveMQ 传送文件.doc** - 这个文档应该直接...

    spring使用activeMQ实现消息发送

    本文将深入探讨如何在Spring环境中使用ActiveMQ来实现消息的发送与接收,以提高系统的可扩展性和解耦性。 首先,我们需要了解Spring对ActiveMQ的支持。Spring提供了`spring-jms`模块,它包含了一组丰富的API和配置...

    一个activeMQ的简单例子

    本示例将介绍如何使用ActiveMQ实现一个简单的消息队列应用。 首先,我们需要理解消息队列(Message Queue)的概念。消息队列是一种异步通信模式,允许应用程序之间通过消息进行通信,而无需直接调用对方。这种解耦...

    activemq C#客户端使用demo

    在本示例中,我们将专注于讲解如何在C#环境下使用ActiveMQ进行消息发送。 在C#中使用ActiveMQ,你需要引用Apache.NMS和Apache.NMS.ActiveMQ这两个库,它们包含了C#客户端与ActiveMQ服务器交互所需的所有功能。首先...

    ActiveMq-JMS简单实例使用tomcat.pdf

    【ActiveMQ-JMS简单实例使用Tomcat】是一个关于如何在Tomcat环境下集成并使用ActiveMQ进行JMS消息传递的初级教程。ActiveMQ是一款开源的消息中间件,它遵循JMS(Java Message Service)1.1规范,能兼容J2EE1.4及以上...

    activemq的简单配置

    #### 三、ActiveMQ的简单使用示例 ##### 1. 配置与启动 ActiveMQ的安装和配置相对简单,一般包括以下步骤: - 下载并解压ActiveMQ压缩包。 - 修改配置文件`conf/activemq.xml`,设置监听端口、日志级别等。 - 启动...

    activeMQ的使用入门

    Spring框架提供了对JMS的强大支持,使得配置和使用ActiveMQ变得更加简单。你可以通过Spring的XML配置文件或Java配置类来定义ConnectionFactory、Destination(队列或主题)和MessageListener。在Spring的环境下,...

    activeMQ使用软件,以及初始化页面

    安装ActiveMQ非常简单,只需要以下几个步骤: 1. 下载并解压压缩包到你选择的目录。 2. 进入解压后的目录,找到bin目录。 3. 对于Windows用户,运行start.bat启动ActiveMQ;对于Linux或Mac用户,执行bin目录下的...

    ActiveMQ的简单例子

    本教程将通过一个简单的例子介绍ActiveMQ的两个核心模式:点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe,Pub/Sub)。我们将使用IntelliJ IDEA作为集成开发环境来实现这些示例。 首先,我们需要...

    mqttjs(activemq测试工具)

    本教程主要围绕`mqttjs`,一个JavaScript实现的MQTT客户端库,以及如何使用它来测试ActiveMQ服务器。`mqttjs`是一个轻量级且易于使用的库,适合在Web应用、Node.js环境中进行 MQTT 相关的开发工作。 首先,安装`...

Global site tag (gtag.js) - Google Analytics