Spring JMS可以帮助开发人员快速的使用MQ的发送与接收,在异步接收方面,Spring 提供了MessageListenerContainer的容器接收消息。通过研究源码发现DefaultMessageListenerContainer是支持动态改变messageSelector的。在DefaultMessageListenerContainer 中有个cacheLevel的属性默认是4,把它改动到2或1或0,数字分表代表
public static final int CACHE_NONE = 0;
public static final int CACHE_CONNECTION = 1;
public static final int CACHE_SESSION = 2;
public static final int CACHE_CONSUMER = 3;
public static final int CACHE_AUTO = 4;
在设置完cacheLevel后就可以动态设置messageSelector,Container就能用上最新的selector了。
Spring配置如下
<bean id="messageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="jmsConnectionFactory" /> <property name="destination" ref="receiverQueue" /> <property name="messageListener" ref="jmsReceiver" /> <property name="concurrentConsumers" value="10" /> <property name="messageSelector" value="CLIENT='DEMO'" /> <property name="cacheLevel" value="2"/> </bean>
修改messageSelector代码如下
DefaultMessageListenerContainer messageListenerContainer = (DefaultMessageListenerContainer) ac.getBean("messageListenerContainer"); messageListenerContainer.setMessageSelector("CLIENT='DEMO2'");
源码分析:
//DefaultMessageListenerContainer类中, 每次收消息都会call的方法 private boolean invokeListener() throws JMSException { initResourcesIfNecessary(); boolean messageReceived = DefaultMessageListenerContainer.this.receiveAndExecute(this, this.session, this.consumer); this.lastMessageSucceeded = true; return messageReceived; } //这里就是使用cacheLevel的地方,由于需要动态selector,所以需要每次重新生成consumer,//当cacheLevel<3的时候,this.consumer会为null private void initResourcesIfNecessary() throws JMSException { if (DefaultMessageListenerContainer.this.getCacheLevel() <= 1) { updateRecoveryMarker(); } else { if ((this.session == null) && (DefaultMessageListenerContainer.this.getCacheLevel() >= 2)) { updateRecoveryMarker(); this.session = DefaultMessageListenerContainer.this.createSession(DefaultMessageListenerContainer.this.getSharedConnection()); } if ((this.consumer == null) && (DefaultMessageListenerContainer.this.getCacheLevel() >= 3)) this.consumer = DefaultMessageListenerContainer.this.createListenerConsumer(this.session); } } //在这个方法中可以发现当传入consumer为null时,会生成一个新的consumer protected boolean doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer, TransactionStatus status) throws JMSException { Connection conToClose = null; Session sessionToClose = null; MessageConsumer consumerToClose = null; try { Session sessionToUse = session; boolean transactional = false; if (sessionToUse == null) { sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession( getConnectionFactory(), this.transactionalResourceFactory, true); transactional = sessionToUse != null; } if (sessionToUse == null) { Connection conToUse = null; if (sharedConnectionEnabled()) { conToUse = getSharedConnection(); } else { conToUse = createConnection(); conToClose = conToUse; conToUse.start(); } sessionToUse = createSession(conToUse); sessionToClose = sessionToUse; } MessageConsumer consumerToUse = consumer; if (consumerToUse == null) { consumerToUse = createListenerConsumer(sessionToUse); consumerToClose = consumerToUse; } Message message = receiveMessage(consumerToUse); if (message != null) { if (this.logger.isDebugEnabled()) { this.logger.debug("Received message of type [" + message.getClass() + "] from consumer [" + consumerToUse + "] of " + ((transactional) ? "transactional " : "") + "session [" + sessionToUse + "]"); } messageReceived(invoker, sessionToUse); boolean exposeResource = (!(transactional)) && (isExposeListenerSession()) && (!(TransactionSynchronizationManager.hasResource(getConnectionFactory()))); if (exposeResource) TransactionSynchronizationManager.bindResource( getConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse)); try { doExecuteListener(sessionToUse, message); } catch (Throwable ex) { if (status != null) { if (this.logger.isDebugEnabled()) { this.logger.debug("Rolling back transaction because of listener exception thrown: " + ex); } status.setRollbackOnly(); } handleListenerException(ex); if (ex instanceof JMSException) throw ((JMSException)ex); } finally { if (exposeResource) TransactionSynchronizationManager.unbindResource(getConnectionFactory()); } return true; } if (this.logger.isTraceEnabled()) { this.logger.trace("Consumer [" + consumerToUse + "] of " + ((transactional) ? "transactional " : "") + "session [" + sessionToUse + "] did not receive a message"); } noMessageReceived(invoker, sessionToUse); return false; } finally { JmsUtils.closeMessageConsumer(consumerToClose); JmsUtils.closeSession(sessionToClose); ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true); } }
相关推荐
在Spring中,这可以通过`@JmsListener`注解的`subscriptionDurable`属性实现,或者在`DefaultMessageListenerContainer`中设置`subscriptionDurable`属性。 6. **JMSConsumer** 压缩包中的`jmsconsumer`可能是一个...
在发送端,我们可以设置消息的属性,以便在消费端的Selector可以使用这些属性进行过滤。例如,如果我们想要根据消息的某个自定义属性“channel”进行定向发送,可以这样做: ```java Message msg = new Message(...
3. **消息过滤**: 通过JMS的MessageSelector特性,可以筛选出符合特定条件的消息进行消费。 4. **消息确认**: 有自动确认和手动确认两种模式,手动确认允许在消费者确认接收消息后再提交事务,增强消息可靠性。 ...
如果是Topic,使用createSubscriber()方法,并提供一个可选的MessageSelector来过滤消息。 6. 使用Consumer的receive()方法接收消息,或者在回调中监听消息到达事件。 7. 处理接收到的消息,并关闭Consumer和...
- **配置文件**:主要的配置文件是`conf/activemq.xml`,用户可以在此修改消息传输、持久化、安全等设置。 - **创建队列和主题**:通过管理控制台或者编程方式创建JMS队列和主题。 - **生产者与消费者**:编写应用...
前言 本文主要给大家介绍了关于Laravel本地化模块的相关内容,分享出来供大家参考学习,话不多说了,来一起看看详细的介绍吧。 本文是基于Laravel 5.4版本的本地化模块代码进行分析书写;... MessageSelector 消息
- **定义目的地**:在模块中添加消息队列或主题,设置其属性如名称、存储类型等。 - **配置JMS服务器**:定义JMS服务器,指定存储区和连接工厂。 - **创建连接工厂**:定义客户端如何连接到JMS服务器,配置连接...
在代码中,这可能是通过 `MessageConsumer.setMessageListener()` 方法的 `MessageSelector` 参数实现的。 7. **性能与优化**:示例可能涵盖了如批量发送、预取策略等性能优化技巧。 通过深入研究这个示例项目,...