`

spring jms

阅读更多
1 DestinationResolver
    DestinationResolver接口的作用是将指定的目的地名解析为目的地实例。其定义如下:
public interface DestinationResolver {   
    Destination resolveDestinationName(Session session, String destinationName,    
        boolean pubSubDomain) throws JMSException;   
}  

    参数pubSubDomain用于指定是使用“发布/订阅”模式(解析后的目的地是Topic),还是使用“点对点”模式(解析后的目的地是Queue)。

    CachingDestinationResolver接口继承了DestinationResolver,增加了缓存的功能,其接口定义如下:

public interface CachingDestinationResolver extends DestinationResolver {   
    void removeFromCache(String destinationName);   
    void clearCache();   
}  
 

在目的地失效的时候,removeFromCache方法会被调用;在JMS provider失效的时候,clearCache方法会被调用。

1.1 DynamicDestinationResolver
    DynamicDestinationResolver实现了DestinationResolver接口。根据指定的目的地名,DynamicDestinationResolver会动态创建目的地实例。针对JMS1.1规范,它采用如下方法创建目的地:
session.createTopic(topicName)   
session.createQueue(queueName);  

1.2 JndiDestinationResolver
    JndiDestinationResolver继承自JndiLocatorSupport, 同时实现了CachingDestinationResolver接口。如果在JMS provider中配置了静态目的地,那么JndiDestinationResolver通过JNDI查找的方式获得目的地实例。


    JndiDestinationResolver的fallbackToDynamicDestination属性用于指定在JNDI查找失败后,是否使用动态目的地,默认值是false。JndiDestinationResolver的cache属性用于指定是否对目的地实例进行缓存,默认值是true。



1.3 BeanFactoryDestinationResolver
    BeanFactoryDestinationResolver实现了DestinationResolver接口和BeanFactoryAware接口。它会根据指定的目的地名从BeanFactory中查找目的地实例。以下是相关的代码:
public Destination resolveDestinationName(Session session, String destinationName,    
boolean pubSubDomain) throws JMSException {   
    Assert.state(this.beanFactory != null, "BeanFactory is required");   
    try {   
        return (Destination) this.beanFactory.getBean(destinationName, Destination.class);   
    }   
    catch (BeansException ex) {   
        throw new DestinationResolutionException(   
            "Failed to look up Destinaton bean with name '" + destinationName + "'", ex);   
    }   
}  

2 JmsAccessor
    抽象类JmsAccessor是JmsTemplate、SimpleMessageListenerContainer和DefaultMessageListenerContainer等concrete class的基类。JmsAccessor定义了如下几个用于访问JMS服务的共通属性。
private ConnectionFactory connectionFactory;   
private boolean sessionTransacted = false;   
private int sessionAcknowledgeMode = Session.AUTO_ACKNOWLEDGE; 

JmsAccessor提供了创建Connection和Session的方法,如下:
protected Connection createConnection() throws JMSException {   
    return getConnectionFactory().createConnection();   
}   
  
protected Session createSession(Connection con) throws JMSException {   
    return con.createSession(isSessionTransacted(), getSessionAcknowledgeMode());   
}  
2.1 JmsDestinationAccessor
    抽象类JmsDestinationAccessor继承自JmsAccessor,增加了destinationResolver和pubSubDomain属性。destinationResolver的默认值是DynamicDestinationResolver的实例,也就是说默认采用动态目的地解析的方式;pubSubDomain用于指定是使用“发布/订阅”模式还是使用“点对点”模式,默认值是false。

    JmsDestinationAccessor提供了用于解析目的地的方法,如下:

protected Destination resolveDestinationName(Session session, String destinationName)    
throws JMSException {   
    return getDestinationResolver().resolveDestinationName(session, destinationName,    
        isPubSubDomain());   
}  

2.2 AbstractJmsListeningContainer
    AbstractJmsListeningContainer继承自JmsDestinationAccessor,作为所有Message Listener Container的公共基类。它主要提供了JMS connection的生命周期管理的功能,但是没有对消息接收的方式(主动接收方式或者异步接收方式)等做任何假定。该类主要的属性如下:
private String clientId;   
private Connection sharedConnection;     

clientId通常用于持久订阅;sharedConnection保存了被共享的JMS connection。

    该类定义了如下的抽象方法,以便子类可以决定是否使用共享的JMS connection。

Java代码
protected abstract boolean sharedConnectionEnabled(); 


2.3 AbstractMessageListenerContainer
    AbstractMessageListenerContainer继承自AbstractJmsListeningContainer,也是作为所有Message Listener Container的公共基类。该类主要的属性如下:

Java代码
private volatile Object destination;   
private volatile Object messageListener;   
private boolean exposeListenerSession = true;  
   
destination用于指定接收消息的目的地。
    messageListener用于指定处理消息的listener。对于messageListener,它既可以是符合JMS规范的javax.jms.MessageListener,也可以是Spring特有的org.springframework.jms.listener.SessionAwareMessageListener。SessionAwareMessageListener的定义如下:

Java代码
public interface SessionAwareMessageListener {   
    void onMessage(Message message, Session session) throws JMSException;   
}

    跟javax.jms.MessageListener相比,这个接口的onMessage方法增加了一个Session 类型的参数,可以通过这个session发送回复消息(reply message)。


    如果使用了SessionAwareMessageListener 类型的message listener,那么exposeListenerSession参数指定了传入onMessage方法的session参数是否是创建了MessageConsumer的session,默认值是true。如果是false,那么AbstractMessageListenerContainer会在connection上新建一个session,并传入onMessage方法。

2.4 AbstractPollingMessageListenerContainer
    AbstractPollingMessageListenerContainer继承自AbstractMessageListenerContainer,它提供了对于主动接收消息(polling)的支持,以及支持外部的事务管理。

Java代码
private boolean pubSubNoLocal = false;   
private long receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;   
private PlatformTransactionManager transactionManager;
 
如果使用“发布/订阅”模式,那么pubSubNoLocal 属性指定通过某个连接发送到某个Topic的消息,是否应该被投递回这个连接。


    receiveTimeout属性用于指定调用MessageConsumer的receive方法时的超时时间,默认值是1秒。需要注意的是,这个值应该比transactionManager 中指定的事务超时时间略小。


    通常情况下,应该为transactionManager设置一个org.springframework.transaction.jta.JtaTransactionManager的实例,此外也要设置一个支持XA的ConnectionFactory。需要注意的是,XA 事务对性能有较大的影响。
    如果只是希望使用local JMS transaction,那么只要设置sessionTransacted为true或者使用JmsTransactionManager即可。实际上,如果设置了非JTA的transactionManager,那么sessionTransacted属性会自动被设置成true。
    由于local JMS transaction无法同其它local transaction(例如local database transaction)进行协调,因此客户端程序可能需要对重发的消息进行检查。JMS规范要求:JMS provider应该将重发消息的JMSRedelivered属性设置为true。

2.5 SimpleMessageListenerContainer
    SimpleMessageListenerContainer继承自AbstractMessageListenerContainer,使用异步方式接收消息(也就是通过MessageConsumer上注册MessageListener的方式接收消息)。该类主要的属性如下:

Java代码
private boolean pubSubNoLocal = false;   
private int concurrentConsumers = 1;   
private Set sessions;   
private Set consumers;   
private TaskExecutor taskExecutor;

    如果使用“发布/订阅”模式,那么pubSubNoLocal 属性指定通过某个连接发送到某个Topic的消息,是否应该被投递回这个连接。



    SimpleMessageListenerContainer允许创建多个Session和MessageConsumer来接收消息。具体的个数由concurrentConsumers属性指定。需要注意的是,应该只是在Destination为Queue的时候才使用多个MessageConsumer(Queue中的一个消息只能被一个Consumer接收),虽然使用多个MessageConsumer会提高消息处理的性能,但是消息处理的顺序却得不到保证:消息被接收的顺序仍然是消息发送时的顺序,但是由于消息可能会被并发处理,因此消息处理的顺序可能和消息发送的顺序不同。此外,不应该在Destination为Topic的时候使用多个MessageConsumer,这是因为多个MessageConsumer会接收到同样的消息。

    SimpleMessageListenerContainer创建的Session和MessageConsumer分别保存在sessions和consumers属性中。


    taskExecutor属性的默认值是null,也就是说,对MessageListener(或者SessionAwareMessageListener)的回调是在MessageConsumer的内部线程中执行。如果指定了taskExecutor,那么回调是在TaskExecutor内部的线程中执行。以下是相关的代码:

Java代码
protected MessageConsumer createListenerConsumer(final Session session)    
throws JMSException {   
    Destination destination = getDestination();   
    if (destination == null) {   
        destination = resolveDestinationName(session, getDestinationName());   
    }   
    MessageConsumer consumer = createConsumer(session, destination);   
  
    if (this.taskExecutor != null) {   
        consumer.setMessageListener(new MessageListener() {   
            public void onMessage(final Message message) {   
                taskExecutor.execute(new Runnable() {   
                    public void run() {   
                        processMessage(message, session);   
                    }   
                });   
            }   
        });   
    }   
    else {   
        consumer.setMessageListener(new MessageListener() {   
            public void onMessage(Message message) {   
                processMessage(message, session);   
            }   
        });   
    }   
  
    return consumer;   
}
   
需要注意的是,如果指定了taskExecutor,那么消息在被taskExecutor内部的线程处理前,可能已经被确认过了(外层的onMessage方法可能已经执行结束了)。因此如果使用事务Session或者Session.CLIENT_ACKNOWLEDGE类型的确认模式,那么可能会导致问题。



    该类的sharedConnectionEnabled方法(在AbstractJmsListeningContainer中定义)总是返回true,因此SimpleMessageListenerContainer会使用共享的JMS connection。

2.6 DefaultMessageListenerContainer
    DefaultMessageListenerContainer继承自AbstractPollingMessageListenerContainer,主要使用同步的方式接收消息(也就是通过循环调用MessageConsumer.receive的方式接收消息)。该类主要的属性如下:

Java代码
private int concurrentConsumers = 1;   
private int maxConcurrentConsumers = 1;   
private int maxMessagesPerTask = Integer.MIN_VALUE;   
private int idleTaskExecutionLimit = 1;   
private final Set scheduledInvokers = new HashSet();   
private TaskExecutor taskExecutor;   
private int cacheLevel = CACHE_AUTO; 
 
跟SimpleMessageListenerContainer一样,DefaultMessageListenerContainer也支持创建多个Session和MessageConsumer来接收消息。跟SimpleMessageListenerContainer不同的是,DefaultMessageListenerContainer创建了concurrentConsumers所指定个数的AsyncMessageListenerInvoker(实现了SchedulingAwareRunnable接口),并交给taskExecutor运行。


    maxMessagesPerTask属性的默认值是Integer.MIN_VALUE,但是如果设置的taskExecutor(默认值是SimpleAsyncTaskExecutor)实现了SchedulingTaskExecutor接口并且其prefersShortLivedTasks方法返回true(也就是说该TaskExecutor倾向于短期任务),那么maxMessagesPerTask属性会自动被设置为10。
    如果maxMessagesPerTask属性的值小于0,那么AsyncMessageListenerInvoker.run方法会在循环中反复尝试接收消息,并在接收到消息后调用MessageListener(或者SessionAwareMessageListener);如果maxMessagesPerTask属性的值不小于0,那么AsyncMessageListenerInvoker.run方法里最多会尝试接收消息maxMessagesPerTask次,每次接收消息的超时时间由其父类AbstractPollingMessageListenerContainer的receiveTimeout属性指定。如果在这些尝试中都没有接收到消息,那么AsyncMessageListenerInvoker的idleTaskExecutionCount属性会被累加。在run方法执行完毕前会对idleTaskExecutionCount进行检查,如果该值超过了DefaultMessageListenerContainer.idleTaskExecutionLimit(默认值1),那么这个AsyncMessageListenerInvoker可能会被销毁。


    所有AsyncMessageListenerInvoker实例都保存在scheduledInvokers中,实例的个数可以在concurrentConsumers和maxConcurrentConsumers之间浮动。跟SimpleMessageListenerContainer一样,应该只是在Destination为Queue的时候才使用多个AsyncMessageListenerInvoker实例。



    cacheLevel属性用于指定是否对JMS资源进行缓存,可选的值是CACHE_NONE = 0、CACHE_CONNECTION = 1、CACHE_SESSION = 2、CACHE_CONSUMER = 3和CACHE_AUTO = 4。默认情况下,如果transactionManager属性不为null,那么cacheLevel被自动设置为CACHE_NONE(不进行缓存),否则cacheLevel被自动设置为CACHE_CONSUMER。


    如果cacheLevel属性值大于等于CACHE_CONNECTION,那么sharedConnectionEnabled方法(在AbstractJmsListeningContainer中定义)返回true,也就是说使用共享的JMS连接。





3 SingleConnectionFactory
    SingleConnectionFactory实现了ConnectionFactory接口,其createConnection方法总是返回相同的Connection。可以在SingleConnectionFactory的构造函数中传入Connection对象或者ConnectionFactory对象,用来创建被代理的连接对象。SingleConnectionFactory.createConnection方法返回的连接是个代理,它忽略了对stop和close方法的调用(连接会在SingleConnectionFactory.destroy方法中关闭)。


    SingleConnectionFactory的reconnectOnException属性用来指定是否在连接抛出JMSException的时候,对连接进行重置,重置后如果再调用createConnection方法,那么会返回一个新的连接。


    需要注意的是,AbstractJmsListeningContainer类的抽象方法sharedConnectionEnabled指定了是否在message listener container内部使用共享的JMS连接。因此通常情况下不需要为单独的message listener container设置SingleConnectionFactory(及其子类);如果希望在不同的message listener container之间共享JMS连接,那么可以考虑使用SingleConnectionFactory。

3.1 CachingConnectionFactory
    CachingConnectionFactory继承自SingleConnectionFactory,增加了对Session和MessageProducer缓存的功能。该类主要的属性如下:

Java代码
private int sessionCacheSize = 1;   
private boolean cacheProducers = true; 

sessionCacheSize属性指定了被缓存的Session实例的个数(默认值是1),也就是说,如果同时请求的Session个数大于sessionCacheSize,那么这些Session不会被缓存,而是正常的被创建和销毁。


    cacheProducers属性指定了是否对MessageProducer进行缓存,默认值是true。

分享到:
评论

相关推荐

    SpringJMS示例代码

    SpringJMS是Spring框架的一部分,它提供了一种与Java消息服务(JMS)进行交互的简单方式。在本文中,我们将深入探讨SpringJMS的基本概念、如何与ActiveMQ集成,以及如何通过示例代码理解其工作原理。 1. **Spring...

    SpringJMS整合ActiveMQ

    详细内容: SpringJMS整合ActiveMQ.doc 详细说明文档 apache-activemq-5.8.0-bin.zip ActiveMQ安装包 JMSTest.rar MyEclipse8.5下web工程

    Spring JMS

    Spring JMS Spring JMS 是一个基于 Java 消息服务(JMS)的框架,它提供了一个简洁的方式来使用 JMS API。Spring JMS 框架提供了一个抽象层,简化了 JMS 的使用,使得开发者可以更容易地使用 JMS。 Spring JMS 的...

    Spring JMS使异步消息变得简单.doc

    【Spring JMS】是Spring框架中的一个模块,用于简化Java消息服务(JMS)的使用,使得异步消息处理变得更加简单和灵活。Spring JMS通过提供一个模板类,抽象了JMS API的底层细节,让开发者能够更加专注于消息的处理...

    springjms的demo

    Spring对JMS提供了很好的支持,可以通过JmsTemplate来方便地实现消息服务。本例通过activeMQ服务器模拟了消息的发送与接收。需要注意的是,activeMQ的运行依赖jdk的环境,而且对jdk的版本也有要求,我用的是jdk1.6+...

    spring_jms

    Spring JMS(Java Message Service)是Spring框架的一部分,专门用于集成JMS消息传递系统,以实现异步通信和解耦应用程序组件。在这个入门级实例中,我们将探讨如何使用Maven、Spring和ActiveMQ来构建一个简单的...

    Spring JMS 消息处理-基于JNDI

    这篇博客“Spring JMS 消息处理-基于JNDI”将深入探讨如何在Spring应用中使用JMS进行消息处理,并利用JNDI(Java Naming and Directory Interface)来查找和配置消息资源。 JMS是Java平台上的一个标准接口,它定义...

    Java网络编程--基于Spring的JMS编程

    在Java中,Spring框架提供了强大而灵活的支持,使得开发人员能够轻松地实现基于Java消息服务(JMS)的网络编程。JMS是一个标准接口,它允许应用程序创建、发送、接收和读取消息,这些消息可以在不同的应用之间传递,...

    spring jms 整合 weblogic jms

    本人开发的spring jms项目,已经上线近一年了,这里是我项目jms配置文件,使用的是spring jms 整合weblogic jms。如果真的需要,请咨询我,并且附上我上传的这个配置文件,附近中没有带有这个文件,一律不作任何回答...

    Spring JMS消息处理-不基于JNDI

    在本文中,我们将深入探讨Spring框架中的Java消息服务(JMS)支持,特别是不依赖于JNDI(Java Naming and Directory Interface)的方式。Spring JMS为应用程序提供了与消息中间件交互的能力,允许我们构建可扩展且...

    使用Spring JMS轻松实现异步消息传递.pdf

    【Spring JMS 知识点详解】 Spring JMS 是 Spring 框架的一部分,它提供了一种简单且灵活的方式来处理 Java 消息服务 (JMS)。在面向服务架构 (SOA) 中,异步消息传递是关键组件,特别是在企业级系统间通信,特别是...

    spring jms tomcat 异步消息传递入门实例

    在IT行业中,Spring框架是Java应用开发的基石,它提供了丰富的功能来简化应用程序的构建,而JMS(Java Message Service)则是处理异步通信的一种标准API。Tomcat作为轻量级的应用服务器,常用于部署Spring应用。在这...

    JMS整合Spring实例

    **JMS整合Spring实例** Java消息服务(Java Message Service,简称JMS)是Java平台中用于企业级应用间异步通信的一种标准接口。它允许应用程序创建、发送、接收和读取消息,以此来解耦生产者和消费者。而Spring框架...

    jms整合spring工程

    本项目"jms整合spring工程"是一个已经准备就绪的Java工程,它展示了如何在Spring框架中集成JMS,以便利用消息队列进行通信。主要依赖的是Apache ActiveMQ,这是一款流行的开源JMS提供者,能够实现高效、可靠的实时...

    SpringJMS源码

    **Spring JMS源码分析** 在Java世界里,消息队列(JMS,Java Message Service)是一种标准,它定义了一种规范,使得不同的消息中间件提供商可以为开发者提供一致性的API,以实现在应用程序间传递消息。Spring JMS是...

    Spring集成JMS

    在实际应用中,`SpringJMS`可能包含以下示例代码片段: ```xml <bean id="jmsConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <!-- 这里配置具体的JMS提供商实现 -->...

Global site tag (gtag.js) - Google Analytics