`
Javahuhui
  • 浏览: 80570 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

ActiveMQ集成Spring使用

阅读更多

ActiveMQ的安装(忽略),主要说明其集成Spring配置使用(点对点模式)。

一、创建Mavne项目,所需ActiveMQ依赖包:

<dependency>

   <groupId>org.apache.activemq</groupId>

   <artifactId>activemq-client</artifactId>

   <version>5.14.5</version>

</dependency>

<dependency>

   <groupId>org.apache.activemq</groupId>

   <artifactId>activemq-pool</artifactId>

   <version>5.14.5</version>

</dependency>

<dependency>

   <groupId>org.springframework</groupId>

   <artifactId>spring-jms</artifactId>

   <version>4.1.9.RELEASE</version>

</dependency>

忽略Spring的集成

二、配置xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:p="http://www.springframework.org/schema/p"  
       xmlns:cache="http://www.springframework.org/schema/cache"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
            http://www.springframework.org/schema/tx 
            http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
            http://www.springframework.org/schema/aop 
            http://www.springframework.org/schema/aop/spring-aop-4.1.xsd
            http://www.springframework.org/schema/context      
            http://www.springframework.org/schema/context/spring-context-4.1.xsd 
            http://www.springframework.org/schema/cache 
            http://www.springframework.org/schema/cache/spring-cache-4.1.xsd">
    
    <context:annotation-config />
    <context:component-scan base-package="com.test.mq" />
        
    <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    	<property name="brokerURL" value="tcp://127.0.0.1:61616" />
    </bean>
    
    <!-- 配置JMS连接工长 -->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory" />
        <property name="sessionCacheSize" value="100" />
    </bean>
    
    <!-- 定义消息队列(Queue) -->
    <bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 设置消息队列的名字 -->
        <constructor-arg>
            <value>mq.demo</value>
        </constructor-arg>
    </bean>
    
    <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="defaultDestination" ref="demoQueueDestination" />
        <property name="receiveTimeout" value="10000" />
        <!-- true是topic,false是queue,默认是false,此处显示写出false -->
        <property name="pubSubDomain" value="false" />
    </bean>
    
    <bean id="queueMessageListener" class="com.test.mq.ActiveMqListener" />
    <!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
    <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="demoQueueDestination" />
        <property name="messageListener" ref="queueMessageListener" />
    </bean>
    
 </beans>

 三、Java代码

package com.test.mq;

import javax.annotation.Resource;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;

import org.apache.log4j.Logger;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

@Service
public class ActiveMqSender {
	private static Logger logger = Logger.getLogger(ActiveMqSender.class.getName());

	@Resource
	private JmsTemplate jmsTemplate;

	/**
	 * 向指定队列发送消息
	 */
	public void sendMessage(Destination destination, final String msg) {
		jmsTemplate.send(destination, new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(msg);
			}
		});
		logger.info("向队列" + destination.toString() + "发送了消息------------" + msg);
	}

	/**
	 * 向默认队列发送消息
	 */
	public void sendMessage(final String msg) {
		String destination = jmsTemplate.getDefaultDestination().toString();
		jmsTemplate.send(new MessageCreator() {
			public Message createMessage(Session session) throws JMSException {
				return session.createTextMessage(msg);
			}
		});
		logger.info("向队列" + destination + "发送了消息------------" + msg);
	}

}

 

package com.test.mq;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.apache.log4j.Logger;

public class ActiveMqListener implements MessageListener {
	private static Logger logger = Logger.getLogger(ActiveMqListener.class.getName());

	@Override
	public void onMessage(Message message) {
		TextMessage tm = (TextMessage) message;
		try {
			logger.info("ActiveMqListener监听到了文本消息:\t" + tm.getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}

}

 四、测试

@Controller
@RequestMapping("/common")
public class CommonController {
	@Resource
	private ActiveMqSender activeMqSender;
	
	@RequestMapping(value = "testMq")
	@ResponseBody
	public String testMq(int len) {
		for(int i=0;i<len;i++){
			activeMqSender.sendMessage("message content "+i);
		}
		return "ok";
	}
}

 启动项目,启动MQ服务,访问测试方法,在MQ中可查看队列处理情况:

 

补充:

Java消息服务(Java Message Service,JMS)规范目前支持两种消息模型:点对点(point to point, queue)和发布/订阅(publish/subscribe,topic)。

区别:点对点模式不可重复消费;发布/订阅模式可以重复消费。

传统企业型消息队列ActiveMQ遵循了JMS规范,实现了点对点和发布订阅模型,但其他流行的消息队列RabbitMQ(消费模式为推push)、Kafka(消费模式为拉pull)并没有遵循JMS规范。

 

ActiveMQ Prefetch Limit

Default Prefetch Limit(默认预取限制):不同的消费者类型有不同的默认设置,具体设置如下:

Queue consumer:默认1000

如果你使用一组消费者进行分散工作量的话(一个Queue对应多个消费者),典型的你应该把数字设置的小一些。如果一个消费者被允许可以聚集大量的未被确认的消息的话,会导致其它的消费者无事可做。同时,如果这个消费者出错的话,会导致大量的消息不能被处理,直到消费者恢复之前。

Queue browser:默认500

Topic consumer:默认32766

默认值32766是数字short的最大值,也是预取限制的最大值。

Durable topic subscriber:默认100

 

Per broker:你可以设置连接Broker的所有消费者的预取限制,通过设置borker的目标策略。设置目标策略需要在broker中增加子条目 destinationPolicy 。参考如下:

<broker ... >

  ... 

  <destinationPolicy> 

    <policyMap> 

      <policyEntries> 

        <policyEntry queue="queue.>" queuePrefetch=”1”/> 

        <policyEntry topic="topic.>" topicPrefetch=”1000”/> 

      </policyEntries> 

    </policyMap> 

  </destinationPolicy> 

  ... 

</broker>

在前面的例子中,所有开头以 queue命名的queue的预取限制设置为1.(>是一个通配符,用于匹配一个或者多个命名段)。所有开头以 topic命名topic的预取限制设置为1000.

消费模式:prefetch预取,即push,当prefetch设置为0,表示消费者主动pull消息。

  • 大小: 37.9 KB
分享到:
评论

相关推荐

    activemq5.5.1 Spring模板

    本文将深入探讨ActiveMQ 5.5.1版本与Spring框架的集成,以及如何利用Spring的模板模式简化ActiveMQ的使用。 一、ActiveMQ基础介绍 ActiveMQ是Apache软件基金会下的一个项目,遵循JMS(Java消息服务)规范,支持多种...

    activemq与spring整合源代码

    Spring还提供了丰富的模块,如数据访问、Web、测试等,其中Spring JMS模块专门用于集成消息中间件,使得与ActiveMQ的整合变得简单。 三、ActiveMQ与Spring的整合 1. 添加依赖:首先,在项目中引入ActiveMQ和Spring...

    ActiveMQ与spring集成实例之使用Maven构建

    标题中的“ActiveMQ与Spring集成实例之使用Maven构建”是指在Java开发环境中,通过Maven构建工具将Apache ActiveMQ消息中间件与Spring框架整合在一起的实际操作案例。这个主题涵盖了几大关键知识点: 1. **Apache ...

    activemq与spring的整合案例

    本案例主要展示了如何在Spring应用中集成ActiveMQ,实现消息的发送和接收。首先,我们需要在项目中引入ActiveMQ的相关依赖。在Maven工程中,可以在pom.xml文件中添加以下依赖: ```xml &lt;groupId&gt;org.apache....

    activemq-spring-5.5.0.jar.zip

    1. `activemq-spring`模块:这个库包含了Spring对ActiveMQ的集成支持,使得开发者可以方便地在Spring应用上下文中配置ActiveMQ连接。 2. 配置方式:通过XML配置或Java配置,可以声明式地创建消息生产者...

    ActiveMQ整合spring的Demo

    ActiveMQ整合Spring的Demo是一个典型的Java企业级应用示例,它展示了如何在Spring框架中集成Apache ActiveMQ,以便实现消息队列的功能。ActiveMQ是Apache软件基金会的一个开源项目,它是一个功能丰富的Java消息服务...

    activeMQ_spring简单案例(含XML配置)

    总的来说,集成ActiveMQ和Spring可以简化消息传递的复杂性,提高应用程序的可扩展性和解耦性。通过XML配置,我们可以灵活地管理消息生产和消费,实现异步通信,提高系统的响应速度和并发处理能力。在实际项目中,...

    ActiveMQ与Spring线程池整合实例

    ActiveMQ与Spring线程池整合的一个实例。 lib库没有上传。 对于实例的讲解,在竹子的论坛有我对这个实例的帖子(http://www.java2000.net/viewthread.jsp?tid=1167) lib中包含: apache-activemq-4.1.1.jar ...

    ActiveMQ与spring集成实例之使用消息监听器

    集成ActiveMQ和Spring的核心在于使用Spring的`JmsTemplate`以及消息监听器。本文将深入探讨如何在Spring环境中配置并使用ActiveMQ,以及如何通过消息监听器来接收和处理消息。 ### 1. 安装与配置ActiveMQ 首先,你...

    ActiveMQ与spring集成实例之使用消息转换器

    **ActiveMQ与Spring集成实例——使用消息转换器** 在企业级应用开发中,消息队列(Message Queue,MQ)作为一种解耦和异步处理的重要工具,被广泛应用。Apache ActiveMQ 是一个开源的消息中间件,它支持多种消息...

    消息队列:ActiveMQ:ActiveMQ的Spring集成.docx

    消息队列:ActiveMQ:ActiveMQ的Spring集成.docx

    ActiveMQ+spring配置方案详解

    除了配置文件,你还可以使用Spring Boot简化配置,通过注解的方式快速集成ActiveMQ。例如,你可以创建一个配置类,并使用`@EnableJms`注解开启JMS支持: ```java @Configuration @EnableJms public class JmsConfig...

    activemq整合spring

    标题中的“activemq整合spring”指的是在Java环境中,如何将Apache ActiveMQ,一个流行的开源消息代理和消息中间件,与Spring框架集成,以便利用Spring的便利性来管理ActiveMQ的配置和操作。ActiveMQ提供了发布/订阅...

    activemq-spring-1.2.jar.zip

    ActiveMQ-Spring项目是将ActiveMQ集成到Spring应用程序中的桥梁,它使得在Spring应用中配置和使用ActiveMQ变得简单易行。这个`activemq-spring-1.2.jar`库正是这个集成的实现,它包含了一系列的Spring Bean定义和...

    activeMQ和Spring集成Demo

    ActiveMQ和Spring的集成是企业级Java应用中常见的消息中间件解决方案,用于实现解耦、异步处理和系统间的通信。在本示例中,我们将深入探讨如何将这两个组件结合在一起,以创建一个高效且可扩展的应用架构。 首先,...

    ActiveMQ整合Spring(多消费者)

    将ActiveMQ与Spring整合,可以方便地在Spring应用中使用消息队列。 **ActiveMQ整合Spring的核心知识点:** 1. **Spring JMS支持**: Spring提供了JMS模块,通过`org.springframework.jms`包中的类和接口,简化了...

    ActiveMQ与spring集成实例

    接下来,我们将讨论Spring与ActiveMQ集成的关键步骤: 1. **引入依赖**:在Spring项目中,首先需要添加ActiveMQ的依赖库。这通常通过Maven或Gradle的配置文件来完成,例如在Maven的pom.xml中添加如下依赖: ```xml...

    ActiveMQ与Spring整合示例Demo

    **二、Spring与ActiveMQ集成** 1. **引入依赖** 在Spring项目中,我们需要添加ActiveMQ的相关依赖,如`spring-jms`和`activemq-client`。在`pom.xml`或`build.gradle`文件中添加对应的Maven或Gradle依赖。 2. **...

    ActiveMQ与Spring联合使用Demo

    Spring使用模板方法模式,如`JmsTemplate`,提供了发送和接收消息的简便方式。此外,Spring还支持基于注解的消息监听器,只需在方法上添加`@JmsListener`,即可自动监听特定的队列或主题。 在将ActiveMQ与Spring...

    activeMQ_spring_Demo.zip_DEMO_activemq_activemq spring_rowbv3

    《ActiveMQ与Spring整合实战教程...通过以上的讲解,你应该对如何在Spring应用中整合并使用ActiveMQ有了清晰的理解。在实际开发中,不断探索和实践,才能更好地发挥这一强大组合的优势,构建出高效、稳定的企业级应用。

Global site tag (gtag.js) - Google Analytics