`
gong1208
  • 浏览: 559075 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Java 消息服务(JMS)介绍­­­­——使用spring与activemq

    博客分类:
  • jms
阅读更多

 


                   Java 
消息服务(JMS)介绍

                                                                     ­­­­——使用springactivemq

 

第一次写技术文章,所以不懂规矩,写得也没什么条理,主要都是根据自己的一些实际操作经验写的,

以下代码都经过实际验证,其中也有自己不甚明了之处,欢迎大家拍砖或者交流。

 

关于jms,我打算会有一系列的文章,这里只是其中一节,

本节主要介绍如何在spring框架下配置使用jms,jms提供者使用activemq。
该配置相关说明:
1. 消息发送者与接受者分离,分别配置在两个独立的配置文件中
2. 启用一个消息服务器,其中产生一个queue一个topic,两个生产者分别往其中发送对象,发送时采用spring提供的转换器,可以实现java对象与jms消息的相互转化。
3. 消费者总共三个,一个消费queue中的消息,两个消费topic中的消息,我们称之为订阅者,其中一个订阅者是持久性订阅者,我们知道对于普通的订阅者来说,当该订阅者处于非活动期时topic中产生的消息是无法再传送给订阅者的(除非是实体化消息),但是持久化订阅者是可以收到其在非活动期间topic中产生的消息的(从其在服务器上注册时开始至现在)
4. 发送时使用的是同步发送方式,即发送者发送消息到服务器,等待服务器发送确认消息表示发送成功,发送者可以继续发送消息,消费者或者订阅者使用的是监听器方式,所以是采用异步接收方式,即接受者不需要一直阻塞直到接收消息,而是jms服务器有消息到达时会触发消息监听器的一个动作。当我们需要使用同步接收方式时就需要像同步发送消息一样使用jmstemplate。同时这里消息的确认方式采用默认的AUTO_ACKNOWLEDGE方式,即自动确认,即服务器收到消息立马发送确认消息,同样接收者收到消息会立即向服务器发送确认消息。需要注意的是,这里的同步异步,都只是描述客户端与服务器端之间的关系,而不是这发送者-服务器-接受者三者间的关系。


下面我们看具体的配置
activemq-produce.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd">

<!-- <context:property-placeholder location="classpath:server-config.properties" ignore-unresolvable="true"></context:property-placeholder>
-->
   <!--  指定发送端连接的activeMQ服务器-->
    <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>

    <!--  指定发送的目的地的类型和名字-->
    <bean id="MyQueue" class="org.apache.activemq.command.ActiveMQQueue">
       <constructor-arg value="firstQueue"></constructor-arg>
    </bean>

    <bean id="Topic-A" class="org.apache.activemq.command.ActiveMQTopic">
       <constructor-arg value="Topic-A"></constructor-arg>
    </bean>

    <!-- converter  -->
    <bean id="defaultMessageConverter" class="com.gy.myactivemq.DefaultMessageConverter"/>
<bean id="simpleMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
    <!--  Spring JmsTemplate config,即消息发送模板 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory">
            <!--  lets wrap in a pool to avoid creating a connection per send -->
            <!--  单例模式,避免每次发送消息时新产生一个连接 -->
            <bean class="org.springframework.jms.connection.SingleConnectionFactory">
                <property name="targetConnectionFactory" ref="connectionFactory"/>
            </bean>
        </property>
         <!-- custom MessageConverter -->
        <property name="messageConverter" ref="simpleMessageConverter"/>
    </bean>

   <!-- POJO which send Message uses  Spring JmsTemplate ,使用消息模板发送消息-->
    <bean id="queueMessageProducer" class="com.gy.myactivemq.QueueMessageProducer">
        <property name="template" ref="jmsTemplate"/>
        <property name="destination" ref="MyQueue"/>
    </bean>

    <bean id="topicMessageProducer" class="com.gy.myactivemq.TopicMessageProducer">
        <property name="template" ref="jmsTemplate"/>
        <property name="destination" ref="Topic-A"/>
    </bean>

</beans>
其中com.gy.myactivemq.QueueMessageProducer的java代码如下:
public class QueueMessageProducer {
    private JmsTemplate template;
    private Queue destination;
    public void setTemplate(JmsTemplate template) {
        this.template = template;
    }
    public void setDestination(Queue destination) {
        this.destination = destination;
    }
    public void send(FooMessage message) {
        template.convertAndSend(this.destination, message);
    }
    public void sendByMe(String mess) {
        template.convertAndSend(this.destination, mess);
    }
}
其中包括两个可以用来发送消息的方法sendByMe和send,可以分别用来发送不同类型的消息,如果想要发送其他类型的消息可以自己定于,然后在单元测试中调用方式如下:
@Test
public void sendTest(){
ApplicationContext ctx=new ClassPathXmlApplicationContext("activemq-produce.xml");
QueueMessageProducer qmp=(QueueMessageProducer) ctx.getBean("queueMessageProducer");
TopicMessageProducer tmp=(TopicMessageProducer) ctx.getBean("topicMessageProducer");
FooMessage message=new FooMessage();
message.setId(123);
qmp.send(message);
tmp.send(message);
}

activemq-consumer.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
     http://www.springframework.org/schema/context/spring-context-3.0.xsd
     http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd
     http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

<!-- <context:property-placeholder location="classpath:server-config.properties" ignore-unresolvable="true"></context:property-placeholder>
-->
    <!-- 用于连接activeMQ服务器 -->
<bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
<property name="clientIDPrefix" value="www"/>
</bean>

<!-- ActiveMQ destinations ,连接的目标名称-->
    <bean id="MyQueue" class="org.apache.activemq.command.ActiveMQQueue">
       <constructor-arg value="firstQueue"></constructor-arg>
    </bean>

    <bean id="Topic-A" class="org.apache.activemq.command.ActiveMQTopic">
       <constructor-arg value="Topic-A"></constructor-arg>
    </bean>


    <bean id="simpleMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"/>

<bean id="queueListener"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="queueConsumer" />
<property name="defaultListenerMethod" value="receive" />
<property name="messageConverter" ref="simpleMessageConverter" />
</bean>

<bean id="topicListenerA"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="topicConsumerA" />
<property name="defaultListenerMethod" value="receive" />
<property name="messageConverter" ref="simpleMessageConverter" />
</bean>

<bean id="topicListenerB"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="topicConsumerB" />
<property name="defaultListenerMethod" value="receive" />
<property name="messageConverter" ref="simpleMessageConverter" />
</bean>

<bean id="queueListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="concurrentConsumers" value="1" />
<property name="destination" ref="MyQueue" />
<property name="messageListener" ref="queueListener" />
</bean>
<bean id="topicListenerContainerA"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="concurrentConsumers" value="1" />
<property name="destination" ref="Topic-A" />
<property name="messageListener" ref="topicListenerA" />
<property name="subscriptionDurable" value="true"/>
        <property name="clientId" value="clientId_001"/>
        <property name="durableSubscriptionName" value="clientId_001"/>
</bean>

<bean id="topicListenerContainerB"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="concurrentConsumers" value="1" />
<property name="destination" ref="Topic-A" />
<property name="messageListener" ref="topicListenerB" />
</bean>
<!-- consumer for queue -->
   <bean id="queueConsumer" class="com.gy.myactivemq.QueueConsumer"/>

<!-- consumer for topic -->
   <bean id="topicConsumerA" class="com.gy.myactivemq.TopicConsumerA" />
   <bean id="topicConsumerB" class="com.gy.myactivemq.TopicConsumerB" />

</beans>

需要注意的是这段配置:
<bean id="topicListenerContainerA"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="concurrentConsumers" value="1" />
<property name="destination" ref="Topic-A" />
<property name="messageListener" ref="topicListenerA" />
<property name="subscriptionDurable" value="true"/>
        <property name="clientId" value="clientId_001"/>
        <property name="durableSubscriptionName" value="clientId_001"/>
</bean>
其中subscriptionDurable属性表明该处于该container中的订阅者是一个持久订阅者,配置持久订阅者必须指定一个clientId的值,而且这个值对于每一个订阅者都必须是唯一的,因为jms服务器要根据每个订阅者的这个clientId为其注册,这样jms服务器才能确保在该订阅者不活动时为其保存消息。还注意到如下配置:
<bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
<property name="clientIDPrefix" value="www"/>
</bean>
其中<property name="clientIDPrefix" value="www"/>配置指明每个连接到jms服务器的连接的clientId的前缀是www,然后具体这个配置的用处现在还不清楚,之前网上说这个配置表明只有当连接的jms服务器的客户端的clientId名的前缀与这个配置一致时,该连接才能实现持久化订阅,但实际情况是这里不管要不要这个配置,订阅者只要指定任意的clientId值都可以实现持久订阅。

对于这段配置我们可以发现对于每一个listener监听者我们都需要配置一个container容器,这里我们可以通过jms标签简化这个配置,比如若有两个listener,topicListenerA和topicListenerB配置如下:
  <jms:listener-container connection-factory="connectionFactory" concurrency="1" destination-type="topic">
  <jms:listener destination="Topic-A" ref="topicListenerA" />
   <jms:listener destination="Topic-A" ref="topicListenerB" />
  </jms:listener-container>
其中topicListenerA和topicListenerB是在前面的配置文件中已经配好,当然我们也可以直接在<jms:listener destination="Topic-A" ref="topicListenerA" />标签中配置,这样下面这段代码也可以省了
<bean id="topicListenerA"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="topicConsumerA" />
<property name="defaultListenerMethod" value="receive" />
<property name="messageConverter" ref="simpleMessageConverter" />
</bean>
具体配置可以参阅《java消息服务》一书。
使用jms标签需要加入如下命名空间:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
     http://www.springframework.org/schema/context/spring-context-3.0.xsd
     http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd
     http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
总之,这样就省去了一个listener需要一个container的麻烦,如果需要配置持久订阅者则需要如下配置:
   <!-- 弊端:一个listener-container若指定了client-id和destination-type来实现持久化订阅,则只能含有一个listener,因为每个listener的client-id必须唯一 -->
   <jms:listener-container connection-factory="connectionFactory" concurrency="1" client-id="clientId_001" destination-type="durableTopic">
   <jms:listener destination="Topic-A" ref="topicListenerA" />
   </jms:listener-container>
这么做的弊端已经在注释上标明,当然也许不是这样,但是目前个人没有发现如何将持久订阅者和非持久订阅者放在同一个container中。另外destination-type属性用于指定监听的消息提供者类型,默认是queue,所以监听topic时需要指定类型。

开启消息监听的代码很简单:
public static void main(String [] agr){
ApplicationContext ctx=new ClassPathXmlApplicationContext("activemq-consumer.xml");
}
注意这里不使用单元测试的@Test是因为,使用单元测试的方法运行之后马上会停止该方法,导致连接之后马上会断开,消息可能都还没有被监听者收到,所以我们需要保持监听者一直与jms服务器连接。

这里,需要用的jar包有三个必须的,分别是spring-jms,activemq-all,slf4j相关的包(slf4j-simple,slf4j-spi),当然spring读取配置文件的相关包肯定也是需要的,用过spring的应该都知道。

另外,activemq的启动是单独启动的,也可以通过如下方式在spring配置中启动:
<amq:broker useJmx="false" persistent="true">
<amq:persistenceAdapter>
<amq:amqPersistenceAdapter directory="d:/amq"/>
</amq:persistenceAdapter>
<amq:transportConnectors>
<amq:transportConnector uri="tcp://localhost:61616" />
</amq:transportConnectors>
</amq:broker>
但是不推荐这种方式,建议分开来启动。

 

版权所有,转载请注明来源:

http://gong1208.iteye.com/blog/1558367

分享到:
评论
3 楼 u011710489 2017-02-20  
我的配置是这样的:
<!-- ActiveMQ 连接工厂 -->
<!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供-->
<!-- 如果连接网络:tcp://ip:61616;未连接网络:tcp://localhost:61616 以及用户名,密码-->
<amq:connectionFactory id="amqConnectionFactory"
brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin"  />


<!-- Spring Caching连接工厂 -->
<!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> 
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> 
  <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
  <!-- 同上,同理 -->
<!-- <constructor-arg ref="amqConnectionFactory" /> -->
<!-- Session缓存数量 -->
<property name="sessionCacheSize" value="100" />
</bean>


<!-- 消息消费者 start-->

<!-- 定义Queue监听器 -->
<jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
<jms:listener destination="test.queue" ref="queueReceiver1"/>
<jms:listener destination="test.queue" ref="queueReceiver2"/>
</jms:listener-container>

<!-- 定义Topic监听器 -->
<jms:listener-container destination-type="durableTopic" container-type="default" connection-factory="connectionFactory" acknowledge="auto" client-id="topic1">
<jms:listener destination="test.topic" ref="topicReceiver1"/>
</jms:listener-container>
<!--<jms:listener-container destination-type="durableTopic" container-type="default" connection-factory="connectionFactory2" acknowledge="auto" client-id="topic2">
<jms:listener destination="test.topic" ref="topicReceiver2"/>
</jms:listener-container>

--><!-- 消息消费者 end -->
2 楼 u011710489 2017-02-20  
ERROR 8976 --- [erContainer#2-1] o.s.j.l.DefaultMessageListenerContainer  : Could not refresh JMS Connection for destination 'test.topic' - retrying using FixedBackOff{interval=5000, currentAttempts=0, maxAttempts=unlimited}. Cause: setClientID call not supported on proxy for shared Connection. Set the 'clientId' property on the SingleConnectionFactory instead.
1 楼 u011710489 2017-02-20  
你好,为什么我启动报错呢setClientID call not supported on proxy for shared Connection

相关推荐

    Spring 实现远程访问详解——jms和activemq

    JMS的全称是Java Message Service,即Java消息服务。它主要用于在生产者和消费者之间进行消息传递,生产者负责产生消息,而消费者负责接收消息。把它应用到实际的业务需求中的话我们可以在特定的时候利用生产者生成...

    activemq与spring整合发送jms消息入门实例

    在Java世界中,ActiveMQ和Spring的整合是企业级应用中常见的消息中间件解决方案,用于实现JMS(Java Message Service)消息传递。本教程将深入探讨如何将这两个强大的工具结合在一起,以创建一个简单的发送JMS消息的...

    SpringJMS整合ActiveMQ

    详细内容: SpringJMS整合ActiveMQ.doc 详细说明文档 apache-activemq-5.8.0-bin.zip ActiveMQ安装包 JMSTest.rar MyEclipse8.5下web工程

    Spring整合Blazeds实现ActiveMQ JMS消息服务

    标题中的“Spring整合Blazeds实现ActiveMQ JMS消息服务”指的是在Java应用程序中使用Spring框架与Blazeds(一个Flex和Java之间的消息传递中间件)集成,通过ActiveMQ(一个流行的开源JMS提供商)来实现消息队列服务...

    Spring+JMS+ActiveMQ+Tomcat实现消息服务的demo

    基于Spring+JMS+ActiveMQ+Tomcat,我使用的版本情况如下所示:Spring 3.2.0,ActiveMQ 5.4.3,Tomcat 6.0.43。本例通过详细的说明和注释,实现消息服务的基本功能:发送与接收。Spring对JMS提供了很好的支持,可以...

    spring整合jms+activemq

    在IT行业中,Spring框架是Java领域最广泛应用的轻量级框架之一,而JMS(Java Message Service)则是一种标准接口,用于在分布式系统中进行异步消息传递。ActivemQ是Apache软件基金会的一个项目,它实现了JMS规范,...

    JMS之ActiveMQ与Spring整合源码

    将ActiveMQ与Spring整合,可以方便地在Spring应用中使用JMS,实现消息驱动的架构。 本文将深入探讨ActiveMQ与Spring整合的关键知识点: 1. **Spring对JMS的支持**: Spring提供了`org.springframework.jms`包,该...

    jms Spring+ActiveMQ 5.4.2

    标题 "jms Spring+ActiveMQ 5.4.2" 涉及的是Java消息服务(JMS)在Spring框架中的应用,以及ActiveMQ作为消息代理的使用。在这个主题下,我们将深入探讨JMS的基本概念、Spring对JMS的支持以及ActiveMQ 5.4.2版本的...

    spring使用activeMQ实现消息发送

    本文将深入探讨如何在Spring环境中使用ActiveMQ来实现消息的发送与接收,以提高系统的可扩展性和解耦性。 首先,我们需要了解Spring对ActiveMQ的支持。Spring提供了`spring-jms`模块,它包含了一组丰富的API和配置...

    Spring整合JMS——实现收发消息

    本篇文章将详细探讨如何通过Spring框架整合JMS,特别是使用ActiveMQ作为消息代理来实现消息的收发功能。 首先,我们需要理解Spring对JMS的支持。Spring提供了`org.springframework.jms`包,包含了一系列接口和类,...

    JMS之Spring +activeMQ实现消息队列

    总结起来,"JMS之Spring + ActiveMQ实现消息队列"涉及到的关键知识点包括:Spring框架的JMS支持、ActiveMQ的使用、ConnectionFactory的配置、JmsTemplate和MessageListener的实现,以及消息队列在解决系统解耦和异步...

    消息队列:ActiveMQ:ActiveMQ与Java消息服务(JMS)API教程.docx

    消息队列:ActiveMQ:ActiveMQ与Java消息服务(JMS)API教程.docx

    JMS 使用 ActiveMQ 传送文件

    在IT领域,Java消息服务(Java Message Service,简称JMS)是一种标准API,它允许应用程序创建、发送、接收和读取消息。这种技术常用于异步通信,尤其是在分布式系统中,使得不同组件之间可以解耦并独立工作。...

    Java消息中间件JMS,ActiveMQ.zip

    Java消息中间件JMS,ActiveMQ

    spring整合JMS-居于ActiveMQ实现

    Spring整合JMS基于ActiveMQ实现是一项常见的企业级应用开发任务,它涉及到Spring框架、Java消息服务(JMS)以及ActiveMQ消息中间件的使用。在本文中,我们将深入探讨这三个关键概念,以及如何将它们有效地结合在一起...

    Spring和ActiveMQ的整合实例源码

    而ActiveMQ则是Apache软件基金会的一个开源项目,是Java消息服务(JMS)的实现,用于在分布式系统中可靠地传递消息。 当我们谈论Spring与ActiveMQ的整合时,主要涉及的是Spring的JMS模块。这个模块允许我们轻松地与...

    ActiveMQ学习笔记(二) JMS与Spring

    在本篇ActiveMQ学习笔记中,我们将探讨JMS(Java Message Service)与Spring框架的集成。JMS是一种标准API,用于在分布式环境中进行异步消息传递,而Spring框架则为开发人员提供了强大的依赖注入和管理服务的能力。...

    activemq消息中间件的使用demo,以及spring集合jms实现消息发送和处理。

    ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它遵循Java消息服务(JMS)标准,提供高效、可靠的异步消息传递。在分布式系统中,消息中间件扮演着至关重要的角色,它允许不同组件之间通过解耦的方式进行...

Global site tag (gtag.js) - Google Analytics