`
newleague
  • 浏览: 1504462 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类

JMS与Spring之二(用message listener container异步收发消息)

    博客分类:
  • JMS
 
阅读更多

本文主要参考Java Message Service(2nd)by Marc Ricbard。

在Spring框架中使用JMS传递消息有两种方式:JMS template和message listener container,前者用于同步收发消息,后者用于异步收发消息。

本文主要讲述用message listener container异步收发消息。

Spring中有三种方式配置异步消息监听器:实现javax.jms.MessageListener接口、实现Spring的SessionAwareMessageListener和捆绑一个标准POJO到Spring的MessageListenerAdapter类上。这三种方法在消息监听器类结构上不同,但都要用message listener container,message listener container类似JmsTemplate。

1.       message listener container综述

message listener container绑定连接类工厂(connection factory)、JMS Destination、JNDI Destination解析器和message listener bean。Spring提供了两种message listener container:DefaultMessageListenerContainer和SimpleMessageListenerContainer。两种message listener container都允许指定数量的并发监听线程,只有DefaultMessageListenerContainer可以在允许期间动态调整监听线程的数量。另外,DefaultMessageListenerContainer允许和XA Transactions的集成。对于使用本地事务管理器和不需要基于可变负载的线程、会话、连接调整的简单消息传递应用,使用SimpleMessageListenerContainer。对于使用外部事务管理器或XA事务的消息传递应用,使用DefaultMessageListenerContainer。

message listener container的配置类似JmsTemplate。使用JNDI访问连接工厂和JMS Destinations,或直接使用JMS提供者的native 连接工厂和JMS destination类。下面给出一个例子,使用了ActiveMQ的DefaultMessageListenerContainer。

         <bean id="jndiTemplate" class="org.springframework.jndi.JndiTemplate">

       <property name="environment">

          <props>

             <prop key="java.naming.factory.initial">

                   org.apache.activemq.jndi.ActiveMQInitialContextFactory</prop>

             <prop key="java.naming.provider.url">tcp://localhost:61616</prop>

             <prop key="java.naming.security.principal">system</prop>

             <prop key="java.naming.security.credentials">manager</prop>

          </props>

       </property>

</bean>

    <bean id="jndiQueueConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">

       <property name="jndiTemplate" ref="jndiTemplate"/>

       <property name="jndiName" value="QueueCF"/>

</bean>

<bean id="queueConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">

        <property name="targetConnectionFactory" ref="jndiQueueConnectionFactory"/>

        <property name="sessionCacheSize" value="1"/>

</bean>  

<bean id="queueDestination"

   class="org.springframework.jndi.JndiObjectFactoryBean">

   <property name="jndiTemplate" ref="jndiTemplate"/>

   <property name="jndiName" value="queue1"/>

</bean> 

<bean id="destinationResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver">

            <property name="jndiTemplate" ref="jndiTemplate"/>

            <property name="cache" value="true"/>

            <property name="fallbackToDynamicDestination" value="false"/>

</bean>

<bean id="messageListener1" class="com.jms.server.JMSReceiverAsync1" />

         <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">

       <property name="connectionFactory" ref="queueConnectionFactory"/>

            <property name="destinationResolver" ref="destinationResolver"/>

       <property name="destinationName" value="queue1"/>

       <property name="messageListener" ref="messageListener1" />

</bean>

在这个配置中,messageListener1通过jmsContainer bean注入到DefaultMessageListenerContainer中,同时注入的还有destination名字、连接工厂、JNDI destination解析器。本例中,concurrentConsumers属性指示启动了三个并发监听线程。这个配置会应用到下面使用三种方法实现消息监听器的例子中。

2.       使用MessageListener接口

这是最简单的一种方式。如下图所示,POJO实现了MessageListener接口,并注入到DefaultMessageListenerContainer中。然后,DefaultMessageListenerContainer注入到CachingConnectFactory和JNDIDestinationResolver中。

实现MessageListener接口,需要重载onMessage方法。如下所示:

public void onMessage(Message message) {

                   try {

             if (message instanceof TextMessage) {

                        System.out.println(((TextMessage)message).getText());

             } else if (message instanceof MapMessage) {

                        MapMessage mmsg = (MapMessage)message;

                        System.out.println(mmsg.getLong("acctId") + ", " +

                                          mmsg.getString("side") + ", " +

                                          mmsg.getString("symbol") + ", " +

                                          mmsg.getDouble("shares"));

             } else {

                        throw new IllegalStateException("Message Type Not Supported");

             }

                   } catch (JMSException e) {

                            e.printStackTrace();

                   }

}

3.       使用SessionAwareMessageListener接口

SessionAwareMessageListener是Spring提供的javax.jms.MessageListener的扩展。类似地,它也包含了一个必须重载的onMessage方法。不同之处在于,SessionAwareMessageListener提供了对JMS Session的访问功能。配置方法和MessageListener相同。实现方法如下所示:

public void onMessage(Message message, Session session) throws JMSException {

    if (message instanceof TextMessage) {

               System.out.println(((TextMessage)message).getText());

    } else if (message instanceof MapMessage) {

               MapMessage mmsg = (MapMessage)message;

               System.out.println(

                                 mmsg.getLong("acctId") + ", " +

                                 mmsg.getString("side") + ", " +

                                 mmsg.getString("symbol") + ", " +

                                 mmsg.getDouble("shares"));

    } else {

               throw new JMSException("Message type not supported");

    }

    long conf = 12345678;

    MessageProducer sender = session.createProducer(message.getJMSReplyTo());

    StreamMessage smsg = session.createStreamMessage();

    smsg.setJMSCorrelationID(message.getJMSMessageID());

    smsg.writeLong(conf);

    sender.send(smsg);

}

4.       使用MessageListenerAdapter

这个方法的特别之处在于POJO类不实现任何消息监听器接口,也不需要引用javax.jms.Message对象。如下图所示,POJO接收类注入Spring的MessageListenerAdapter,MessageListenerAdapter再注入DefaultMessageListenerContainer。

使用MessageListenerAdapter时,有多种方式安排POJO接收器方法。可以使用缺省的消息处理方法,也可以给监听类指定一个定制的方法。使用后者时,还需要指定是否使用一个消息转换器将收到的消息转换为相应的Java对象类型,或直接消费一个JMS Message。

(1)       缺省的消息处理方法

缺省情况下,MessageListenerAdapter会在POJO中根据收到的JMS消息类型寻找一个handleMessage方法。下面列出了使用自动消息转换时handleMessage方法的特征。

//receive a converted TextMessage body

public void handleMessage(String message) {...}

//receive a converted BytesMessage body

public void handleMessage(byte[ ] message) {...}

//receive a converted MapMessage body

public void handleMessage(Map message) {...}

//receive a converted ObjectMessage body

public void handleMessage(Object message) {...}

要使用缺省的消息监听处理方法,只需要通过constructor-arg属性(或delegate属性)将MDP注入MessageListenerAdapter bean。

         <bean id="messageListener3" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">

       <constructor-arg>

          <bean class="com.jms.server.JMSReceiverPOJO"/>

       </constructor-arg>

 </bean>

         <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">

       <property name="connectionFactory" ref="queueConnectionFactory"/>

            <property name="destinationResolver" ref="destinationResolver"/>

       <property name="destinationName" value="request"/>

       <property name="messageListener" ref="messageListener3" />

 </bean>

定义MDP(Message-driven POJO)时,只需要为需要处理的JMS message类型定义相应的handleMessage方法。如下面代码所示。

public class JMSReceiverPOJO {

public void handleMessage(Map message) {

              System.out.println("Received Message A: " + message);

     }

     public void handleMessage(TradeData message) {

              System.out.println("Received a tradedata object: ");

     }      

     public void handleMessage(Trade2Data message) {

              System.out.println("Received a trade2data object: ");

     }

}

注意,JMSReceiverPOJO不需要引用任何JMS API,事实上,本例中的POJO甚至不知道它要用在消息传递上下文中。所有的消息传递架构都由MessageListenerAdapter和DefaultMessageListenerContainer处理。需要做的就是写相应的消息类型对应的handleMessage方法。如果收到的消息没有对应的handleMessExceptionage方法,会抛出NoSuchMethod。

上面的方法中,一个问题是消息转换只处理消息负载,头属性(header properties)和应用属性(application properties)没有访问。如果消息带有这些属性信息,需要告诉DefaultMessageListenerContainer不希望自动转换消息负载。要做到这点,需要将MessageListenerAdapter bean的messageConverter属性置为null。

    <bean id="messageListener3" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">

       <constructor-arg>

          <bean class="com.jms.server.JMSReceiverPOJO"/>

       </constructor-arg>

       <property name="messageConverter"><null/></property>

</bean>

这时,MessageListenerAdapter会缺省寻找下面的handleMessage方法。注意,上面的方法处理的是消息的body,此处的方法处理的消息全部。在方法内部可以访问头属性(header properties)和应用属性(application properties)。

//receive a JMS TextMessage

public void handleMessage(TextMessage message) {...}

//receive a JMS MapMessage

public void handleMessage(MapMessage message) {...}

(2)       定制消息处理方法

要使用定制的消息处理方法,需要将MessageListenerAdapter的defaultListenerMethod属性设置为自定义的方法。如下例所示。

         <bean id="messageListener3" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">

       <constructor-arg>

          <bean class="com.jms.server.JMSReceiverPOJO"/>

       </constructor-arg>

       <property name="defaultListenerMethod" value="processRequest"/>

       <property name="messageConverter"><null/></property>

</bean>

     public void processRequest(Map message) {

              System.out.println(message.get("acctId") + ", " +

                                 message.get("side") + ", " +

                                 message.get("symbol") + ", " +

                                 message.get("shares"));

}

5.       PS:可以使用QueueBrowser查看队列中没有发送出去的消息。

分享到:
评论

相关推荐

    JMS与Spring之一(用JmsTemplate同步收发消息)

    JMS与Spring之一(用JmsTemplate同步收发消息) JMS(Java Message Service)是一种Java API,用于在两个应用程序之间异步地发送消息。Spring框架提供了对JMS的支持,允许开发者使用JMS template或message listener...

    ActiveMQ学习笔记(二) JMS与Spring

    综上所述,Spring与JMS的集成为开发人员提供了一种便捷的方式来处理分布式环境中的异步通信,通过合理的配置和编程,可以构建出健壮且高性能的应用系统。在实际项目中,理解并熟练运用这些知识点对于提升系统的稳定...

    JMS之Spring +activeMQ实现消息队列

    "JMS之Spring + ActiveMQ实现消息队列"涉及到的关键知识点包括:Spring框架的JMS支持、ActiveMQ的使用、ConnectionFactory的配置、JmsTemplate和MessageListener的实现,以及消息队列在解决系统解耦和异步处理中的...

    tomcat spring jms 异步消息传递入门实例

    &lt;bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"&gt; &lt;property name="messageListener" ref="messageListener" /&gt; ``` 6. **使用**:在`...

    Spring整合JMS——实现收发消息

    在IT行业中,Spring框架是Java领域最常用的轻量级应用框架之一,而JMS(Java Message Service)则是Java平台上的消息中间件标准,用于应用程序之间的异步通信。本篇文章将详细探讨如何通过Spring框架整合JMS,特别是...

    使用Spring JMS轻松实现异步消息传递.pdf

    4. **灵活性**:Spring JMS 提供了多种类和接口,如 `MessageListener` 和 `DefaultMessageListenerContainer`,支持不同的消息消费模式,如同步和异步,满足不同场景需求。 5. **集成友好**:Spring 框架的整体...

    JMS整合Spring实例

    &lt;bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"&gt; &lt;property name="messageListener" ref="messageListener" /&gt; &lt;bean id="messageListener...

    spring_jms

    Spring提供了`MessageListenerContainer`接口,允许我们定义监听JMS消息的策略,例如基于`DefaultMessageListenerContainer`的实现,它可以自动启动和停止消息监听。 接下来,JMS是Java平台的标准API,用于在分布式...

    Spring+weblogic接收JMS消息

    - 当消息到达时,`MessageListenerContainer`会自动调用`onMessage`方法,从而实现异步处理。 5. **测试与调试**: - 可以使用Spring的`JmsTemplate`进行发送消息的测试,或者通过WebLogic Server的管理控制台...

    jms整合spring工程

    2. **Spring与JMS整合** - **Spring JMS模块**:Spring框架提供了一组JMS模板和监听器容器,简化了消息的生产和消费。 - **ConnectionFactory**:JMS API的核心对象,用于创建连接到消息代理的连接。 - **...

    Spring发送接收JMS消息

    **Spring与JMS消息传递** 在Java世界中,Java Message Service (JMS) 是一个标准接口,用于在分布式环境中发送和接收消息。Spring框架提供了一种简单而强大的方式来集成JMS,使得开发者可以轻松地在应用中实现异步...

    spring-jms源码

    MessageListenerContainer是Spring JMS用于异步处理消息的组件,它可以监听一个或多个队列或主题,并在接收到消息时调用预先注册的MessageListener。这使得应用程序可以专注于业务逻辑,而无需关心消息的接收和处理...

    spring-jms:Spring JMS可同时使用异步消息和异步消息

    该项目提供了一些使用Spring JMS进行异步和同步消息使用的示例。 异步消费 所有异步消息使用的示例都使用Spring 进行消息使用。 Spring JMS支持三种类型的消息侦听器,包括: [MessageListenerAdapter]( 异步...

    spring-jms入门

    - **MessageListenerContainer**:通过实现`MessageListener`接口,Spring容器会自动启动监听线程,接收到消息后调用监听器方法。 - **JmsTemplate.receive()**:阻塞式接收,调用后会等待直到有消息到达。 **7. ...

    JMS之ActiveMQ与Spring整合源码

    Spring提供了`org.springframework.jms`包,该包下包含了许多用于JMS的抽象类和接口,如`ConnectionFactory`, `MessageListenerContainer`, `JmsTemplate`等,它们极大地简化了JMS的使用。例如,`JmsTemplate`提供...

    spring-jms-oracle-aq.rar_oracle aq_spring oracle aq_spring oracl

    通过配置Spring的JmsTemplate和MessageListenerContainer,我们可以实现消息的发送和接收。 1. 配置Spring JMS与Oracle AQ: 要在Spring中配置Oracle AQ,你需要创建一个ConnectionFactory bean,它是连接到Oracle...

    JMS-Spring

    &lt;bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"&gt; &lt;property name="messageListener" ref="myMessageListener" /&gt; ``` 这样,当消息发送到`...

    spring jms tomcat 异步消息传递入门实例

    综上所述,"spring jms tomcat 异步消息传递入门实例"旨在引导开发者理解如何在Spring应用中结合Tomcat使用JMS实现异步消息传递,从而提升系统的响应能力和处理大规模并发的能力。通过这个实例,你可以学习到Spring...

Global site tag (gtag.js) - Google Analytics