`

深入理解DefaultMessageListenerContainer

阅读更多

一.DefaultMessageListenerContainer概述

        DefaultMessageListenerContainer是一个用于异步消息监听的管理类。

        DefaultMessageListenerContainer最简单的实现逻辑,一个任务执行器,执行任务(即消息监听)。

DefaultMessageListenerContainer实现的主要原理是,通过内部初始化建立的一个taskExecutor(默认是SimpleAsyncTaskExecutor)用于执行消息监听的任务(AsyncMessageListenerInvoker)。

        这里默认的任务执行器是SimpleAsyncTaskExecutor,这个执行器的缺点是不会重用连接,也就是对于每个任务都需要新开启一个线程,执行完任务后会关闭它。如果要优化的话可以考虑线程池。

        消息监听的任务被抽象成AsyncMessageListenerInvoker类,这个类实现了Runnable接口,内部run方法其实是通过不断循环consumer.recieve()方法来实现监听。

        事实上一个消费者对应了一个AsyncMessageListenerInvoker任务,每个任务需要一个单独的线程去执行它。这个AsyncMessageListenerInvoker实例被放在了一个名为scheduledInvokers的set里面。

 

二.DefaultMessageListenerContainer中的connection,session,consumer

        其实我们还有一个比较关心的地方是这个DefaultMessageListenerContainer缓不缓存connection、session、consumer。它是根据catchLevel属性来决定是否缓存connection、session、consumer。默认的catchLevel对应常量CATCH_AUTO,即由配置的外部事务管理器决定。catchLevel级别分别是CATCH_NONE,CATCH_CONNECTION,CATCH_SESSION,CATCH_CONSUMER,分别对应0,1,2,3。我试了下默认的CATCH_AUTO在没有定义事务管理时值为 CATCH_CONSUMER,即3。具体查看类中的方法:

/**
 * Constant that indicates to cache no JMS resources at all.
 * @see #setCacheLevel
 */
public static final int CACHE_NONE = 0;

/**
 * Constant that indicates to cache a shared JMS {@code Connection} for each
 * listener thread.
 * @see #setCacheLevel
 */
public static final int CACHE_CONNECTION = 1;

/**
 * Constant that indicates to cache a shared JMS {@code Connection} and a JMS
 * {@code Session} for each listener thread.
 * @see #setCacheLevel
 */
public static final int CACHE_SESSION = 2;

/**
 * Constant that indicates to cache a shared JMS {@code Connection}, a JMS
 * {@code Session}, and a JMS MessageConsumer for each listener thread.
 * @see #setCacheLevel
 */
public static final int CACHE_CONSUMER = 3;

/**
 * Constant that indicates automatic choice of an appropriate caching level
 * (depending on the transaction management strategy).
 * @see #setCacheLevel
 */
public static final int CACHE_AUTO = 4;
@Override
public void initialize() {
	// Adapt default cache level.
	if (this.cacheLevel == CACHE_AUTO) {
		this.cacheLevel = (getTransactionManager() != null ? CACHE_NONE : CACHE_CONSUMER);
	}

	// Prepare taskExecutor and maxMessagesPerTask.
	synchronized (this.lifecycleMonitor) {
		if (this.taskExecutor == null) {
			this.taskExecutor = createDefaultTaskExecutor();
		}
		else if (this.taskExecutor instanceof SchedulingTaskExecutor &&
				((SchedulingTaskExecutor) this.taskExecutor).prefersShortLivedTasks() &&
				this.maxMessagesPerTask == Integer.MIN_VALUE) {
			// TaskExecutor indicated a preference for short-lived tasks. According to
			// setMaxMessagesPerTask javadoc, we'll use 10 message per task in this case
			// unless the user specified a custom value.
			this.maxMessagesPerTask = 10;
		}
	}

	// Proceed with actual listener initialization.
	super.initialize();
}

        DefaultMessageListenerContainer会根据catchLevel来缓存共享connection、session及consumer,值为3的话就会缓存connection、session及consumer,在初始化的时候就会调用父类AbstractJmsListeningContainer的doStart()方法,判断cacheLevel是否大于等于1,如果大于就创建一个connection将放入成员变量sharedConnection中。

        每个任务被执行的时候(即责任是监听消息),会先去获取connection、session及consumer(通过调用initResourcesIfNecessary方法)就像我们自己最初实现一个简单的客户端消费者一样。只不过这里会根据catchLevel来决定是否缓存session及consumer,被缓存了的session及consumer放在对应的成员变量里面。

        接着任务会想要执行consumer.recieve方法,这之前肯定要获取onnection、session及consumer,如果已有connection、session及consumer则获取过来,如果没有则通过配置的信息新建。

        执行完consumer.recieve后,会判断consumer.recieve返回的消息是否为空,不为空则调用message对应的messageListner(之前我们在DefaultMessageListenerContainer中通过方法setMessageListner设置的)的onMessage执行相应的逻辑,并设置这个任务的Idle为false,表明这个任务不是空闲的,然后会调用方法判断是否应该新建任务实例,这个受限于MaxConcurrentConsumers及IdleTaskExecutionLimit。为空则不需要特别处理,只需调用noMessageReceived方法将idle标记设为true。

        任务执行完后,会在finally处释放connection,session及consumer。这个是根据上述讲的catchLevel来设置的。

        继承体系如下:


        AbstractJmsListeningContainer提供了一个最上层最基础的jms消息监听管理类所应该有的方法。提供了start(启动这个管理类)、stop、initialize(初始化这个管理类)、establishSharedConnection等。

 

三.DefaultMessageListenerContainer类的主要属性

        DefaultMessageListenerContainer继承自AbstractPollingMessageListenerContainer,主要使用同步的方式接收消息(也就是通过循环调用MessageConsumer.receive的方式接收消息)。

        DefaultMessageListenerContainer类主要的属性如下:

private Executor taskExecutor;

private long recoveryInterval = DEFAULT_RECOVERY_INTERVAL;

private int cacheLevel = CACHE_AUTO;

private int concurrentConsumers = 1;

private int maxConcurrentConsumers = 1;

private int maxMessagesPerTask = Integer.MIN_VALUE;

private int idleConsumerLimit = 1;

private int idleTaskExecutionLimit = 1;

private final Set<AsyncMessageListenerInvoker> scheduledInvokers = new HashSet<AsyncMessageListenerInvoker>();

        跟SimpleMessageListenerContainer一样,DefaultMessageListenerContainer也支持创建多个Session和MessageConsumer来接收消息。跟SimpleMessageListenerContainer不同的是,DefaultMessageListenerContainer创建了concurrentConsumers所指定个数的AsyncMessageListenerInvoker(实现了SchedulingAwareRunnable接口),并交给taskExecutor运行。

        maxMessagesPerTask属性的默认值是Integer.MIN_VALUE,但是如果设置的taskExecutor(默认值是SimpleAsyncTaskExecutor)实现了SchedulingTaskExecutor接口并且其prefersShortLivedTasks方法返回true(也就是说该TaskExecutor倾向于短期任务),那么maxMessagesPerTask属性会自动被设置为10。

        如果maxMessagesPerTask属性的值小于0,那么AsyncMessageListenerInvoker.run方法会在循环中反复尝试接收消息,并在接收到消息后调用MessageListener(或者SessionAwareMessageListener);如果maxMessagesPerTask属性的值不小于0,那么AsyncMessageListenerInvoker.run方法里最多会尝试接收消息maxMessagesPerTask次,每次接收消息的超时时间由其父类AbstractPollingMessageListenerContainer的receiveTimeout属性指定。如果在这些尝试中都没有接收到消息,那么AsyncMessageListenerInvoker的idleTaskExecutionCount属性会被累加。在run方法执行完毕前会对idleTaskExecutionCount进行检查,如果该值超过了DefaultMessageListenerContainer.idleTaskExecutionLimit(默认值1),那么这个AsyncMessageListenerInvoker可能会被销毁。

        所有AsyncMessageListenerInvoker实例都保存在scheduledInvokers中,实例的个数可以在concurrentConsumers和maxConcurrentConsumers之间浮动。跟SimpleMessageListenerContainer一样,应该只是在Destination为Queue的时候才使用多个AsyncMessageListenerInvoker实例。

        cacheLevel属性用于指定是否对JMS资源进行缓存,可选的值是CACHE_NONE = 0、CACHE_CONNECTION = 1、CACHE_SESSION = 2、CACHE_CONSUMER = 3和CACHE_AUTO = 4。默认情况下,如果transactionManager属性不为null,那么cacheLevel被自动设置为CACHE_NONE(不进行缓存),否则cacheLevel被自动设置为CACHE_CONSUMER。

        如果cacheLevel属性值大于等于CACHE_CONNECTION,那么sharedConnectionEnabled方法(在AbstractJmsListeningContainer中定义)返回true,也就是说使用共享的JMS连接。

 

四.设置messageSelector

        DefaultMessageListenerContainer是支持动态改变messageSelector的,可以动态设置messageSelector,Container就能用上最新的selector了。

        Spring配置如下:

<bean id="messageListenerContainer"
	class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
	<property name="connectionFactory" ref="jmsConnectionFactory" />  
	<property name="destination" ref="receiverQueue" />  
	<property name="messageListener" ref="jmsReceiver" />  
	<property name="concurrentConsumers" value="10" />           
	<property name="messageSelector" value="CLIENT='DEMO'" />  
	<property name="cacheLevel" value="2"/>
</bean>

        修改messageSelector代码如下:

DefaultMessageListenerContainer messageListenerContainer = (DefaultMessageListenerContainer) ac.getBean("messageListenerContainer");
messageListenerContainer.setMessageSelector("CLIENT='DEMO2'");

 

五.源码分析

        1.DefaultMessageListenerContainer类中的invokeListener方法,每次收消息都会调用。

private boolean invokeListener() throws JMSException {
	initResourcesIfNecessary();
	boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
	this.lastMessageSucceeded = true;
	return messageReceived;
}
/**
 * Runnable that performs looped {@code MessageConsumer.receive()} calls.
 */
private class AsyncMessageListenerInvoker implements SchedulingAwareRunnable {

	private Session session;

	private MessageConsumer consumer;

	private Object lastRecoveryMarker;

	private boolean lastMessageSucceeded;

	private int idleTaskExecutionCount = 0;

	private volatile boolean idle = true;

	@Override
	public void run() {
		synchronized (lifecycleMonitor) {
			activeInvokerCount++;
			lifecycleMonitor.notifyAll();
		}
		boolean messageReceived = false;
		try {
			if (maxMessagesPerTask < 0) {
				messageReceived = executeOngoingLoop();
			}
			else {
				int messageCount = 0;
				while (isRunning() && messageCount < maxMessagesPerTask) {
					messageReceived = (invokeListener() || messageReceived);
					messageCount++;
				}
			}
		}
		catch (Throwable ex) {
			clearResources();
			if (!this.lastMessageSucceeded) {
				// We failed more than once in a row - sleep for recovery interval
				// even before first recovery attempt.
				sleepInbetweenRecoveryAttempts();
			}

        2.如下initResourcesIfNecessary方法中使用cacheLevel的地方,由于需要动态selector,所以需要每次重新生成consumer,当cacheLevel<3的时候,this.consumer会为null。

private void initResourcesIfNecessary() throws JMSException {
	if (getCacheLevel() <= CACHE_CONNECTION) {
		updateRecoveryMarker();
	}
	else {
		if (this.session == null && getCacheLevel() >= CACHE_SESSION) {
			updateRecoveryMarker();
			this.session = createSession(getSharedConnection());
		}
		if (this.consumer == null && getCacheLevel() >= CACHE_CONSUMER) {
			this.consumer = createListenerConsumer(this.session);
			synchronized (lifecycleMonitor) {
				registeredWithDestination++;
			}
		}
	}
}

         3.在AbstractPollingMessageListenerContainer的doReceiveAndExecute方法中可以发现当传入consumer为null时,会生成一个新的consumer。

/**
 * Actually execute the listener for a message received from the given consumer,
 * fetching all requires resources and invoking the listener.
 * @param session the JMS Session to work on
 * @param consumer the MessageConsumer to work on
 * @param status the TransactionStatus (may be {@code null})
 * @return whether a message has been received
 * @throws JMSException if thrown by JMS methods
 * @see #doExecuteListener(javax.jms.Session, javax.jms.Message)
 */
protected boolean doReceiveAndExecute(
		Object invoker, Session session, MessageConsumer consumer, TransactionStatus status)
		throws JMSException {

	Connection conToClose = null;
	Session sessionToClose = null;
	MessageConsumer consumerToClose = null;
	try {
		Session sessionToUse = session;
		boolean transactional = false;
		if (sessionToUse == null) {
			sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
					getConnectionFactory(), this.transactionalResourceFactory, true);
			transactional = (sessionToUse != null);
		}
		if (sessionToUse == null) {
			Connection conToUse;
			if (sharedConnectionEnabled()) {
				conToUse = getSharedConnection();
			}
			else {
				conToUse = createConnection();
				conToClose = conToUse;
				conToUse.start();
			}
			sessionToUse = createSession(conToUse);
			sessionToClose = sessionToUse;
		}
		MessageConsumer consumerToUse = consumer;
		if (consumerToUse == null) {
			consumerToUse = createListenerConsumer(sessionToUse);
			consumerToClose = consumerToUse;
		}
		Message message = receiveMessage(consumerToUse);
		if (message != null) {
			if (logger.isDebugEnabled()) {
				logger.debug("Received message of type [" + message.getClass() + "] from consumer [" +
						consumerToUse + "] of " + (transactional ? "transactional " : "") + "session [" +
						sessionToUse + "]");
			}
			messageReceived(invoker, sessionToUse);
			boolean exposeResource = (!transactional && isExposeListenerSession() &&
					!TransactionSynchronizationManager.hasResource(getConnectionFactory()));
			if (exposeResource) {
				TransactionSynchronizationManager.bindResource(
						getConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse));
			}
			try {
				doExecuteListener(sessionToUse, message);
			}
			catch (Throwable ex) {
				if (status != null) {
					if (logger.isDebugEnabled()) {
						logger.debug("Rolling back transaction because of listener exception thrown: " + ex);
					}
					status.setRollbackOnly();
				}
				handleListenerException(ex);
				// Rethrow JMSException to indicate an infrastructure problem
				// that may have to trigger recovery...
				if (ex instanceof JMSException) {
					throw (JMSException) ex;
				}
			}
			finally {
				if (exposeResource) {
					TransactionSynchronizationManager.unbindResource(getConnectionFactory());
				}
			}
			// Indicate that a message has been received.
			return true;
		}
		else {
			if (logger.isTraceEnabled()) {
				logger.trace("Consumer [" + consumerToUse + "] of " + (transactional ? "transactional " : "") +
						"session [" + sessionToUse + "] did not receive a message");
			}
			noMessageReceived(invoker, sessionToUse);
			// Nevertheless call commit, in order to reset the transaction timeout (if any).
			// However, don't do this on Tibco since this may lead to a deadlock there.
			if (shouldCommitAfterNoMessageReceived(sessionToUse)) {
				commitIfNecessary(sessionToUse, message);
			}
			// Indicate that no message has been received.
			return false;
		}
	}
	finally {
		JmsUtils.closeMessageConsumer(consumerToClose);
		JmsUtils.closeSession(sessionToClose);
		ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
	}
}

 

文章来源:http://blog.csdn.net/caolaosanahnu/article/details/12096577

http://kooii.iteye.com/blog/1913272

  • 大小: 37.4 KB
分享到:
评论

相关推荐

    ActiveMQ-P2P文本消息

    在IT行业中,消息传递是分布式系统中至关重要的一个部分,它允许不同的组件之间进行异步通信,从而提高系统的可扩展性和容错性。...通过深入理解和实践,开发者能够构建出高效、可靠的异步通信解决方案。

    spring下queue与持久订阅topic实现

    在Spring框架中,消息传递是实现解耦和异步处理的关键技术。本篇文章将深入探讨如何在Spring环境下...在实践中,结合源码阅读和使用相关工具,能够更深入地理解Spring JMS的工作原理,从而更好地优化和调试我们的应用。

    Spring In Action 使用Spring发送和接收JMS消息

    《Spring In Action:使用Spring发送和...通过源码分析和实践,我们可以更深入地理解Spring JMS的工作流程,提升系统设计和开发能力。对于使用Java的企业级应用,掌握Spring JMS是提高系统可扩展性和稳定性的重要技能。

    activemq-demo

    通过学习和研究这个"activemq-demo"项目,你可以深入理解如何在Spring环境中集成和使用ActiveMQ,这对于构建大规模、高并发的分布式系统至关重要。此外,这也能帮助你掌握消息中间件在解决系统解耦、异步处理和容错...

    Spring整合JMS(四)——事务管理

    本文将深入探讨如何在Spring中整合JMS并实现事务管理,以确保消息传递的可靠性和一致性。 首先,我们需要理解Spring的声明式事务管理。Spring提供了一种强大的声明式事务管理机制,它允许我们在不编写任何事务控制...

    activemq-spring-1.4.jar.zip

    《ActiveMQ与Spring整合——深度解析activemq-spring-1.4.jar》 在Java世界里,消息中间件扮演着至关重要的角色,它能够帮助应用程序解耦...通过深入理解并实践这些概念,你将能够构建出稳定、高效的分布式消息系统。

    ActiveMQSpringDemo

    通过Spring的`JmsTemplate`和`DefaultMessageListenerContainer`等工具,我们可以方便地发送和接收消息,而无需深入理解底层的JMS细节。 ### 2. 配置ActiveMQ 首先,我们需要在项目中引入ActiveMQ的依赖。这通常在...

    spring-kms-test源码

    接下来,我们将深入探讨Spring JMS Test的源码,以便更好地理解和运用它。 首先,我们需要了解Spring JMS Test的核心类`AbstractJmsTemplateTestSupport`。这个类为测试场景提供了基础的JMS模板和容器配置,它继承...

    jms+sping+activeMq的例子serd和recevice

    这意味着我们将深入理解如何在Spring框架中集成JMS和ActiveMQ,以实现消息的发送与接收功能。 **JMS(Java Message Service)** JMS是一种标准的API,允许Java应用程序创建、发送、接收和读取消息。它提供了异步...

    spring-jms-4.3.4.RELEASE.zip

    《Spring JMS 4.3.4.RELEASE:开源项目的深度解析》 Spring Framework作为Java领域...对于想深入理解Spring JMS的开发者,可以从源码spring-framework-master入手,通过阅读和实践,进一步掌握其工作原理和使用技巧。

    spring-framework-2.5.6jar包

    在Spring 2.5.6中,你可以利用MessageDrivenPojo和DefaultMessageListenerContainer来处理消息。 最后,Spring的测试模块提供了方便的单元测试和集成测试工具,如MockMVC和TestContext框架,帮助开发者编写和执行...

    activemq学习(2) spring+activemq

    下面我们将深入探讨这一主题。 首先,我们需要理解ActiveMQ的基本概念。ActiveMQ是Apache软件基金会的一个项目,它实现了Java Message Service (JMS) 规范,提供了消息持久化、传输可靠性和网络集群等功能。它支持...

    spring+activemq

    下面我们将深入讨论这个主题。 首先,**Spring对JMS的支持**:Spring框架提供了一套强大的JMS抽象层,简化了与消息中间件如ActiveMQ的交互。通过使用`ConnectionFactory`和`MessageListenerContainer`,开发者可以...

    ActiveMq发布和订阅消息的实现源码

    本篇文章将深入探讨ActiveMQ的发布/订阅模型(Publish/Subscribe)的实现源码,以及如何与Spring框架进行集成。 首先,我们需要理解ActiveMQ中的发布/订阅模式。在这个模型中,生产者(Publisher)发送消息到一个...

    activemq-spring-1.5.jar.zip

    3. **监听容器**:Spring的`DefaultMessageListenerContainer`允许在后台线程池中自动启动和停止消息监听器,这使得消息的处理完全异步,从而提高系统的响应速度。 4. **事务支持**:Spring的事务管理能力与...

    spring+jms+activemq

    本文将深入探讨如何使用Spring框架与ActiveMQ结合实现基于JMS的消息传递系统。 首先,理解JMS的基本概念至关重要。JMS定义了生产者(Producer)、消费者(Consumer)和消息队列(Queue)或主题(Topic)等核心组件...

    ActiveMQ与spring集成实例

    为了更深入地理解ActiveMQ与Spring的集成,你可以参考给定的博文链接,那里可能有更多实战案例和详细解释。同时,不断学习和实践,熟悉JMS(Java Message Service)规范,对于提升消息中间件的使用技巧至关重要。

    am_spring_consumer:ActiveMQ 消费者实例

    在IT行业中,消息队列(Message Queue)是一种重要的中间件技术,它允许应用程序之间通过异步通信进行数据交换。...通过深入理解和实践,我们可以更好地利用ActiveMQ提供的功能,提高系统的可扩展性和可靠性。

    activeMq代码记录(jmsspring+jmstest)

    首先,让我们深入了解JMS。Java Message Service(JMS)是Java平台上的一个标准API,用于在分布式环境中发送、接收和管理消息。JMS允许应用程序之间进行异步通信,提高系统的可扩展性和解耦性。ActiveMQ作为JMS的一...

Global site tag (gtag.js) - Google Analytics