`
AllenHU0320
  • 浏览: 85852 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Spring框架对JMS的集成

 
阅读更多

1.JMS规范有两个主要版本,即JMS1.02和JMS1.1。在JMS1.02规范中定义了如下两种明确的消息域模型(Messageing Domain):

点对点模式(Point-to-Point),简称PTP模式。消息发送者将消息发送到指定的消息队列(Queue),消息接受者也是从同一消息队列中顺序获取数据,并对获取的消息进行处理。

发布订阅模式(Publish/Subscribe),即Pub/Sub模式。消息发送者将消息发送到指定的消息主题(Topic),多个消息接受者可以从响应主题中获取某个消息的拷贝并进行处理。

JMS1.1之后,倾向于采用较为通用的消息访问API接口,可以使用一套通用的API访问现有的两种消息域模型。

 

2.一个典型的JMS应用通常由以下几个部分组成

1)JMS类型的客户端程序(JMS Client):使用Java语言编写的用于发送或者接受消息的程序。

2)非JMS类型的客户端程序(non-JMS client):使用响应消息系统或者产品提供的本地API,而不是JMS API来进行消息访问的程序。

3)消息(Message):作为消息应用,传递的当然是各种消息。JMS规范了5种消息类型,即StreamMessage、BytesMessage、MapMessage、TextMessage以及ObjectMessage。

4)JMS Provider:JMS规范指定了一系列的API接口,而加入JMS规范的各种MOM(Message-Oriented Middleware)产品,需要根据规范提供特定于它们自身产品的JMS API接口实现,以及各种相关服务(比如消息的存储转发等)。这些特定于MOM产品的JMS API接口实现以及相关功能实现就被称之为JMS Provider.

5)受管理对象(Administered Object):为了向不同JMS类型客户端屏蔽各种JMS Provider之间的差异性,系统管理员可以将某些特定类型的对象,通过JMS Provider提供的管理工具提前配置到系统中。这些提前配置到系统然后再交给JMS类型客户端使用的对象就叫做受管理对象。受管理对象有如下两种类型:

ConnectionFactory:客户端需要通过ConnectionFactory获取相应的Connection,并创建其他相关对象才能访问消息系统中的消息

Destination:即消息发送或者接受的目的地。依消息域模型的不同,Destination可以划分为更加具体的消息队列(Queue)和消息主题(Topic)两种类型

通常,受管理对象被绑定到相应的JNDI服务,客户端只需要通过JNDI查找指定的接口类型并使用即可。

 

3.使用JMS API进行应用开发的传统套路

使用JMS原始API进行消息发送的代码示例

ConnectionFactory connectionFactory = lookupCFViaJNDI();

Destination dest = lookupDestViaJNDI();

Connection con = null;

Session session = null;

try{

      con = connectionFactory.createConnection();

      session = con.createSession(false,Session.AUTO_ACKNOWLEDGE);

      MessageProducer messageProducer = session.createProducer(dest);

 

      TextMessage message = session.createTextMessage();

      message.setText("Hi,hello");

      messageProducer.send(message);

 

      messageProducer.close();

      session.close();

}catch(JMSException e){

      e.printStackTrace();  //不要这么做

}finally{

      if(con != null){

             try{con.close();}catch(JMSException e){e.printStackTrace();//不要这么做}

      }

}

a)首先,从JNDI获取JMS的受管对象,即ConnectionFactory和稍后要用到的一个或者多个Destination应用

b)使用获取的ConnectionFactory创建发送消息用的Connection

c)使用Connection创建发送消息用的Session

d)使用Session创建发送消息用的MessageProducer

e)使用Session创建将被发送的消息

 f)调用MessageProducer发送创建完的消息到指定的Destination

g)最后做资源管理,包括关闭MessageProducer、Session以及Connection

 

4.Spring改进后的JMS实战

1)消息发送和同步接受

org.springframework.jms.core.JmsTemplate是Spring框架为JMS消息发送以及同步消息接受场景提供的核心支持

JmsTemplate唯一必须依赖的就是一个JMS的ConnectionFactory

ConnectioFactory cf = ...;

JmsTemplate jmsTemplate = new JmsTemplate(cf);

...

JmsTemplate核心模板方法原型代码示例

public Object execute(SessionCallback action,boolean startConnection)throws JmsException{

      try{

             //获取相应的connection

             //创建相应的session

      }catch(JMSException e){

             //转译并抛出相应异常

      }finally{

             //清理相应资源

      }

}

该方法通过SessionCallback回调接口为用户提供自定义逻辑介入点,而资源管理等一系列其他问题则由当前模板方法统一管理。startConnection参数告知当前模板方法,是否要调用Connection的start()方法开始传送消息。

public Object execute(SessionCallback action) throws JmsException{

       return execute(action,false);

}

该方法只能用在发送消息的时候

 

JmsTemplate的模板方法大略上划分为三类:一类专门用于消息的发送,一类用于消息的同步接收,最后一类是使用QueueBrower检查消息队列的情况。

(1)JmsTemplate的消息发送模板方法。用于发送消息的模板方法可以进一步划分为如下三类

a)使用ProducerCallback的模板方法

public interface ProducerCallback{

       Object doInJms(Session session,MessageProduer producer) throws JMSException;

}

jmsTemplate.execute(new ProducerCallback(){

       public Object doInJms(Session session,MessageProducer producer) throws JMSException{

              Message message= session.createObjectMessage();

              producer.send(destination,message);

              return null;

       }

});

b)使用MessageCreator的模板方法

jmsTemplate.send(new MessageCreator(){

        public Message createMessage(Session session) throws JMSException{

               ObjectMessage message = session.createObjectMessage();

               //或者 message = session.createXXXMessage();

               ...

               return message;

        }

});

send(...)方法除了可以接收MessageCreator作为参数,还可以接收String类型或者Destination类型参数,用于标志消息发送的Destination。如果没有为send(...)方法指定消息发送的Destination相关信息,消息默认将被发送到为JmsTemplate指定的默认Destination

c)使用MessageConverter的模板方法。MessageConverter是JmsTemplate使用的一个Helper类,负责具体消息类型与对象类型之间的相互转换

public interface MessageConverter{

       Message toMessage(Object object,Session session) throws JMSException,MessageConversionException;

       Object fromMessage(Message message) throws JMSException,MessageConversionException;

}

toMessage()方法负责将相应的对象转换为JMS的Message以便发送,而fromMessage()则负责将JMS的Message转型为应用程序所使用的对象类型。convertAndSend(...)这类模板方法可以接收任何类型的对象(String类型、Map类型以及Object类型等)

JmsTemplate默认使用的MessageConverter实现为org.springframework.jms.Support.converter.SimpleMessageConverter,可以满足String类型、byte数组、Map以及可序列化对象(Serializable)与JMS对应Message之间的相互转换

(2)JmsTemplate的同步消息接收模板方法

a)直接同步接收的模板方法。以receive(...)命名的模板方法是最直接也是最简单的同步消息接收方法

b)可以指定MessageSelector表达式的模板方法。JMS提供的Message Selector机能,可以让消息接收方指定接收符合某种条件的消息

c)使用MessageConverter的模板方法

d)结合selector表达式和MessageConverter的模板方法

(3)检查消息队列情况的模板方法

JMS规范提供的QueueBrower允许我们查看某一消息队列的情况

 

2)同步消息处理场景浅析

使用特定单一队列实现消息发送和同步接收功能的代码示例

final AuthRequest authReq = new AuthRequest();

authReq.setUserId("...");

authReq.setPassword("...");

...

String messageSelector = SystemMessageProperties.REQUEST_ID + " ='" + requestId + "' ";

getJmsTemplate().convertAndSend(getRequestDest(),authReq,new MessagePostProcessor(){

        public Message postProcessMessage(Message message) throws JMSException{

               message.setStringProperty(SystemMessageProperties.REQUEST_ID,requestId);

               return message;

        }

});

Message responseMessage = getJmsTemplate().receiveSelected(getResponseDest(),messageSelector);

processMessage(responseMessage);

使用Temporary Queue实现同步消息接收的代码示例

final AuthRequest authReq = new AuthRequest();

authReq.setUserId("...");

authReq.setPassword("...");

...

getJmsTemplate().execute(new SessionCallback(){

        public Object doInJms(Session session) throws JMSException{

              ObjectMessage message = session.createObjectMessage();

              message.setObject(authReq);

              TemporaryQueue responseDestination = session.createTemporaryQueue();

              message.setJMSReplyTo(responseDestination);

              MessageProducer producer = session.createProducer(getRequestDest());

              try{

                     producer.send(message);

              }finally{

                     JmsUtils.closeMessageProducer(producer);

              }

              MessageConsumer consumer = session.createConsumer(responseDestination);

              try{

                     Message receiveMessage = consumer.receive(10000L);

                     processMessage(receiveMessage);

              }finally{

                     JmsUtils.closeMessageConsumer(consumer);

              }

              return null;

        }

},true);

 

5.异步消息接收

1)Spring提供了MessageListenerContainer,借助于MessageListenerContainer的支持,我们可以创建消息驱动POJO来处理异步消息

MessageListenerContainer有两个职责:

负责到指定的Destination接收符合处理条件的消息

将接收到的消息通过某种策略转发给指定类型的MessageListener实现类来处理

Spring框架默认提供了三种MessageListenerContainer实现类

(1)org.springframework.jms.listener.SimpleMessageListenerContainer

SimpleMessageListenerContainer原型代码示例

int concurrentCount = 5;

Destination destination = //接收消息使用到的Destination

MessageListener messageListener = //相应的MessageListener实现

//最好是注入相应的ConnectionFactory实例

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

Connection connection = connectionFactory.createConnection();

Session[] concurrentSessions = new Session[concurrentCount];

MessageConsumer[] concurrentConsumers = new MessageConsumer[concurrentCount];

for(int i = 0; i < concurrentCount; i++){

       concurrentSessions[i] = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

       concurrentConsumers[i] = concurrentSessions[i].createConsumer(destination);

       concurrentConsumers[i].setMessageListener(messageListener);

}

connection.start();

SimpleMessageListenerContainer允许我们指定一个TaskExecutor用于消息处理的调度

(2)org.springframework.jms.listener.DefaultMessageListenerContainer

借助于相应的TaskExecutor,DefaultMessageListenerContainer可以同时启动多个线程循环调用JMS的同步消息接收方法,而接收的消息的处理,则完全是在自己的线程内进行

DefaultMessageListenerContainer实现原型代码示例

int concurrentCount = 5;

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();

final JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);

Runnable processor = new Runnable(){

       public void run(){

             while(containerAlive){

                    Message message = jmsTemplate.receive(getDestination());

                    getMessageListener().onMessage(message);

             }

       }

};

Thread[] threads = new Thread[concurrentCount];

for(Thread thread : threads){

       thread = new Thread(processor);

       thread.start();

}

DefaultMessageListenerContainer内部的消息接收和分发不是向我们那样,每次都创建新的线程,而是将调度的工作转给了相应的TaskExecutor。DefaultMessageListenerContainer内部也不是真的使用了JmsTemplate,它所做的工作要比原型做的更多,更加严谨。DefaultMessageListenerContainer将是我们使用最多的MessageListenerContainer实现。

(3)org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer

要使用JMS Provider构建于ServerSessionPool上的各种消息处理机制的话,可以直接使用ServerSessionMessageListenerContainer。所有的MessageListenerContainer都实现了org.springframework.context.Lifecycle接口。只要将具体的MessageListenerContainer实现类添加到Spring的IoC容器中就能运行

<bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">

        <property name="connectionFactory" ref="connectionFactory" />

        <property name="destination" ref="authRequestQueue" />

        <property name="messageListener" ref="authReqListener" />

</bean>

Spring2.5后,为基于XSD的配置空间引入了jms专有的命名空间

<jms:listener-container>

        <jms:listener destination="authRequestQueue" ref="authReqListener" />

</jms:listener-container>

 

2)消息驱动POJO

消息驱动POJO必须实现MessageListener接口

public class YourMDBImpl implements MessageListener{

      public void onMessage(Message msg){

             try{

                  //处理msg

             }catch(JMSException e){

                  throw JmsUtils.convertJmsAccessException(e);

             }

      }

}

<jms:listener-container>

      <jms:listener destination="..." ref="YourMDB">

</jms:listener-container>

<bean id="yourMDB" class="...">

      <!-- 可能的依赖注入对象 -->

</bean>

用于处理验证消息的消息驱动POJO实现代码示例

public class AuthRequestListener implements MessageListener{

      private JmsTemplate jmsTemplate;

      private MessageConverter messageConverter = new SimpleMessageConverter();

      private IUserAuthService userAuthService;

      public void onMessage(Message msg){

             try{

                  AuthRequest authRequest = (AuthRequest)getMessageConverter().fromMessage(msg);

                  AuthResponse authResponse = getUserAuthService().processAuthRequest(authRequest);

                  getJmsTemplate().convertAndSend(msg.getJMSReplyTo(),authResponse);

             }catch(){

                  throw JmsUtils.convertJmsAccessException(e);

             }

      }

      //getter和setter方法定义...

}

AuthRequestListener相关配置示例

<jms:listener-container>

      <jms:listener destination="authRequestQueue" ref="authReqListener" selector="JMSReplyTo" />

</jms:listener-container>

<bean id="authReqListener" class="...AuthRequestListener">

      <property name="userAuthService" ref="userAuthService" />

      <property name="jmsTemplate" ref="producerTemplate" />

</bean>

<bean id="userAuthService" class="...UserAuthServiceImpl">

</bean>

Spring提供的所有MessagelisterContainer都支持MessageListener和SessionAwareMessageListener两种类型的消息驱动POJO。还有MessageListenerAdapter的实现。

 

JMS相关异常处理

框架内的事务管理支持

分享到:
评论

相关推荐

    Spring+Weblogic JMS

    在本项目中,Spring与WebLogic JMS(Java消息服务)的集成展示了如何在Spring环境中使用消息队列进行通信。 WebLogic JMS是Oracle WebLogic Server提供的消息中间件,它遵循JMS规范,用于在分布式环境中传递消息,...

    Spring集成JMS

    Spring集成JMS是Java消息服务(Java Message Service)与Spring框架的结合,它提供了一种在分布式系统中高效处理异步消息传递的方式。Spring通过其强大的IoC(Inversion of Control,控制反转)和AOP(Aspect ...

    jms简单demo,集成spring和不集成

    在本示例中,"jms简单demo"涵盖了两个方面:与Spring框架的集成以及不集成Spring的情况。 首先,让我们深入了解JMS。JMS定义了消息生产者(Producer)、消息消费者(Consumer)和消息代理(Message Broker)之间的...

    Spring框架参考手册

    ### Spring框架参考手册知识点概述 #### 一、Spring框架概览 - **1.1 开始使用Spring** - Spring框架作为一个广泛使用的轻量级Java应用框架,为开发者提供了全面的解决方案,帮助他们构建高性能、可扩展的企业级...

    JMS_Spring集成所需jar

    Spring也提供了对JMS的全面支持,简化了在Spring应用中集成和使用JMS的过程。 在"JMS_Spring集成所需jar"的压缩包中,通常会包含以下关键的JMS和Spring相关的库文件: 1. **ActiveMQ** - 如果你看到有activemq相关...

    SpringJMS示例代码

    SpringJMS是Spring框架对JMS API的包装,它简化了生产者和消费者之间的消息通信。通过使用SpringJMS,开发者可以避免直接处理JMS API的复杂性,而是利用Spring的依赖注入和模板方法设计模式来创建消息驱动的应用...

    spring_jms

    Spring JMS(Java Message Service)是Spring框架的一部分,专门用于集成JMS消息传递系统,以实现异步通信和解耦应用程序组件。在这个入门级实例中,我们将探讨如何使用Maven、Spring和ActiveMQ来构建一个简单的...

    Spring发送接收JMS消息

    Spring框架提供了一种简单而强大的方式来集成JMS,使得开发者可以轻松地在应用中实现异步通信和解耦。本篇文章将深入探讨如何使用Spring进行JMS消息的发送和接收。 ### 1. JMS概述 JMS是一种中间件协议,它定义了...

    spring框架教程 PPT

    Spring框架是Java开发中最常用的轻量级开源框架之一,它以其强大的依赖注入(Dependency Injection,简称DI)和面向切面编程(Aspect-Oriented Programming,简称AOP)能力而闻名。本教程旨在深入讲解Spring框架的...

    spring,weblogic配置jms

    Spring框架是一个广泛使用的Java应用开发框架,它提供了与多种消息中间件集成的能力,包括WebLogic Server的JMS服务。WebLogic是Oracle公司的一款企业级应用服务器,它支持JMS规范,提供了强大的消息队列和发布/订阅...

    SSH和Spring框架简介

    SSH和Spring框架是Java开发中的两个重要工具,它们在企业级应用开发中占据了核心地位。SSH,是由Struts2、Spring和Hibernate三个框架组成的集成解决方案,而Spring则是一个全面的后端开发框架,包含了多种功能模块。...

    spring-jms入门

    Spring-JMS是Spring框架的一部分,专门用于处理Java消息服务(JMS)的集成。它提供了一个简单的API,使得开发者能够方便地在应用中使用消息传递功能。本文将深入探讨Spring-JMS的基础知识,包括它的核心概念、配置...

    Spring框架jar包全

    4. **Spring Context**:这是Spring框架的上下文模块,它扩展了Core Container的概念,引入了环境感知能力,如bean的国际化、事件传播以及对其他Spring模块(如JDBC、JMS、JMX等)的支持。 5. **Spring JDBC**:...

    JMS实例,整合spring,含jar

    Spring框架是一个广泛使用的Java开发框架,它提供了对JMS的全面支持,使得集成和管理消息变得更加简单。 本实例中,"JMS实例,整合spring,含jar"表明这是一个具体的项目示例,展示了如何在Spring框架中配置和使用JMS...

    spring框架入门到熟练

    ### Spring框架入门到熟练 #### 一、Spring框架概述 Spring框架是一款开源的企业级应用框架,旨在解决企业级应用开发中的复杂性问题。它最初由Rod Johnson创建,并于2004年发布了第一个版本。Spring框架的核心优势...

    spring-jms

    Spring-JMS是Spring框架的一部分,专门用于处理Java消息服务(JMS)的集成。这个jar包,即`spring-jms-3.1.1.RELEASE.jar`,包含了Spring对JMS API的抽象和扩展,使得在Spring应用中使用JMS变得更加简单和灵活。 **...

    Spring-JMS把企业消息处理变容易.doc

    Spring JMS 是一个强大的框架,它极大地简化了Java企业级消息处理。它通过提供一套抽象和模板类,使得开发者能够更加便捷地使用Java消息服务(JMS),并与各种JMS提供者,如IBM的WebSphere MQ进行集成。本文将深入...

    spring整合jms+activemq

    在IT行业中,Spring框架是Java领域最广泛应用的轻量级框架之一,而JMS(Java Message Service)则是一种标准接口,用于在分布式系统中进行异步消息传递。ActivemQ是Apache软件基金会的一个项目,它实现了JMS规范,...

Global site tag (gtag.js) - Google Analytics