`

Spring JMS 几个类介绍

阅读更多

1 DestinationResolver 
    DestinationResolver接口的作用是将指定的目的地名解析为目的地实例。其定义如下:

Java代码  收藏代码
  1. public interface DestinationResolver {  
  2.     Destination resolveDestinationName(Session session, String destinationName,   
  3.         boolean pubSubDomain) throws JMSException;  
  4. }  

    参数pubSubDomain用于指定是使用“发布/订阅”模式(解析后的目的地是Topic),还是使用“点对点”模式(解析后的目的地是Queue)。

 

    CachingDestinationResolver接口继承了DestinationResolver,增加了缓存的功能,其接口定义如下:

Java代码  收藏代码
  1. public interface CachingDestinationResolver extends DestinationResolver {  
  2.     void removeFromCache(String destinationName);  
  3.     void clearCache();  
  4. }  

    在目的地失效的时候,removeFromCache方法会被调用;在JMS provider失效的时候,clearCache方法会被调用。

1.1 DynamicDestinationResolver 
    DynamicDestinationResolver实现了DestinationResolver接口。根据指定的目的地名,DynamicDestinationResolver会动态创建目的地实例。针对JMS1.1规范,它采用如下方法创建目的地:

Java代码  收藏代码
  1. session.createTopic(topicName)  
  2. session.createQueue(queueName);  


1.2 JndiDestinationResolver 
    JndiDestinationResolver继承自JndiLocatorSupport, 同时实现了CachingDestinationResolver接口。如果在JMS provider中配置了静态目的地,那么JndiDestinationResolver通过JNDI查找的方式获得目的地实例。


    JndiDestinationResolver的fallbackToDynamicDestination属性用于指定在JNDI查找失败后,是否使用动态目的地,默认值是false。JndiDestinationResolver的cache属性用于指定是否对目的地实例进行缓存,默认值是true。

 

1.3 BeanFactoryDestinationResolver 
    BeanFactoryDestinationResolver实现了DestinationResolver接口和BeanFactoryAware接口。它会根据指定的目的地名从BeanFactory中查找目的地实例。以下是相关的代码:

Java代码  收藏代码
  1. public Destination resolveDestinationName(Session session, String destinationName,   
  2. boolean pubSubDomain) throws JMSException {  
  3.     Assert.state(this.beanFactory != null"BeanFactory is required");  
  4.     try {  
  5.         return (Destination) this.beanFactory.getBean(destinationName, Destination.class);  
  6.     }  
  7.     catch (BeansException ex) {  
  8.         throw new DestinationResolutionException(  
  9.             "Failed to look up Destinaton bean with name '" + destinationName + "'", ex);  
  10.     }  
  11. }  

 

2 JmsAccessor 
    抽象类JmsAccessor是JmsTemplate、SimpleMessageListenerContainer和DefaultMessageListenerContainer等concrete class的基类。JmsAccessor定义了如下几个用于访问JMS服务的共通属性。

Java代码  收藏代码
  1. private ConnectionFactory connectionFactory;  
  2. private boolean sessionTransacted = false;  
  3. private int sessionAcknowledgeMode = Session.AUTO_ACKNOWLEDGE;  


    JmsAccessor提供了创建Connection和Session的方法,如下:

Java代码  收藏代码
  1. protected Connection createConnection() throws JMSException {  
  2.     return getConnectionFactory().createConnection();  
  3. }  
  4.   
  5. protected Session createSession(Connection con) throws JMSException {  
  6.     return con.createSession(isSessionTransacted(), getSessionAcknowledgeMode());  
  7. }  

 
2.1 JmsDestinationAccessor 
    抽象类JmsDestinationAccessor继承自JmsAccessor,增加了destinationResolver和pubSubDomain属性。destinationResolver的默认值是DynamicDestinationResolver的实例,也就是说默认采用动态目的地解析的方式;pubSubDomain用于指定是使用“发布/订阅”模式还是使用“点对点”模式,默认值是false。

    JmsDestinationAccessor提供了用于解析目的地的方法,如下:

Java代码  收藏代码
  1. protected Destination resolveDestinationName(Session session, String destinationName)   
  2. throws JMSException {  
  3.     return getDestinationResolver().resolveDestinationName(session, destinationName,   
  4.         isPubSubDomain());  
  5. }  


2.2 AbstractJmsListeningContainer 
    AbstractJmsListeningContainer继承自JmsDestinationAccessor,作为所有Message Listener Container的公共基类。它主要提供了JMS connection的生命周期管理的功能,但是没有对消息接收的方式(主动接收方式或者异步接收方式)等做任何假定。该类主要的属性如下:

Java代码  收藏代码
  1. private String clientId;  
  2. private Connection sharedConnection;  

    clientId通常用于持久订阅;sharedConnection保存了被共享的JMS connection。

 

    该类定义了如下的抽象方法,以便子类可以决定是否使用共享的JMS connection。

Java代码  收藏代码
  1. protected abstract boolean sharedConnectionEnabled();  

 

2.3 AbstractMessageListenerContainer 
    AbstractMessageListenerContainer继承自AbstractJmsListeningContainer,也是作为所有Message Listener Container的公共基类。该类主要的属性如下:

Java代码  收藏代码
  1. private volatile Object destination;  
  2. private volatile Object messageListener;  
  3. private boolean exposeListenerSession = true;  

    destination用于指定接收消息的目的地。
    messageListener用于指定处理消息的listener。对于messageListener,它既可以是符合JMS规范的javax.jms.MessageListener,也可以是Spring特有的org.springframework.jms.listener.SessionAwareMessageListener。SessionAwareMessageListener的定义如下:

Java代码  收藏代码
  1. public interface SessionAwareMessageListener {  
  2.     void onMessage(Message message, Session session) throws JMSException;  
  3. }  

    跟javax.jms.MessageListener相比,这个接口的onMessage方法增加了一个Session 类型的参数,可以通过这个session发送回复消息(reply message)。


    如果使用了SessionAwareMessageListener 类型的message listener,那么exposeListenerSession参数指定了传入onMessage方法的session参数是否是创建了MessageConsumer的session,默认值是true。如果是false,那么AbstractMessageListenerContainer会在connection上新建一个session,并传入onMessage方法。

2.4 AbstractPollingMessageListenerContainer 
    AbstractPollingMessageListenerContainer继承自AbstractMessageListenerContainer,它提供了对于主动接收消息(polling)的支持,以及支持外部的事务管理。

Java代码  收藏代码
  1. private boolean pubSubNoLocal = false;  
  2. private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;  
  3. private PlatformTransactionManager transactionManager;  

    如果使用“发布/订阅”模式,那么pubSubNoLocal 属性指定通过某个连接发送到某个Topic的消息,是否应该被投递回这个连接。


    receiveTimeout属性用于指定调用MessageConsumer的receive方法时的超时时间,默认值是1秒。需要注意的是,这个值应该比transactionManager 中指定的事务超时时间略小。


    通常情况下,应该为transactionManager设置一个org.springframework.transaction.jta.JtaTransactionManager的实例,此外也要设置一个支持XA的ConnectionFactory。需要注意的是,XA 事务对性能有较大的影响。
    如果只是希望使用local JMS transaction,那么只要设置sessionTransacted为true或者使用JmsTransactionManager即可。实际上,如果设置了非JTA的transactionManager,那么sessionTransacted属性会自动被设置成true。
    由于local JMS transaction无法同其它local transaction(例如local database transaction)进行协调,因此客户端程序可能需要对重发的消息进行检查。JMS规范要求:JMS provider应该将重发消息的JMSRedelivered属性设置为true。

2.5 SimpleMessageListenerContainer 
    SimpleMessageListenerContainer继承自AbstractMessageListenerContainer,使用异步方式接收消息(也就是通过MessageConsumer上注册MessageListener的方式接收消息)。该类主要的属性如下:

Java代码  收藏代码
  1. private boolean pubSubNoLocal = false;  
  2. private int concurrentConsumers = 1;  
  3. private Set sessions;  
  4. private Set consumers;  
  5. private TaskExecutor taskExecutor;  

    如果使用“发布/订阅”模式,那么pubSubNoLocal 属性指定通过某个连接发送到某个Topic的消息,是否应该被投递回这个连接。

 

    SimpleMessageListenerContainer允许创建多个Session和MessageConsumer来接收消息。具体的个数由concurrentConsumers属性指定。需要注意的是,应该只是在Destination为Queue的时候才使用多个MessageConsumer(Queue中的一个消息只能被一个Consumer接收),虽然使用多个MessageConsumer会提高消息处理的性能,但是消息处理的顺序却得不到保证:消息被接收的顺序仍然是消息发送时的顺序,但是由于消息可能会被并发处理,因此消息处理的顺序可能和消息发送的顺序不同。此外,不应该在Destination为Topic的时候使用多个MessageConsumer,这是因为多个MessageConsumer会接收到同样的消息。

    SimpleMessageListenerContainer创建的Session和MessageConsumer分别保存在sessions和consumers属性中。


    taskExecutor属性的默认值是null,也就是说,对MessageListener(或者SessionAwareMessageListener)的回调是在MessageConsumer的内部线程中执行。如果指定了taskExecutor,那么回调是在TaskExecutor内部的线程中执行。以下是相关的代码:

Java代码  收藏代码
  1. protected MessageConsumer createListenerConsumer(final Session session)   
  2. throws JMSException {  
  3.     Destination destination = getDestination();  
  4.     if (destination == null) {  
  5.         destination = resolveDestinationName(session, getDestinationName());  
  6.     }  
  7.     MessageConsumer consumer = createConsumer(session, destination);  
  8.   
  9.     if (this.taskExecutor != null) {  
  10.         consumer.setMessageListener(new MessageListener() {  
  11.             public void onMessage(final Message message) {  
  12.                 taskExecutor.execute(new Runnable() {  
  13.                     public void run() {  
  14.                         processMessage(message, session);  
  15.                     }  
  16.                 });  
  17.             }  
  18.         });  
  19.     }  
  20.     else {  
  21.         consumer.setMessageListener(new MessageListener() {  
  22.             public void onMessage(Message message) {  
  23.                 processMessage(message, session);  
  24.             }  
  25.         });  
  26.     }  
  27.   
  28.     return consumer;  
  29. }  

    需要注意的是,如果指定了taskExecutor,那么消息在被taskExecutor内部的线程处理前,可能已经被确认过了(外层的onMessage方法可能已经执行结束了)。因此如果使用事务Session或者Session.CLIENT_ACKNOWLEDGE类型的确认模式,那么可能会导致问题。

 

    该类的sharedConnectionEnabled方法(在AbstractJmsListeningContainer中定义)总是返回true,因此SimpleMessageListenerContainer会使用共享的JMS connection。

2.6 DefaultMessageListenerContainer 
    DefaultMessageListenerContainer继承自AbstractPollingMessageListenerContainer,主要使用同步的方式接收消息(也就是通过循环调用MessageConsumer.receive的方式接收消息)。该类主要的属性如下:

Java代码  收藏代码
  1. private int concurrentConsumers = 1;  
  2. private int maxConcurrentConsumers = 1;  
  3. private int maxMessagesPerTask = Integer.MIN_VALUE;  
  4. private int idleTaskExecutionLimit = 1;  
  5. private final Set scheduledInvokers = new HashSet();  
  6. private TaskExecutor taskExecutor;  
  7. private int cacheLevel = CACHE_AUTO;  

    跟SimpleMessageListenerContainer一样,DefaultMessageListenerContainer也支持创建多个Session和MessageConsumer来接收消息。跟SimpleMessageListenerContainer不同的是,DefaultMessageListenerContainer创建了concurrentConsumers所指定个数的AsyncMessageListenerInvoker(实现了SchedulingAwareRunnable接口),并交给taskExecutor运行。


    maxMessagesPerTask属性的默认值是Integer.MIN_VALUE,但是如果设置的taskExecutor(默认值是SimpleAsyncTaskExecutor)实现了SchedulingTaskExecutor接口并且其prefersShortLivedTasks方法返回true(也就是说该TaskExecutor倾向于短期任务),那么maxMessagesPerTask属性会自动被设置为10。
    如果maxMessagesPerTask属性的值小于0,那么AsyncMessageListenerInvoker.run方法会在循环中反复尝试接收消息,并在接收到消息后调用MessageListener(或者SessionAwareMessageListener);如果maxMessagesPerTask属性的值不小于0,那么AsyncMessageListenerInvoker.run方法里最多会尝试接收消息maxMessagesPerTask次,每次接收消息的超时时间由其父类AbstractPollingMessageListenerContainer的receiveTimeout属性指定。如果在这些尝试中都没有接收到消息,那么AsyncMessageListenerInvoker的idleTaskExecutionCount属性会被累加。在run方法执行完毕前会对idleTaskExecutionCount进行检查,如果该值超过了DefaultMessageListenerContainer.idleTaskExecutionLimit(默认值1),那么这个AsyncMessageListenerInvoker可能会被销毁。


    所有AsyncMessageListenerInvoker实例都保存在scheduledInvokers中,实例的个数可以在concurrentConsumers和maxConcurrentConsumers之间浮动。跟SimpleMessageListenerContainer一样,应该只是在Destination为Queue的时候才使用多个AsyncMessageListenerInvoker实例。

 

    cacheLevel属性用于指定是否对JMS资源进行缓存,可选的值是CACHE_NONE = 0、CACHE_CONNECTION = 1、CACHE_SESSION = 2、CACHE_CONSUMER = 3和CACHE_AUTO = 4。默认情况下,如果transactionManager属性不为null,那么cacheLevel被自动设置为CACHE_NONE(不进行缓存),否则cacheLevel被自动设置为CACHE_CONSUMER。


    如果cacheLevel属性值大于等于CACHE_CONNECTION,那么sharedConnectionEnabled方法(在AbstractJmsListeningContainer中定义)返回true,也就是说使用共享的JMS连接。

 

 

3 SingleConnectionFactory 
    SingleConnectionFactory实现了ConnectionFactory接口,其createConnection方法总是返回相同的Connection。可以在SingleConnectionFactory的构造函数中传入Connection对象或者ConnectionFactory对象,用来创建被代理的连接对象。SingleConnectionFactory.createConnection方法返回的连接是个代理,它忽略了对stop和close方法的调用(连接会在SingleConnectionFactory.destroy方法中关闭)。


    SingleConnectionFactory的reconnectOnException属性用来指定是否在连接抛出JMSException的时候,对连接进行重置,重置后如果再调用createConnection方法,那么会返回一个新的连接。


    需要注意的是,AbstractJmsListeningContainer类的抽象方法sharedConnectionEnabled指定了是否在message listener container内部使用共享的JMS连接。因此通常情况下不需要为单独的message listener container设置SingleConnectionFactory(及其子类);如果希望在不同的message listener container之间共享JMS连接,那么可以考虑使用SingleConnectionFactory。

3.1 CachingConnectionFactory 
    CachingConnectionFactory继承自SingleConnectionFactory,增加了对Session和MessageProducer缓存的功能。该类主要的属性如下:

Java代码  收藏代码
  1. private int sessionCacheSize = 1;  
  2. private boolean cacheProducers = true;  

    sessionCacheSize属性指定了被缓存的Session实例的个数(默认值是1),也就是说,如果同时请求的Session个数大于sessionCacheSize,那么这些Session不会被缓存,而是正常的被创建和销毁。


    cacheProducers属性指定了是否对MessageProducer进行缓存,默认值是true。

 出自:http://whitesock.iteye.com/blog/306776

分享到:
评论

相关推荐

    spring_jms

    在项目结构上,Spring JMS应用通常包括以下几个关键部分: 1. `pom.xml`:Maven的项目对象模型文件,定义了项目的依赖和构建过程。 2. `applicationContext.xml`:Spring的配置文件,定义了bean的定义和它们之间的...

    Java网络编程--基于Spring的JMS编程

    Spring框架对JMS的支持主要体现在以下几个方面: 1. **Spring JMS抽象层**:Spring提供了一个高级的API,它抽象了JMS的底层细节,使得开发者可以专注于业务逻辑而不是消息系统的复杂性。Spring的`JmsTemplate`是这...

    spring-jms源码

    在源码分析中,我们可以关注以下几个关键点: 1. 消息的生产和消费:Spring JmsTemplate是如何将对象转换成消息并发送的,以及如何从消息中恢复对象。这涉及到MessageConverter的实现,如SimpleMessageConverter或...

    spring-jms-4.3.4.RELEASE.zip

    首先,Spring JMS是Spring Framework中的一个模块,它提供了一种简洁、面向对象的方式来访问JMS API,简化了消息生产者和消费者的实现。在Spring的配置文件中,开发者可以轻松定义消息模板、监听容器和消息驱动的...

    Spring集成JMS

    在Spring框架中,集成JMS主要包括以下几个关键组件: 1. **`ConnectionFactory`**:这是JMS的核心接口,用于创建与消息服务器的连接。Spring提供了一个抽象的`JmsConnectionFactory`,可以配置各种JMS供应商的实现...

    Spring JMS使异步消息变得简单.doc

    Spring JMS通过提供`JmsTemplate`类,将这些繁琐的步骤封装起来,开发者只需要调用几个简单的API就可以发送和接收消息,极大地提高了开发效率。 使用Spring JMS的一个关键优势在于其灵活性。`JmsTemplate`允许你...

    Spring整合JMS(三)——MessageConverter介绍

    MessageConverter是Spring JMS中的一个重要组件,它的主要职责是将Java对象与JMS消息类型(如TextMessage或ObjectMessage)之间进行转换。Spring默认提供了几种常见的MessageConverter,如SimpleMessageConverter和...

    JMS之Spring +activeMQ实现消息队列

    ActiveMQ的配置通常包括以下几个步骤: 1. **安装ActiveMQ**: 下载并运行ActiveMQ服务器,确保其正常启动。ActiveMQ默认提供一个基于Web的管理界面,可以通过浏览器访问来监控和管理消息队列。 2. **Spring配置**:...

    spring+jms+activemq

    配置JMS与ActiveMQ的连接通常涉及以下几个步骤: 1. 引入依赖:在项目中添加Spring和ActiveMQ的相关依赖库,这通常通过Maven或Gradle的配置文件完成。 2. 配置ActiveMQ服务器:设置ActiveMQ服务器的地址和端口,...

    activemq +jms(原生和集成spring-jms)

    在"activemq + jms(原生和集成spring-jms)"的主题中,我们将探讨如何使用ActiveMQ原生API以及结合Spring-JMS框架来实现消息队列的创建与使用,主要涵盖以下几个核心知识点: 1. **ActiveMQ的基本概念**:包括Broker...

    jms Spring+ActiveMQ 5.4.2

    通常,这样的内容会涉及以下几个关键知识点: 1. **Spring JMS模块**:Spring框架提供了JMS模块,使得在Spring应用中集成JMS变得简单。它提供了Template和Container抽象,用于发送和接收消息。 2. **配置ActiveMQ*...

    Spring整合Blazeds实现ActiveMQ JMS消息服务

    这个过程涉及到几个关键的技术组件和概念,下面将详细阐述。 首先,Spring框架是Java开发中最常用的应用框架之一,它提供了依赖注入(Dependency Injection, DI)和面向切面编程(Aspect-Oriented Programming, AOP...

    Spring 实现远程访问详解——jms和activemq

    前几章我们分别利用spring rmi、httpinvoker、httpclient、webservice技术实现不同服务器间的远程访问。本章我将通过spring jms和activemq实现单Web项目服务器间异步访问和多Web项目服务器间异步访问。 一. 简介 1. ...

    JMS-Spring

    配置主要涉及以下几个关键组件: 1. **ConnectionFactory**: 这是连接到JMS提供者(如ActiveMQ)的工厂类。在上述示例中,我们使用`org.apache.activemq.ActiveMQConnectionFactory`,并设置`brokerURL`属性指向...

    spring2 activemq5 tomcat6构建jms

    标题“spring2 activemq5 tomcat6构建jms”涉及了几个关键的Java技术,主要集中在企业级应用开发中的消息传递系统。首先,我们来深入理解这些技术及其相互关系。 Spring框架是Java应用程序开发的一个核心工具,尤其...

    Spring+JMS+ActiveMQ+Tomcat实现消息服务_服务器应用

    Spring 的 JMS 支持主要包括以下几个方面: - **JmsTemplate**:Spring 提供了一个名为 `JmsTemplate` 的类,该类封装了 JMS API 的复杂性,简化了消息的发送和接收。通过 `JmsTemplate`,开发者可以很容易地实现...

    JMS_ActiveMQ_Spring.rar

    在Spring中,ActiveMQ的配置通常包含以下几个关键部分: 1. JMS连接工厂配置: ```xml ``` 2. 创建目的地: ```xml ``` 3. 配置MessageListener容器: ```xml <bean id="jmsContainer" class="org.spring...

    jms+activeMq+spring学习简单例子

    虽然无法直接查看这些内容,但通常这样的文章会涵盖以下几个方面: 1. **JMS基础**:JMS允许应用程序创建、发送、接收和阅读消息。它定义了两种消息模型:点对点(Queue)和发布/订阅(Topic)。在点对点模型中,每...

Global site tag (gtag.js) - Google Analytics