`

ActiveMQ与Spring线程池的整合

    博客分类:
  • jms
 
阅读更多

转自:http://www.g4studio.org/thread-880-1-1.html

ActiveMQ与Spring线程池的整合在企业级开发中,很多系统间的通信,特别是与外部系统间的通信,往往都是异步的,JMS便是J2EE应用程序中用于处理异步消息传递的接口。为了提高对外部消息的相应,服务器程序中往往利用线程技术来处理接收的消息,线程池的意义在于对这样的一个并行处理机制进行性能上的优化。为了迅速切入正体,这里就不多涉及JMS的内容与池的概念。仅对如何进行ActiveMQ与Spring线程池整合做较为详细的描述。 
引用:

ActiveMQ与Spring线程池整合实例 
CSDN资源下载 仅有47k 




整合步骤 

这里我按照配置流程逐点来描述进行配置的方法。其中MQ整合配置的过程中各bean之间的关系比较多,也比较晕,我用橙红色将他们标记出来,关注标记的几点是十分重要的。 


  • 让Spring支持ActiveMQ的配置语法


首先我们需要在applicationContext.xml中引入ActiveMQ的配置语法。

  1. <beans xmlns="http://www.springframework.org/schema/beans"  
  2.             xmlns:amq="http://activemq.org/config/1.0"  
  3.             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  4.             xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd   
  5.               http://activemq.org/config/1.0 http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd">  
复制代码


  1. <beans xmlns="http://www.springframework.org/schema/beans"        xmlns:amq="http://activemq.org/config/1.0"        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd          http://activemq.org/config/1.0 http://people.apache.org/reposit ... ubator-SNAPSHOT.xsd">
复制代码




由于是SnapShot版本,那个XSD有部分错误,我们这里使用的是自行修改过的XSD,之后将activemq-core-4.1-incubator-SNAPSHOT.xsd文件放入META-INF文件夹下。并且新建spring的自定义scheam的配置文件spring.schemas:

  1. http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd=/activemq-core-4.1-incubator-SNAPSHOT.xsd  
复制代码



http\://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd=/activemq-core-4.1-incubator-SNAPSHOT.xsd这样在应用程序上下文中就可以解析ActiveMQ的配置语法了。 
注:实例中是3个文件的配置方法,只不过这3个文件是由activemq-core-4.1-incubator-SNAPSHOT.xsd分解开来的,spring.schemas中也需要将这3个文件都配置进去。 


  • 配置ActiveMQ基础部件


为了使ActiveMQ能够正常运行起来,我们需要在ApplicationContext.xml中进行配置。

  1. //配置ActiveMQ Broker   
  2. <amq:broker useJmx="false" persistent="false">   
  3.         <amq:destinations>   
  4.             <amq:queue id="jms.log" physicalName="pams.amq" />   
  5.         </amq:destinations>   
  6.         <amq:transportConnectors>   
  7.             <amq:transportConnector uri="tcp://localhost:61616" />   
  8.         </amq:transportConnectors>   
  9.     </amq:broker>   
  10. //配置ConnectionFactory   
  11.     <amq:connectionFactory id="jmsConnectionFactory"  
  12.         brokerURL="vm://localhost:61616" />   
  13. //配置Queue   
  14.     <amq:queue name="destination" physicalName="pams.amq" />
复制代码



//配置ActiveMQ Broker<amq:broker useJmx="false" persistent="false">                <amq:destinations>                        <amq:queue id="jms.log" physicalName="pams.amq" />                </amq:destinations>                <amq:transportConnectors>                        <amq:transportConnector uri="tcp://localhost:61616" />                </amq:transportConnectors>        </amq:broker>//配置ConnectionFactory        <amq:connectionFactory id="jmsConnectionFactory"                brokerURL="vm://localhost:61616" />//配置Queue        <amq:queue name="destination" physicalName="pams.amq" />在这里使用了内嵌JVM这种最简单的模式,这样在Spring初始化时ActiveMQ便加载了。 
对于ActiveMQ的配置,我们还需要配置消息生产者、消息消费者和消息转换者等等部分。不过,在此之前我们需要先将线程池配置进来,之后再进行ActiveMQ余下的配置。 


  • 配置Spring线程池



对于线程池的配置,我们需要配置一个线程池执行器。首先看下配置代码:

  1. <!-- ThreadPool Executor -->   
  2.         <bean id="threadPoolExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">   
  3.             <!-- <property name="threadFactory" ref="threadFactory" /> -->   
  4.             <property name="corePoolSize" value="2" />   
  5.             <property name="maxPoolSize" value="4"/>   
  6.             <property name="queueCapacity" value="500"/>   
  7.             <property name="keepAliveSeconds" value="300"/>   
  8.             <FONT color=red>
  9. <property name="rejectedExecutionHandler">   
  10.                 <bean class="java.util.concurrent.ThreadPoolExecutor$DiscardOldestPolicy"/>   
  11.             </property>
  12. </FONT>    
  13. </bean>  
复制代码





<!-- ThreadPool Executor -->    <bean id="threadPoolExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">        <!-- <property name="threadFactory" ref="threadFactory" /> -->        <property name="corePoolSize" value="2" />        <property name="maxPoolSize" value="4"/>        <property name="queueCapacity" value="500"/>        <property name="keepAliveSeconds" value="300"/>        <property name="rejectedExecutionHandler">                <bean class="java.util.concurrent.ThreadPoolExecutor$DiscardOldestPolicy"/>        </property>    </bean>这里同样仅做了一个简单的配置,需要注意的是对rejectedExecutionHandler属性的配置,这里使用了JDK1.5中用于被拒绝任务的处理机制,在JDK1.5中有4种处理机制,都可以在这里以这种方式配置使用。 


  • 消息转换器的配置


完成线程池执行器的配置后,我们需要配置一个消息转换器(Converter),这样在生产和消费时可以直接发送Order对象,而不是JMS的Message对象。

  1. //消息转换器   
  2.     <bean id="resourceMessageConverter" class="com.jmspool.jms.ResourceMessageConverter" />  
复制代码





//消息转换器<bean id="resourceMessageConverter" class="com.jmspool.jms.ResourceMessageConverter" />


  • jmsTemplate的配置


之后我们需要配置一个由Spring提供的Template,其绑定ConnectionFactory与消息转换器(Converter)。这样在进行消息的生产时我们可以直接使用Spring提供的Template中的方法。

  1. <!-- JMS ActiveMQ Config-->   
  2.         <bean id="jmsTemplate"  
  3.             class="org.springframework.jms.core.JmsTemplate">   
  4.             //连接工厂   
  5.                       <property name="connectionFactory">   
  6.                 <bean   
  7.                     class="org.springframework.jms.connection.SingleConnectionFactory">   
  8.                     <property name="targetConnectionFactory"  
  9.                         ref="jmsConnectionFactory" />   
  10.                 </bean>   
  11.             </property>   
  12.                       //消息转换器   
  13.             <property name="messageConverter"  
  14.                 ref="resourceMessageConverter" />   
  15.         </bean>  
复制代码





<!-- JMS ActiveMQ Config-->    <bean id="jmsTemplate"                class="org.springframework.jms.core.JmsTemplate">                //连接工厂                  <property name="connectionFactory">                        <bean                                class="org.springframework.jms.connection.SingleConnectionFactory">                                <property name="targetConnectionFactory"                                        ref="jmsConnectionFactory" />                        </bean>                </property>                  //消息转换器                <property name="messageConverter"                        ref="resourceMessageConverter" />        </bean>


  • 消息生产者


消息生产者用于生产消息,也就是将消息发送出去。在这里我们使用上面配置好的JmsTemplate来发送发送消息。这个类的代码我就不贴出来了,大家可以在实例中查看。配置消息生产者需要JmsTemplate与消息队列(Queue)。

  1. <bean id="resourceMessageProducer"  
  2.             class="com.jmspool.jms.ResourceMessageProducer" >   
  3.             <property name="template">   
  4.                 <ref bean="jmsTemplate"></ref>   
  5.             </property>   
  6.             <property name="destination">   
  7.                 <ref bean="destination"></ref>   
  8.             </property>   
  9.         </bean>  
复制代码





<bean id="resourceMessageProducer"            class="com.jmspool.jms.ResourceMessageProducer" >            <property name="template">                    <ref bean="jmsTemplate"></ref>            </property>            <property name="destination">                    <ref bean="destination"></ref>            </property>    </bean>


  • 消息接收处理者


消息接收者(MDP)使用Spring的MessageListenerAdapter,指定负责处理消息的POJO及其方法名,绑定消息转换器Converter

  1. <bean id="resourceMessageListener"  
  2.             class="org.springframework.jms.listener.adapter.MessageListenerAdapter">   
  3.             <constructor-arg>   
  4.                 <ref bean="resourceMessageConsumer"></ref>   
  5.             </constructor-arg>   
  6.             <property name="defaultListenerMethod" value="addResource" />   
  7.             <property name="messageConverter"   ref="resourceMessageConverter" />   
  8.         </bean>   
复制代码





<bean id="resourceMessageListener"                class="org.springframework.jms.listener.adapter.MessageListenerAdapter">                <constructor-arg>                        <ref bean="resourceMessageConsumer"></ref>                </constructor-arg>                <property name="defaultListenerMethod" value="addResource" />                <property name="messageConverter"   ref="resourceMessageConverter" />        </bean>


  • 监听容器的配置


监听容器的任务是负责调度MDP, 绑定connectionFactory,Queue和MDP。更重要的一点,这里也是将线程执行器与ActiveMQ整合的切入点。先看下配置文件:

  1. <bean id="resourcelistenerContainer"    
  2.             class="org.springframework.jms.listener.SimpleMessageListenerContainer">    
  3.             <property name="connectionFactory" ref="jmsConnectionFactory" />    
  4.             <property name="autoStartup" value="true"/>    
  5.             <property name="concurrentConsumers" value="6"/>    
  6.             <property name="destination" ref="destination"/>    
  7.             <property name="messageListener" ref="resourceMessageListener"/>    
  8.             <property name="sessionTransacted" value="true"/>    
  9.             <FONT color=red><property name="taskExecutor" ref="threadPoolExecutor"/> </FONT>   
  10.         </bean>  
复制代码





<bean id="resourcelistenerContainer"                 class="org.springframework.jms.listener.SimpleMessageListenerContainer">                 <property name="connectionFactory" ref="jmsConnectionFactory" />                 <property name="autoStartup" value="true"/>                 <property name="concurrentConsumers" value="6"/>                 <property name="destination" ref="destination"/>                 <property name="messageListener" ref="resourceMessageListener"/>                 <property name="sessionTransacted" value="true"/>                 <property name="taskExecutor" ref="threadPoolExecutor"/>         </bean>这里通过taskExecutor属性将已经配置好的线程执行器threadPoolExecutor与JMS(ActiveMQ)整合起来。 


  • 消息消费者的配置


消息消费者是消息的最终处理部分,开始文章里面说说的整合线程的目的,也就是为了使得处理者能够并行的处理接收到的消息。同样这里为了简洁也不将代码贴出来。配置时需要绑定线程池执行器

  1. <bean id="resourceMessageConsumer"  
  2.             class="com.jmspool.jms.ResourceMessageConsumer" abstract="false"  
  3.             lazy-init="default" autowire="default" dependency-check="default">   
  4.             <property name="threadPoolExecutor">   
  5.                 <ref bean="threadPoolExecutor"></ref>   
  6.             </property>   
  7.         </bean>  
复制代码





<bean id="resourceMessageConsumer"                class="com.jmspool.jms.ResourceMessageConsumer" abstract="false"                lazy-init="default" autowire="default" dependency-check="default">                <property name="threadPoolExecutor">                        <ref bean="threadPoolExecutor"></ref>                </property>        </bean>至此,我们的整合工作就完成了。在上述文中仅仅简单的描述了配置的步骤,对于原理及各类的属性意义并没有详细的描述。因为网上有不少这样的文章。 

写在最后: 
我们配置的所有步骤其实都是围绕着Spring展开的,虽然我们完成的是一个ActiveMQ与Spring线程池的整合实例,但是如果换做其他MQ中间件,此法同样是适用的。因为我们整合的层面是在Spring上。

分享到:
评论

相关推荐

    ActiveMQ与Spring线程池整合实例

    ActiveMQ与Spring线程池整合的一个实例。 lib库没有上传。 对于实例的讲解,在竹子的论坛有我对这个实例的帖子(http://www.java2000.net/viewthread.jsp?tid=1167) lib中包含: apache-activemq-4.1.1.jar ...

    activemq 配置说明与activemq入门讲解

    2. **JMS编程**:使用JMS API与ActiveMQ交互,创建ConnectionFactory,然后创建Connection,Session,Destination(Queue或Topic),最后创建MessageProducer和MessageConsumer。 3. **Web控制台**:ActiveMQ内置了...

    activemq整合spring

    标题中的“activemq整合spring”指的是在Java环境中,如何将Apache ActiveMQ,一个流行的开源消息代理和消息中间件,与Spring框架集成,以便利用Spring的便利性来管理ActiveMQ的配置和操作。ActiveMQ提供了发布/订阅...

    activemq-spring-1.5.jar.zip

    《ActiveMQ与Spring整合——深度解析activemq-spring-1.5.jar.zip》 在IT行业中,消息中间件扮演着至关重要的角色,它能够帮助系统实现解耦、异步处理以及提高系统的可扩展性。Apache ActiveMQ是开源社区中最活跃的...

    浅谈spring 线程异步执行

    主要介绍了浅谈spring 线程异步执行,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧

    浅谈Spring Boot 整合ActiveMQ的过程

    浅谈Spring Boot 整合ActiveMQ的过程 Spring Boot 是一种流行的Java框架,用于快速构建基于Spring的应用程序。ActiveMQ是一个流行的开源消息队列系统,用于在分布式系统中实现消息传递。本文将介绍如何在Spring ...

    JMS整合Spirng

    将JMS与Spring整合,可以充分利用Spring的IoC(Inversion of Control,控制反转)和AOP(Aspect Oriented Programming,面向切面编程)特性,实现消息驱动的解耦和事务管理。 在Spring框架中,JMS的整合主要通过...

    NETTY+ACTIVITYMQ实现高用户并发

    4. **消息确认与重试**:ActiveMQ支持消息确认机制,消费者只有在成功处理消息后才确认,否则消息会被重新投递,保证消息不丢失。 5. **消息顺序保证**:对于需要消息顺序的场景,可以借助消息的优先级或序列号进行...

    spring-boot-demo:springboot 2.1+整合各种技术

    :测试spring初始注解:@Async,覆盖串行线程池自定义线程池,无返回值纯初始化,有返回值Future,有返回值CompletableFuture :整合redis,7大数据类型:string,list,set,zset,hash,geo,hyperloglog :...

    java面试知识点整理.zip

    - Dubbo与Spring的整合使用。 - Dubbo的Remoting层解析。 9. **算法**: - 排序算法:冒泡、选择、插入、快速、归并、堆排序。 - 查找算法:顺序、二分、哈希查找。 - 数据结构:数组、链表、栈、队列、树、图...

    微信支付jsjdk可以直接使用java版

    【微信支付JSJDK与Java版的整合使用】 微信支付JSJDK(JavaScript SDK)是微信支付提供的用于在网页端实现微信支付功能的JavaScript库。它允许开发者在自己的网站上集成微信支付,使得用户可以通过微信钱包进行在线...

    机场管理系统

    6. 并发与多线程:机场系统需要处理大量并发请求,Java的并发库提供了线程池、同步机制等工具,可以有效管理并发任务,提高系统性能。 7. 消息队列:为了处理异步任务和解耦系统组件,机场管理系统可能会使用消息...

    04_体验一下面试官对于消息队列的7个连环炮.zip

    在实际面试中,面试者需要对这些概念有深入理解,并能结合具体的Java技术进行讨论,如Spring Boot整合RabbitMQ或Kafka的实际操作。同时,面试官可能会考察面试者在遇到问题时如何设计解决方案,以及如何在实际项目中...

    The-Hub:只是个枢纽

    通过JMS,它可以与消息中间件如ActiveMQ、RabbitMQ等进行通信,实现异步处理和解耦;通过RESTful API,Java服务可以与其他系统无缝对接,形成一个整体的生态系统。 总结来说,"The-Hub"在这里可能指的是一种基于...

Global site tag (gtag.js) - Google Analytics