`
kooii
  • 浏览: 38618 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

动态设置Spring DefaultMessageListenerContainer 的messageSelector

阅读更多

Spring JMS可以帮助开发人员快速的使用MQ的发送与接收,在异步接收方面,Spring 提供了MessageListenerContainer的容器接收消息。通过研究源码发现DefaultMessageListenerContainer是支持动态改变messageSelector的。在DefaultMessageListenerContainer 中有个cacheLevel的属性默认是4,把它改动到2或1或0,数字分表代表

public static final int CACHE_NONE = 0;
public static final int CACHE_CONNECTION = 1;
public static final int CACHE_SESSION = 2;
public static final int CACHE_CONSUMER = 3;
public static final int CACHE_AUTO = 4;

在设置完cacheLevel后就可以动态设置messageSelector,Container就能用上最新的selector了。

Spring配置如下

<bean id="messageListenerContainer"
        class="org.springframework.jms.listener.DefaultMessageListenerContainer">  
        <property name="connectionFactory" ref="jmsConnectionFactory" />  
        <property name="destination" ref="receiverQueue" />  
        <property name="messageListener" ref="jmsReceiver" />  
        <property name="concurrentConsumers" value="10" />           
        <property name="messageSelector" value="CLIENT='DEMO'" />  
        <property name="cacheLevel" value="2"/>
    </bean> 

 修改messageSelector代码如下

DefaultMessageListenerContainer messageListenerContainer = (DefaultMessageListenerContainer) ac.getBean("messageListenerContainer");
		messageListenerContainer.setMessageSelector("CLIENT='DEMO2'");

 

源码分析:

//DefaultMessageListenerContainer类中, 每次收消息都会call的方法
private boolean invokeListener() throws JMSException {
initResourcesIfNecessary();
boolean messageReceived = DefaultMessageListenerContainer.this.receiveAndExecute(this, this.session, this.consumer);
this.lastMessageSucceeded = true;
return messageReceived;
 }

//这里就是使用cacheLevel的地方,由于需要动态selector,所以需要每次重新生成consumer,//当cacheLevel<3的时候,this.consumer会为null
private void initResourcesIfNecessary() throws JMSException {
       if (DefaultMessageListenerContainer.this.getCacheLevel() <= 1) {
         updateRecoveryMarker();
       }
       else {
         if ((this.session == null) && (DefaultMessageListenerContainer.this.getCacheLevel() >= 2)) {
           updateRecoveryMarker();
           this.session = DefaultMessageListenerContainer.this.createSession(DefaultMessageListenerContainer.this.getSharedConnection());
         }
         if ((this.consumer == null) && (DefaultMessageListenerContainer.this.getCacheLevel() >= 3))
           this.consumer = DefaultMessageListenerContainer.this.createListenerConsumer(this.session);
       }
     }


//在这个方法中可以发现当传入consumer为null时,会生成一个新的consumer
protected boolean doReceiveAndExecute(Object invoker, Session session, MessageConsumer consumer, TransactionStatus status)
     throws JMSException
   {
     Connection conToClose = null;
     Session sessionToClose = null;
     MessageConsumer consumerToClose = null;
     try {
       Session sessionToUse = session;
       boolean transactional = false;
       if (sessionToUse == null) {
         sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession(
           getConnectionFactory(), this.transactionalResourceFactory, true);
         transactional = sessionToUse != null;
       }
       if (sessionToUse == null) {
         Connection conToUse = null;
         if (sharedConnectionEnabled()) {
           conToUse = getSharedConnection();
         }
         else {
           conToUse = createConnection();
           conToClose = conToUse;
           conToUse.start();
         }
         sessionToUse = createSession(conToUse);
         sessionToClose = sessionToUse;
       }
       MessageConsumer consumerToUse = consumer;
       if (consumerToUse == null) {
         consumerToUse = createListenerConsumer(sessionToUse);
         consumerToClose = consumerToUse;
       }
       Message message = receiveMessage(consumerToUse);
       if (message != null) {
         if (this.logger.isDebugEnabled()) {
           this.logger.debug("Received message of type [" + message.getClass() + "] from consumer [" + 
             consumerToUse + "] of " + ((transactional) ? "transactional " : "") + "session [" + 
             sessionToUse + "]");
         }
         messageReceived(invoker, sessionToUse);
         boolean exposeResource = (!(transactional)) && (isExposeListenerSession()) && 
           (!(TransactionSynchronizationManager.hasResource(getConnectionFactory())));
         if (exposeResource)
           TransactionSynchronizationManager.bindResource(
             getConnectionFactory(), new LocallyExposedJmsResourceHolder(sessionToUse));
         try
         {
           doExecuteListener(sessionToUse, message);
         }
         catch (Throwable ex) {
           if (status != null) {
             if (this.logger.isDebugEnabled()) {
               this.logger.debug("Rolling back transaction because of listener exception thrown: " + ex);
             }
             status.setRollbackOnly();
           }
           handleListenerException(ex);
 
           if (ex instanceof JMSException)
             throw ((JMSException)ex);
         }
         finally
         {
           if (exposeResource)
             TransactionSynchronizationManager.unbindResource(getConnectionFactory());
         }
         return true;
       }
 
       if (this.logger.isTraceEnabled()) {
         this.logger.trace("Consumer [" + consumerToUse + "] of " + ((transactional) ? "transactional " : "") + 
           "session [" + sessionToUse + "] did not receive a message");
       }
       noMessageReceived(invoker, sessionToUse);
       return false;
     }
     finally
     {
       JmsUtils.closeMessageConsumer(consumerToClose);
       JmsUtils.closeSession(sessionToClose);
       ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), true);
     }
   }

 

 

分享到:
评论

相关推荐

    spring下queue与持久订阅topic实现

    在Spring中,这可以通过`@JmsListener`注解的`subscriptionDurable`属性实现,或者在`DefaultMessageListenerContainer`中设置`subscriptionDurable`属性。 6. **JMSConsumer** 压缩包中的`jmsconsumer`可能是一个...

    RocketMQ自定义selector实现消息通道定向发送和拉取

    在发送端,我们可以设置消息的属性,以便在消费端的Selector可以使用这些属性进行过滤。例如,如果我们想要根据消息的某个自定义属性“channel”进行定向发送,可以这样做: ```java Message msg = new Message(...

    ActiveMQ 开发

    3. **消息过滤**: 通过JMS的MessageSelector特性,可以筛选出符合特定条件的消息进行消费。 4. **消息确认**: 有自动确认和手动确认两种模式,手动确认允许在消费者确认接收消息后再提交事务,增强消息可靠性。 ...

    activeMQ生产者和消费者代码

    如果是Topic,使用createSubscriber()方法,并提供一个可选的MessageSelector来过滤消息。 6. 使用Consumer的receive()方法接收消息,或者在回调中监听消息到达事件。 7. 处理接收到的消息,并关闭Consumer和...

    apache-activemq-5.15.3-bin

    - **配置文件**:主要的配置文件是`conf/activemq.xml`,用户可以在此修改消息传输、持久化、安全等设置。 - **创建队列和主题**:通过管理控制台或者编程方式创建JMS队列和主题。 - **生产者与消费者**:编写应用...

    Laravel学习教程之本地化模块

    前言 本文主要给大家介绍了关于Laravel本地化模块的相关内容,分享出来供大家参考学习,话不多说了,来一起看看详细的介绍吧。 本文是基于Laravel 5.4版本的本地化模块代码进行分析书写;... MessageSelector 消息

    weblogic中配置JMS及其测试程序

    - **定义目的地**:在模块中添加消息队列或主题,设置其属性如名称、存储类型等。 - **配置JMS服务器**:定义JMS服务器,指定存储区和连接工厂。 - **创建连接工厂**:定义客户端如何连接到JMS服务器,配置连接...

    activemq-demo:activemq-演示

    在代码中,这可能是通过 `MessageConsumer.setMessageListener()` 方法的 `MessageSelector` 参数实现的。 7. **性能与优化**:示例可能涵盖了如批量发送、预取策略等性能优化技巧。 通过深入研究这个示例项目,...

Global site tag (gtag.js) - Google Analytics