论坛首页 综合技术论坛

Java 消息服务(JMS)介绍­­­­——使用spring与activemq

浏览 7128 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2012-06-11  


                   Java 
消息服务(JMS)介绍

                                                                     ­­­­——使用springactivemq

 

第一次写技术文章,所以不懂规矩,写得也没什么条理,主要都是根据自己的一些实际操作经验写的,

以下代码都经过实际验证,其中也有自己不甚明了之处,欢迎大家拍砖或者交流。

 

关于jms,我打算会有一系列的文章,这里只是其中一节,

本节主要介绍如何在spring框架下配置使用jms,jms提供者使用activemq。
该配置相关说明:
1. 消息发送者与接受者分离,分别配置在两个独立的配置文件中
2. 启用一个消息服务器,其中产生一个queue一个topic,两个生产者分别往其中发送对象,发送时采用spring提供的转换器,可以实现java对象与jms消息的相互转化。
3. 消费者总共三个,一个消费queue中的消息,两个消费topic中的消息,我们称之为订阅者,其中一个订阅者是持久性订阅者,我们知道对于普通的订阅者来说,当该订阅者处于非活动期时topic中产生的消息是无法再传送给订阅者的(除非是实体化消息),但是持久化订阅者是可以收到其在非活动期间topic中产生的消息的(从其在服务器上注册时开始至现在)
4. 发送时使用的是同步发送方式,即发送者发送消息到服务器,等待服务器发送确认消息表示发送成功,发送者可以继续发送消息,消费者或者订阅者使用的是监听器方式,所以是采用异步接收方式,即接受者不需要一直阻塞直到接收消息,而是jms服务器有消息到达时会触发消息监听器的一个动作。当我们需要使用同步接收方式时就需要像同步发送消息一样使用jmstemplate。同时这里消息的确认方式采用默认的AUTO_ACKNOWLEDGE方式,即自动确认,即服务器收到消息立马发送确认消息,同样接收者收到消息会立即向服务器发送确认消息。需要注意的是,这里的同步异步,都只是描述客户端与服务器端之间的关系,而不是这发送者-服务器-接受者三者间的关系。


下面我们看具体的配置
activemq-produce.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd">

<!-- <context:property-placeholder location="classpath:server-config.properties" ignore-unresolvable="true"></context:property-placeholder>
-->
   <!--  指定发送端连接的activeMQ服务器-->
    <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
    <property name="brokerURL" value="tcp://localhost:61616"/>
    </bean>

    <!--  指定发送的目的地的类型和名字-->
    <bean id="MyQueue" class="org.apache.activemq.command.ActiveMQQueue">
       <constructor-arg value="firstQueue"></constructor-arg>
    </bean>

    <bean id="Topic-A" class="org.apache.activemq.command.ActiveMQTopic">
       <constructor-arg value="Topic-A"></constructor-arg>
    </bean>

    <!-- converter  -->
    <bean id="defaultMessageConverter" class="com.gy.myactivemq.DefaultMessageConverter"/>
<bean id="simpleMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
    <!--  Spring JmsTemplate config,即消息发送模板 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory">
            <!--  lets wrap in a pool to avoid creating a connection per send -->
            <!--  单例模式,避免每次发送消息时新产生一个连接 -->
            <bean class="org.springframework.jms.connection.SingleConnectionFactory">
                <property name="targetConnectionFactory" ref="connectionFactory"/>
            </bean>
        </property>
         <!-- custom MessageConverter -->
        <property name="messageConverter" ref="simpleMessageConverter"/>
    </bean>

   <!-- POJO which send Message uses  Spring JmsTemplate ,使用消息模板发送消息-->
    <bean id="queueMessageProducer" class="com.gy.myactivemq.QueueMessageProducer">
        <property name="template" ref="jmsTemplate"/>
        <property name="destination" ref="MyQueue"/>
    </bean>

    <bean id="topicMessageProducer" class="com.gy.myactivemq.TopicMessageProducer">
        <property name="template" ref="jmsTemplate"/>
        <property name="destination" ref="Topic-A"/>
    </bean>

</beans>
其中com.gy.myactivemq.QueueMessageProducer的java代码如下:
public class QueueMessageProducer {
    private JmsTemplate template;
    private Queue destination;
    public void setTemplate(JmsTemplate template) {
        this.template = template;
    }
    public void setDestination(Queue destination) {
        this.destination = destination;
    }
    public void send(FooMessage message) {
        template.convertAndSend(this.destination, message);
    }
    public void sendByMe(String mess) {
        template.convertAndSend(this.destination, mess);
    }
}
其中包括两个可以用来发送消息的方法sendByMe和send,可以分别用来发送不同类型的消息,如果想要发送其他类型的消息可以自己定于,然后在单元测试中调用方式如下:
@Test
public void sendTest(){
ApplicationContext ctx=new ClassPathXmlApplicationContext("activemq-produce.xml");
QueueMessageProducer qmp=(QueueMessageProducer) ctx.getBean("queueMessageProducer");
TopicMessageProducer tmp=(TopicMessageProducer) ctx.getBean("topicMessageProducer");
FooMessage message=new FooMessage();
message.setId(123);
qmp.send(message);
tmp.send(message);
}

activemq-consumer.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
     http://www.springframework.org/schema/context/spring-context-3.0.xsd
     http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd
     http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">

<!-- <context:property-placeholder location="classpath:server-config.properties" ignore-unresolvable="true"></context:property-placeholder>
-->
    <!-- 用于连接activeMQ服务器 -->
<bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
<property name="clientIDPrefix" value="www"/>
</bean>

<!-- ActiveMQ destinations ,连接的目标名称-->
    <bean id="MyQueue" class="org.apache.activemq.command.ActiveMQQueue">
       <constructor-arg value="firstQueue"></constructor-arg>
    </bean>

    <bean id="Topic-A" class="org.apache.activemq.command.ActiveMQTopic">
       <constructor-arg value="Topic-A"></constructor-arg>
    </bean>


    <bean id="simpleMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"/>

<bean id="queueListener"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="queueConsumer" />
<property name="defaultListenerMethod" value="receive" />
<property name="messageConverter" ref="simpleMessageConverter" />
</bean>

<bean id="topicListenerA"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="topicConsumerA" />
<property name="defaultListenerMethod" value="receive" />
<property name="messageConverter" ref="simpleMessageConverter" />
</bean>

<bean id="topicListenerB"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="topicConsumerB" />
<property name="defaultListenerMethod" value="receive" />
<property name="messageConverter" ref="simpleMessageConverter" />
</bean>

<bean id="queueListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="concurrentConsumers" value="1" />
<property name="destination" ref="MyQueue" />
<property name="messageListener" ref="queueListener" />
</bean>
<bean id="topicListenerContainerA"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="concurrentConsumers" value="1" />
<property name="destination" ref="Topic-A" />
<property name="messageListener" ref="topicListenerA" />
<property name="subscriptionDurable" value="true"/>
        <property name="clientId" value="clientId_001"/>
        <property name="durableSubscriptionName" value="clientId_001"/>
</bean>

<bean id="topicListenerContainerB"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="concurrentConsumers" value="1" />
<property name="destination" ref="Topic-A" />
<property name="messageListener" ref="topicListenerB" />
</bean>
<!-- consumer for queue -->
   <bean id="queueConsumer" class="com.gy.myactivemq.QueueConsumer"/>

<!-- consumer for topic -->
   <bean id="topicConsumerA" class="com.gy.myactivemq.TopicConsumerA" />
   <bean id="topicConsumerB" class="com.gy.myactivemq.TopicConsumerB" />

</beans>

需要注意的是这段配置:
<bean id="topicListenerContainerA"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="concurrentConsumers" value="1" />
<property name="destination" ref="Topic-A" />
<property name="messageListener" ref="topicListenerA" />
<property name="subscriptionDurable" value="true"/>
        <property name="clientId" value="clientId_001"/>
        <property name="durableSubscriptionName" value="clientId_001"/>
</bean>
其中subscriptionDurable属性表明该处于该container中的订阅者是一个持久订阅者,配置持久订阅者必须指定一个clientId的值,而且这个值对于每一个订阅者都必须是唯一的,因为jms服务器要根据每个订阅者的这个clientId为其注册,这样jms服务器才能确保在该订阅者不活动时为其保存消息。还注意到如下配置:
<bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616" />
<property name="clientIDPrefix" value="www"/>
</bean>
其中<property name="clientIDPrefix" value="www"/>配置指明每个连接到jms服务器的连接的clientId的前缀是www,然后具体这个配置的用处现在还不清楚,之前网上说这个配置表明只有当连接的jms服务器的客户端的clientId名的前缀与这个配置一致时,该连接才能实现持久化订阅,但实际情况是这里不管要不要这个配置,订阅者只要指定任意的clientId值都可以实现持久订阅。

对于这段配置我们可以发现对于每一个listener监听者我们都需要配置一个container容器,这里我们可以通过jms标签简化这个配置,比如若有两个listener,topicListenerA和topicListenerB配置如下:
  <jms:listener-container connection-factory="connectionFactory" concurrency="1" destination-type="topic">
  <jms:listener destination="Topic-A" ref="topicListenerA" />
   <jms:listener destination="Topic-A" ref="topicListenerB" />
  </jms:listener-container>
其中topicListenerA和topicListenerB是在前面的配置文件中已经配好,当然我们也可以直接在<jms:listener destination="Topic-A" ref="topicListenerA" />标签中配置,这样下面这段代码也可以省了
<bean id="topicListenerA"
class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg ref="topicConsumerA" />
<property name="defaultListenerMethod" value="receive" />
<property name="messageConverter" ref="simpleMessageConverter" />
</bean>
具体配置可以参阅《java消息服务》一书。
使用jms标签需要加入如下命名空间:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
     http://www.springframework.org/schema/context/spring-context-3.0.xsd
     http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core-5.2.0.xsd
     http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms-3.0.xsd">
总之,这样就省去了一个listener需要一个container的麻烦,如果需要配置持久订阅者则需要如下配置:
   <!-- 弊端:一个listener-container若指定了client-id和destination-type来实现持久化订阅,则只能含有一个listener,因为每个listener的client-id必须唯一 -->
   <jms:listener-container connection-factory="connectionFactory" concurrency="1" client-id="clientId_001" destination-type="durableTopic">
   <jms:listener destination="Topic-A" ref="topicListenerA" />
   </jms:listener-container>
这么做的弊端已经在注释上标明,当然也许不是这样,但是目前个人没有发现如何将持久订阅者和非持久订阅者放在同一个container中。另外destination-type属性用于指定监听的消息提供者类型,默认是queue,所以监听topic时需要指定类型。

开启消息监听的代码很简单:
public static void main(String [] agr){
ApplicationContext ctx=new ClassPathXmlApplicationContext("activemq-consumer.xml");
}
注意这里不使用单元测试的@Test是因为,使用单元测试的方法运行之后马上会停止该方法,导致连接之后马上会断开,消息可能都还没有被监听者收到,所以我们需要保持监听者一直与jms服务器连接。

这里,需要用的jar包有三个必须的,分别是spring-jms,activemq-all,slf4j相关的包(slf4j-simple,slf4j-spi),当然spring读取配置文件的相关包肯定也是需要的,用过spring的应该都知道。

另外,activemq的启动是单独启动的,也可以通过如下方式在spring配置中启动:
<amq:broker useJmx="false" persistent="true">
<amq:persistenceAdapter>
<amq:amqPersistenceAdapter directory="d:/amq"/>
</amq:persistenceAdapter>
<amq:transportConnectors>
<amq:transportConnector uri="tcp://localhost:61616" />
</amq:transportConnectors>
</amq:broker>
但是不推荐这种方式,建议分开来启动。

   发表时间:2012-06-25  
格式有点乱,其他还好。

最好加上PooledConnectionFactory的配置,和事务配置。

如果能把JTA的补上就更棒了。
0 请登录后投票
   发表时间:2012-07-10  
QueueMessageProducer类中引入的Queue是activemq的还是jdk的?FooMessage呢?在哪个包?
0 请登录后投票
   发表时间:2012-07-12  
hxhnarkissos 写道
QueueMessageProducer类中引入的Queue是activemq的还是jdk的?FooMessage呢?在哪个包?

QueueMessageProducer类中引入的Queue来源:import javax.jms.Queue;
FooMessage是我自定义的一个对象而已,你可以随意叫什么名字,但是一定要记得序列化
0 请登录后投票
   发表时间:2012-07-15  
lz其实讲的是spring-jms,
感觉还是先介绍下spring-jms是什么,activemq有什么特点,他们的关系是什么
比较好一些。
0 请登录后投票
   发表时间:2012-07-15  
kimmking 写道
lz其实讲的是spring-jms,
感觉还是先介绍下spring-jms是什么,activemq有什么特点,他们的关系是什么
比较好一些。

感谢回复,其实我开始就是这样打算的,打算先介绍java的jms,然后接着介绍spring-jms,最后讲讲与activemq的关系,打算写成一个系列的,不过最近忙别的去了,所以自从写了这个以后一直没写了,确实不大好,我会继续加油补上的,感谢支持
0 请登录后投票
   发表时间:2012-07-15  
不客气,在挖坑不填方面,我有着非常丰富的经验。。。

我这儿有篇:JMS介绍:我对JMS的理解和认识 
http://setting.iteye.com/blog/1097767
0 请登录后投票
   发表时间:2012-07-15  
非常感谢gong1208的解答和kimmking的推荐
0 请登录后投票
   发表时间:2012-08-10  
有没有完整的代码?
0 请登录后投票
论坛首页 综合技术版

跳转论坛:
Global site tag (gtag.js) - Google Analytics