`
QING____
  • 浏览: 2251312 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

ActiveMQ中Consumer特性详解与优化

 
阅读更多

前言

    从本文中你可以了解到如下内容:

    1) consumer端消息消费的模型,session的运作机制

    2) 如果提升broker和consumer端消息消费的速率

    3) selector,group,exclusive对消息消费的影响

    4) 如何让Priority更好的运行,提高消息的顺序性

    5) Slow Consumer的产生原因,以及如何调优。

 

 

    Consumer作为ActiveMQ的消费端,开起来简单,不过还有很多隐藏的特性,值得我们去探索和调优。

    如下为典型的Consumer端代码示例

String brokerUrl = "tcp://localhost:61616?"
String queueName = "test-queue";
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
ActiveMQConnection connection = (ActiveMQConnection)factory.createConnection();
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
	@Override
	public void onMessage(Message message) {
		try{
			ActiveMQMessage m = (ActiveMQMessage)message;
			System.out.println(m.toString());
		}catch(Exception e){
			//
			e.printStackTrace();
		}
	}
});
connection.start();

//connection.close();

 

    Consumer端涉及到众多的特性,其中包括:消息转发机制,session机制与多线程,消息ACK策略等,我们无法详解所有的原理,如下为Consumer端消息转发机制,仅供参考,[详细请参考]



 

    上图非常复杂,具体的原理,我们将会在下文中逐个解释。大概原理与过程:

 

  • 通过Connection实例创建Session之后,将会把session实例保存在本地的list中,即connection持有session列表,并在底层开启transport,侦听数据。
  • Session创建Consumer之后,将会把Consumer实例添加到本地的list中,以便此后分拣消息,即session持有consumer列表;此外connection中也持有一个consumer集合(Map),其中Key为consumerId,value为session引用。
  • 如果Session支持异步转发(asyncDispatch)或者使用了转发池(dispatchPool),将创建线程池用来转发消息。
  • 当broker端有消息通过transport发送时,connection将会分拣消息,根据消息中指定的consumerId,从本地session列表中获取其对应的session实例;然后将消息交付session负责转发。
  • 当session接受到消息后,会根据消息的consumerId,在本地consumer列表中找到对应的consumer实例;检测session的消息转发方式,如果是同步转发,则直接将消息交给consumer,即调用consumer.dispatch(message),此方法要么调用messageListener.onMessage(),要么将消息添加到本地的unconsumedMessages队列中(唤醒receive);如果是异步转发,则将消息添加到session级别的队列中并由线程池负责转发。
  • 当consumer接受到消息之后,将会调用messageListener.onMessage方法或者从receive方法中返回。
  • 当消息消费成功后,consumer将会根据指定的ACK_MODE负责向broker发送ACK指令,此后消息将会在broker端清除。

    (broker运行期间,当Consumer加入时,broker将会根据Consumer消费的Destination和相关配置信息,创建Queue或者Topic实例(Broker端,如果已经创建,则直接将Conusmer注册在Queue的消费者列表中),每个Destination实例都会引用Store实例,且Queue和Topic本身是一个Task,将会被运行在Broker的线程池中;这意味着Queue和Topic将可以根据Consumers的能力,并发的从Store中获取属于自己的消息列表,并转发给Consumers;消息从Store中Pagin到Queue内部的buffer列表中,并逐个遍历Consumers将消息转发;如果Consumers为异步转发,则添加到Consumer内部的buffer中,并通过Consumer内部的线程从buffer中移除,通过底层Transport发送) 

1. asyncDispatch

    broker端是否允许使用“异步转发”。broker端处理connection、session、consumer的模型和Client端几乎一样;当通道中(Queue/Topic)有消息需要转发给consumer时,将会调用相应的Subscription的dispatch方法(在broker端负责与Consumer通讯的服务,称为Subscription,每个Consumer客户端对应broker端一个Subscription实例);如果asyncDispatch为false,那么将会阻塞转发线程(dispatchThread),直到底层的Transport将消息发送到Consumer客户端(如果底层transport正在发送消息,意味着阻塞);如果为true,则会将消息添加到transport本地的队列中(负责与Consumer客户端通信的每个transport都有一个本地的queue和线程池),并启动线程池负责发送消息,那么转发线程即可立即返回。(每个Consumer一个异步转发线程)

    (参见代码:QueueStoreCursor/TopicStoreCursor --> broker端Queue/Topic(核心类)--->QueuePrefetchSubscription,QueueRegion,AbstractRegion,RoundRobinDispatchPolicy/StrictOrderDispatchPolicy)

    

    由此可见,当前通道中所有的consumer都在brorker端开启了“异步转发”,可以大大提升broker转发消息的性能,此值默认为true。

 

//在brokerUrl中设置
tcp://localhost:61616?jms.dispatchAsync=true
//在destinationUri中为当前通道设定
orderQueue?consumer.dispatchAsync=true

 

     broker“异步转发”采用的是线程池模式,线程池的缺点即使当任务单元运行时间足够短时,线程切换的成本将变得很重要,这也意味着,如果通道中(Queue/Topic)中的consumer客户端接收和消费消息的速度很快,我们可以关闭异步转发来避免线程切换。如果consumer通常为慢速(Slower Consumer),且关闭了异步转发,那就意味着每次主线程转发消息时,都需要阻塞较长的时间,这极大的影响了整体的通讯效率。无论如何,我们都没有理由关闭异步转发,异步转发应该是最佳选择。

 

    备注:broker端Queue/Topic(例如:org.apache.activemq.broker.region.Queue)实例,负责从底层store中获取消息,并使用线程池模式转发消息,dispatchPolicy负责将消息转发给相应的subscription,当Consumer客户端创建时,会向Broker发送一个指令,这个指令中包括consumerId(参看ConsumerInfo),那么此后Region(在逻辑上相当于客户端的Session,但每个通道只有一个实例)将会为每个ConsumerId创建相应Subscription实例(持有底层Transport,buffer队列、异步发送的线程),那么此后Consumer与Broker的数据交互,就有Subscription负责。每个Region都维护了一个Subscription列表,用于dispatchPolicy转发消息。这个设计模型,和Consumer客户端 + Session非常相似。

 

2. alwaysSessionAsync

    客户端session是否使用异步转发,它和(1)中的参数没有任何关系,当底层Transport接收到Broker发送的消息后,会交付给session,那么session是否采用异步的方式将消息传递给Consumer!此值默认为true。

    如果为false,表示使用同步。当consumer使用receive()获取消息时,那么session将会把消息添加到consumer的本地queue,然后唤醒receive等待;当consumer使用messageListener异步侦听消息时,将会调用其onMessage()方法直到方法执行完毕,然后返回。

    如果为true,表示使用异步。session接受的消息,将会首先放入session buffer中(队列),那么此后的线程池将会负责移除buffer中的消息,并转发给相应的consumer。

 

    当session中有多个consumer时,或者Transport中消息量比较密集,异步方式是最佳的。如果session中只有一个consumer,或者transport中消息量很少,使用异步并不能明显的提升性能。

 

//在brokerUrl中设置
tcp://localhost:61616?jms.alwaysSessionAsync=true

 

3. maxThreadPoolSize

    在alwaysSessionAsync为true的情况下,会引入一个线程池,那么这个线程池中线程的个数,默认为Integer.MAX_VALUE,不过我们首先需要清楚,这个线程池是Connection内所有的session共享的。你可以指定合适的值来决定线程池的大小。

 

//在brokerUrl中设置
tcp://localhost:61616?jms.maxThreadPoolSize=64

 

4. useDedicatedTaskRunner

    当前connetion下的每个session是否使用单独的线程。在上述介绍中,我知道了异步转发是使用了线程池的,而且这个线程池是当前Connection下所有的session共享,尽管这不会带来任何问题,可是有时候我们可能期望在threadLocal或者基于当前线程绑定一些特殊的数据,那么我们就可以使用useDedicatedTaskRunner=true;这就意味着,每个session都将创建一个单独的线程,整个生命周期中都使用它来转发消息。

 

    当每个session只有一个Consumer,或者有多个consumer但它们的处理业务类似(比如selector具有一定逻辑关系),或者这些consumer消费速度都很快,或者我们期望在Consumer中使用类似于Thread.sleep(long)这样的方式来阻塞session转发消息时,这个参数可以帮助我们。

 

//brokerUrl中,默认为false
tcp://localhost:61616?jms.useDedicatedTaskRunner=true

  

5. redeliveryPolicy

    consumer使用的重发策略,当消息在client端处理失败(比如onMessage方法抛出异常,事务回滚等),将会触发消息重发。对于Broker端,需要重发的消息将会被立即发送(如果broker端使用异步发送,且发送队列中还有其他消息,那么重发的消息可能不会被立即到达Consumer)。我们通过此Policy配置最大重发次数、重发频率等,如果你的Consumer客户端处于不良网络环境中,可以适当调整相关参数。参数列表,请参见(RedeliveryPolicy)

 

//在brokerUrl中设置
tcp://localhost:61616?jms.redeliveryPolicy.maximumRedeliveries=6

  

6. messagePrioritySupported

    Consumer端是否支持权重,默认为true,表示Consumer接受的所有消息都会按照权重排序,即权重较高的消息,优先被传递给messageListener.onMessage()方法等。如果为false,则使用FIFO队列,消息达到Consumer之后将会加入到队列,忽略权重属性,不过这在消息有重发的情况下,可能有少许影响。这个参数可以配合broker端“strictOrderDispath”达成消息权重排序。[参见Priority]

 

//在brokerUrl中设置
tcp://localhost:61616?jms.messagePrioritySupported=true

 

7. priority

    消费者权重,这个权重和消息的权重不同,它用来标记Consumer消费消息的优先级,broker端将会对优先级较高的consumer,优先转发消息(优先填充pending buffer),比如Consumer1的权重为10,Consumer2的权重为5,它们的prefetch(预获取消息的buffer尺寸)都是10,那么当Broker端有12条消息,将会优先将Consumer1的buffer填充完毕(获取10条消息),Consumer2将会获得2条消息。我们通常可以对网络良好、业务简单(比如selector更加简单)的Consumer设定较高的权重。参见ConsumerInfo类。

 

//在destinationUril中设定,默认所有的consumer权重都一样,为0
orderQueue?consumer.priority=10

 

    不过对于Topic而言,当指定了consumer的priority之后,还有一个可选的转发策略,用来优化消息传送优先级,理论上所有的Subscripter(订阅者)都会收到相同的消息,但是我们在broker端的转发时机上,让优先级较高的订阅者先得到消息,尽管这看起来似乎没有什么意义:

 

//对Queue无效
<policyEntry topic=">">  
    <dispatchPolicy>  
        <priorityDispatchPolicy />  
    </dispatchPolicy>  
</policyEntry> 

 

8. optimizeAcknowledge/optimizeAcknowledgeTimeOut

    可优化ACK策略,这个是Consumer端最重要的调优参数之一。ActiveMQ在Client端以及broker端,启用了大量的工作,用来优化消息确认(ACK),最大化消息消费效率。optimizeAcknowledge表示是否开启“优化ACK选项”,当为true时,可以指定optimizeAcknowledgeTimeOut数值用来约束ACK最大延迟确认的时间,我们通过optimizeAck,可以实现可靠的批量消息确认。具体参看[ActiveMQ消息确认机制]      

 

//brokerUrl中,默认开启,timeout为300
tcp://localhost:61616?jms.optimizeAcknowledge=true&jms.optimizeAcknowledgeTimeOut=3000

 

    如果我们的consumer足够快,且服务器端的消息足够多,且指定了合适的prefethSize,我们可以将timeout时间设置的稍微大一些。建议不要关闭此选项。

 

9. prefetchSize

    预获取消息数量,重要的调优参数之一,当Consumer活跃时,broker将会批量发送prefetchSize条消息给Consumer,consumer也可以配合optimizeAcknowledge来批量确认它们;批量传送,极大的提高了网络传输效率,此值默认为1000。通过上述,我们对broker消息转发机制的了解,可以知道,broker端将会根据consumer指定的prefetchSize来决定pendingBuffer的大小,prefetchSize越大,broker批量发送的消息就回越多,如果消费者消费速度较快,再配合optimizeAck,这将是相对完美的消息传送方案。

 

    不过,prefetchSize也会带来一定的问题,在Queue中(Topic中没有效果),broker将使用“轮询”的方式来平衡多个消费者之间的消息传送数量。如果消费者消费速度较慢,而且prefetchSize较大,这将不利于消息量在多个消费者之间平衡。通常情况下,如果consumer数量较多,或者消费速度较慢,或者消息量较少时,我们设定prefetchSize为较小的值。

 

//在destinationUri中指定,默认为1000
//也可以在Topic中使用,可以优化ACK策略
orderQueue?customer.prefetchSize=100

 

    对于Consumer而言,支持同步消费(receive)和异步侦听(messageListener),这两种方式都经常用。无论何种情况,prefetchSize都可以提供有效的优化。需要注意,如果开发者使用messageListener方式异步侦听消息,将不能设定prefetchSize <= 0的任何值。如果使用receive方式,且prefetchSize = 0时,将触发Client端使用Pull机制,即每次receive调用,都会向Broker端发送Pull指令,如果broker端有消息才会转发,在这种情况下,Broker不会主动Push消息给client。

 

10. noLocal

    consumer是否接受本地消息,所谓Local,就是和Comsumer共享Connection实例的Producer发送的消息。在此需要提醒,当客户端创建connection时都会生成一个全局唯一的connectionId,当consumer、producer创建时都会向broker发送标识信息,比如consumer创建时会向broker发送一个ConsumerInfo数据包(数据包中包含ConsumerId,唯一标记一个consumer,consumerId中即包含ConnectionId),Producer同样在创建时发送携带有ProducerId的ProducerInfo数据包;最重要的是,Producer发送消息时,所有的消息都封装了MessageId属性,这个属性中将携带ProducerId,那么我们就很方便根据这些信息判断是否为Local,如果消息被持久化,connectionId也将随之。

 

    

//默认为false,如果你通过此方式设定了nolocal属性
//那么在创建consumer时指定的值将会被忽略。
orderQueue?customer.noLocal=false

 

  • session.createConsumer(topic,null,true)

    我们通常在Topic中使用noLocal,以忽略本地的消息通知等。

 

11. exclusiveConsumer

    “排他消费者”,此参数仅对Queue有效。即如果Queue通道中,有多个consumer同时活跃时只会有一个consumer能够获取消息,对于broker而言,如果Queue中,有任意一个Consumer是“排他的”,那么消息只会转发给“exclusiveConsumer=true”的消费者;如果全部的消费者都是“排他的”,那么最新创建的consumer将会获取消息。我们通常在分布式环境中,为了避免对某些重要数据并发操作时使用此特性,比如:订单中心修改订单状态(来自各个系统的消息,都想修改订单状态,但是它们必须串行操作)。

 

//在brokerUrl中指定,默认值为false,对connectionFactory下所有的Queue有效
tcp://localhost:61616?jms.exclusiveConsumer=true

 

    我们也可以通过api的方式,为特定的Queue下的消费者设定此值,记住,只要全局中有任何一个Consumer设定了exclusive=true,都会导致整个Queue的所有的Consumer为“排他的”,直到此Consumer失效且其他Consumer中没有“排他的”时,才会解除排他性。

 

//在destinationUri中设定,只对当前Queue有效。
orderQueue?consumer.exclusive=true

 

     不过,此处还有一个小小的陷阱,如果exclusive和selector(消息选择器)同时工作时,将会是什么结果呢??比如两个Consumer,其中一个为exclusive,但是它们的selector不同,究竟两个Consumer是否都能够获取消息呢?事实上,尽管通过selector匹配消息通过,但是如果使用了exclusive,且当前的“排他消费者”不是自己,那么消息仍然不能传送到client端。但是,如果两个consumer的selector一样,那么最终将会有“排他消费者”获取消息。由此可见,selector和exclusive在某些方面具有“同样的效果”,如果所有consumer的selector都不同(且选择范围不具有交集),且使用了exclusive,将会导致一部分消息被“积压”而无法消费,我们应该避免这种情况的发生。(源码请参见: QueueDispatchSelector)

 

    此外,broker端还支持一个参数“allConsumersExclusiveByDefault”,如果此参数为true,那么当前Queue中的所有消费者默认为exclusive。

 

<!-- 只对queue有效,默认为false -->
<policyEntry queue="orderQueue" allConsumersExclusiveByDefault="true" />

 

12. retroactive

    “可追溯”消费者,只对Topic有效,如果consumer是可追溯的,那么它可以获取实例创建之前的消息。通常而言,订阅者不可能获取实例创建之前的消息,因为broker根本不知道它的存在。对于broker而言,如果一个Topic通道创建,且有发布者发布消息(Publisher),那么broker将会在内存中(非持久化)或者磁盘中(持久化)保存已经发布的消息,直到所有的订阅者都消费者,才会清除原始消息内容。那么retroactive类型的订阅者,就可以获取这些原本不属于自己但broker上还保存的旧消息,就像我们订阅一种Feed,可以立即获取旧的内容列表一样。如果此订阅者不是durable(耐久的),它可以获取最近发布的一些消息;如果是durable,它可以获取存储器中尚未删除的所有的旧消息。[下文会详细介绍Topic的数据转发模型]

 

//在destinationUrl中设置,默认为false
feedTopic?consumer.retroactive=true

 

    在broker端,可以配置当前Topic默认为“可追溯的”,不过Topic并不会在此种情况下额外的保存消息,只不过表示订阅者默认都是可追溯的而已。

 

<!-- 只对topic有效,默认为false -->
<policyEntry topic="feedTopic" alwaysRetroactive="true" />

 

 

13. selector(选择器)

    在Consumer中使用选择器可以帮助broker过滤消息,就像我们使用在RDBMS中使用where字句过滤数据一样,具体selector的写法,此处不做详细介绍。当Consumer创建之后,将会把selector信息传递给broker,此后再consumer的整个生命周期中,都将有效,不过一旦指定selector,重复设定selector将不会有效,除非关闭consumer并重建实例。

 

    在Topic和Queue使用selector的时机,有所不同。

    对于Queue而言,broker在dispatch每条消息时,都会遍历整个消费者列表,并匹配selector表达式,一旦匹配成功则将消息发送给Consumer,如果所有的selector都无法匹配,消息将沉积。注意,这些沉积的消息将会在每次pageIn时都会被加载而且也会在内存中不断叠加(直到过期),将会对Queue的转发效率带来很大的危险,如果你发现Queue的消息大量积压(undeque),你应该检测是否与selector有关。如果你使用了selector,你一定要让全局中所有的selector覆盖所有的消息,或者至少有一个没有selector的consumer。(参看源码:Queue,QueueDispatchSelector)

 

    对于Topic而言,似乎有些不同,主要是在selector应用的时机上;如果订阅者是durable(耐久的),那么订阅者的ClientId + selector信息都会被持久化保存(参见TopicMessageStore),此后即使订阅者离线,符合selector的消息,仍然会为它创建消息副本(为当前订阅者ID 与messageId的列表关系,但实际消息只有一条),如果selector不匹配,存储器将不会为此订阅者创建消息副本,也意味着当此订阅者上线后将不会感知到任何事情,就像那些消息从来都没有出现过一样;不过在broker转发消息给订阅者之前,仍然会使用selector匹配;因为订阅者可以修改selector,为了避免现有的消息副本不适合新的selector,将会在消息发送给订阅者时也使用selector匹配,对于不匹配selector的消息,此时将会按照“已消费”来处理,它们也不会发送给订阅者。此处需要提醒,创建durable类型的订阅者时,需要在connection上指定ClientId,全局中同一个Topic上所有的ClientId都不能相同,且同一个Connection上,不能创建多个duarable订阅者。

 

    如果订阅者是非durable(即为temporary),也就不存在创建消息副本的情况,那么将会在消息转发给订阅者之前检测即可。

    (源码参见:Topic,DurableTopicSubscription,TopicMessageStore,RoundRobinDispatchPolicy等)

 

14. group消息分组

    JMS规范中并没有提供消息分组的特性,不过ActiveMQ支持了此特性;Producer在发送消息时为多条消息设定group,它们将会被同一个consumer消费,这也是实现消息黏性的手段,任何时候一个group的消息只会发送给同一个Consumer,一个Consumer同时也只能消费一个group直到黏性解除。此特性只会对Queue有效。在broker有一个groupOwnerMap用来保存groupId与ConsumerId之间的关系,在消息转发时,如果消息中指定了groupId,就会使用此map检测是否已经有消费者持有了此group,如果此group的消费者是自己,那么消息会继续转发;否则,消息继续被滞留。

 

    Message中可以通过指定隐藏属性JMSXGroupID来实现:

    1) message.setStringProperty("JMSXGroupID","order_1001_group")

    2) 也可以使用activeMQMessage.setGroupID(string)。

 

    我们可以通过JMSXGroupSeq来获取当前消息在group的序号:message.getIntProperty("JMSXGroupSeq"),也可以使用activeMQMessage.setGroupSeq(int),此属性并不会决定消息在broker端排序,通常有Producer端来决定此值,仅仅是序号而已;如果此值为-1,表示此消息为group中最后一条消息,broker将会解除group与consumer的黏性关系,此后此group的消息将会被重新选择消费者,即从groupOwnerMap中移除groupId键,同时也意味着这个consumer可以切换到其他group上;如果此值为1表示,表示group重置(或者说重定向,阻断),当broker端在给某个consumer转发消息时遇到groupSeq = 1的消息,都会立即解除此此consumer与其他group的黏性,并立即与此group绑定,即将groupId与当前Consumer信息加入到groupOwnerMap中。

 

    我们通常使用-1来表示一个group中的所有序列已经发送完毕,Consumer端可以获取根据这个属性值做批量操作的提交等,例如,consumer接受了一个group中的消息,先把消息cache起来,当接收到groupSeq = -1 时,将cache中的数据批量写入文件,不过需要注意,consumer不能保证一定会收到-1的消息。groupSeq = 1是一个很特殊的值,有可能当前consumer整在接收group1的消息,此时如果group2发送了groupSeq = 1的消息,将会阻断consumer对group1的消费,转而消费group2的消息,所以consumer端在使用group时,需要对每个消息检测groupId是否发证了“重置”,而不是一味的等待groupSeq = -1。当你决定此group信息已经发送完毕时,一定要发送seq = -1;如果你决定重新开启group时,需要发送seq = 1消息。

 

    如果group中的消息有部分发送给某个consumer(此consumer可能未能完全消费group而失效了),那么剩余的消息可能被转发到其他consumer中,我们可以通过JMSXGroupFirstForConsumer(boolean 类型属性)来检测当前consumer是否为group的第一个消费者。如果使用了groupSeq = 1重置了group,那么也会强制将此属性设置为true。

 

    如果开发者的环境中,分组消息和普通消息混合使用,需要注意一些问题:consumer可能同时会接收到普通消息和分组消息,而并非同一时间只会接受分组消息,不能过度依赖group的稳定性。也期望producer端不要随意创建groupId,而是使用相对固定的groupId,结合groupSeq = -1来实现group与consumer解耦;如果producer客户端较多,有可能同时并行的group较多,我们可以通过设定一个参数来实现group在consumer的相对平稳:

 

 

//broker在转发消息之前需要等待足够多的consumer活跃或者等待超时后,才会发送消息
<policyEntry queue="orderQueue" consumersBeforeDispatchStarts="2" timeBeforeDispatchStarts="10000"/>
 

 

 

    不过似乎还有些疑惑: consumer正在消费group中的消息,如果此group中某条消息不符合consumer的selector,那么这条消息会发送到此consumer吗?会发送给其他consumer吗? 事实上,selector将会首先发挥效果,如果消息没有匹配consumer的selector,即使它的groupId与此consumer有黏性,消息也不会发送到此consumer;这些信息将会被积压,直到consumer失效,或者group被重置,那么此后这些积压的消息会转发给其他consumer;这也意味着group中的信息被“拆分”了,因为selector的问题,这些消息被“分段”后先后发给了不同的consumer。(参见Queue.doActualDispatch())

 

//在selector,exclusive,group同时使用时,消息匹配的时机
//伪代码
if(!selectorExpression.matches(message)){
    return false;
}
if(exclusiveConsumer == null || excluseSiveConsumer == currentConsumer){
	String groupId = message.getGroupId();
    if(groupId == null){
        return true;
    }else {
        groupOwner = groupOwnerMap.get(groupId);
		if(groupOwner == null){
			groupOwnerMap.put(groupId,currentConsumer);
			return true;
		}else{
			return groupOwner == currentConsumer;
		}
	}
}
return false;

 

     

15. Consumer端消息接收模型



  

    

 

16. 慢速消费者(Slow Consumer)

     慢速,是相对于producer而言;简单来说,producer不断产生新的消息,broker端在内存中已经积压的足够多(比如cacheLimit已满),但是在转发给某个consumer时,发现此consumer仍然有大量的消息尚没有消费(ACK),broker会认为此consumer是慢速的。在Queue中,如果已发送(dispatched)但没有消费(unAck)的消息条数 > prefetchSize时,此consumer被标记为Slow。在Topic中,如果cacheLimit已满,但是需要向此订阅者发送的消息量 > prefetchSize时,此订阅者被标记为Slow。简单描述为: 快速的producer生产的消息,不能被消费者及时的消费,而导致在broker端积压。

 

    通常,慢速消费者会给整个broker带来潜在的危险,积压消息可能耗尽broker内存,也可能会导致消息不断的从文件pageIn到内存然后swap到文件中(如果使用了临时文件),而消耗磁盘IO,最重要的一点是,它们还会拖累Producer,导致producer端的阻塞而降低消息生产的效率,从而牵连那些快速消费者也无法获取充足的消息。所以,我们需要根据应用需要,对慢速消费者以及可能产生的潜在危险做一些必要的容错。

     如果你发现了某个通道中有慢速消费者,你要么人为的让producer“减速”,或者增加consumer的数量。

 

    慢速消费者,在TemporaryTopic中会带来很多问题;对于TemporaryTopic而言,直到所有的订阅者都消费了消息实体,才会将消息从内存中删除(durable),有此可见,如果活跃的订阅者中,有一个是慢速的,就会导致消息在内存中积压而无法删除,当内存(cacheLimit)达到阀值时,会阻塞producer继续发送消息(Flow Control策略默认开启),此时那些快速的消费者无法继续获取新的消息,就这么一个"猪一样的队友",最终导致整个TemporaryTopic的消息流动性低效。 

 

    当Topic、Queue中非持久化消息的存储,也会遇到上述问题。如果开启了Flow Control,内存已满的情况下,则会阻塞通道中所有的Producer(阻塞主线程,producer底层的Transport将会被block)继续发送消息(waitForSpace),直到足够的message被消费且从内存中移除;如果支持sendFailIfNoSpace,则会在内存不足时,向producer发送error信息,producer可以捕获异常做重试操作(不会block底层的transport)。如果没有开启Flow Control,消息将会被直接转发(包括内存存储,不会检测内存使用情况,如果内存不足将直接阻塞),如果systemUsage内存满足条件(所有的通道共享内存)则正常,否则waitForSpace(挂起,所有producer处于假死状态),此时可能会有OOM的潜在危险。所以通常建议开启Flow Control,以及为每个通道设定合适的memoryLimit。

 

//Flow Control伪代码,参见Queue.send方法。
Message message = queue.receiveMessage();
//如果通道中memoryList已经满载
if(memoryLimit.isFull()){
    //如果开启了flow Control,则阻塞
    if(flowControll){
		waitForSpace();//block
	}
}
//无论如何,持久化消息都要存储。、
//假如磁盘足够
if(message.isPersistent()){
	store.add(message);
}
//对于非持久化消息,将直接存入在pendingCursor中。不同的pendingCursor存储机制不同

//比如在storeQueueCursor中,如果broker指定了persistent=true,则会将非持久化信息写入临时文件
//否则,将直接存入内存,并导致systemMemoryUsage计数器增加,直到full阻塞。
pendingMessageCursor.add(message);

 

 

<!-- 每个队列,内存使用上限为256M,userCache表示持久化的消息也可以缓存以提高转发效率 -->
<!-- advisoryForSlowConsumers表示将慢速消费者信息发布到advisory通道中 -->
<!-- advisoryWhenFull表示当memoryLimit慢载时,向advisory通道中发布通知。 -->
<policyEntry queue=">" producerFlowControl="true" memoryLimit="256mb" useCache="false" advisoryForSlowConsumers="true" advisoryWhenFull="true" />

<systemUsage>  
 <systemUsage sendFailIfNoSpace="true">  
   <memoryUsage>  
     <memoryUsage limit="8gb"/><!-- 所有的通道缓存消息的总内存大小,memoryLimit作为其子模块 -->  
   </memoryUsage>  
 </systemUsage>  
</systemUsage> 
 

 

    对于Topic、Queue中持久化消息,我们就无需担心过多,只要磁盘足够即可。这些消息将会在持久化成功之后,从内存中移除(useCache?),它们不会有额外的性能影响。

 

    仅仅通过限制内存和Flow Control并不能保证整体的消息吞吐能力,我们需要做点什么!

 

    1) 关闭Slow Consumer

 

 

<policyEntry queue=">" producerFlowControl="true" memoryLimit="512mb">             
  <slowConsumerStrategy>
	<abortSlowConsumerStrategy abortConnection="false"/>
  </slowConsumerStrategy>
</policyEntry>

 

    brorker端一旦发现slow consumer,就将它注册到慢速消费者列表中,此后将有额外的线程扫描并关闭它们,其中abortConnection表示,是否关闭底层的transport,默认为false,此时将会通过transport向client端发送一个指令(其中包括consumerId),当client端(Session)接收之后,将会调用consumer.close()方法;如果此值为true,将会导致底层的transport链接被关闭,这是很粗暴的办法,不过如果client端多个consumer共享一个connection的话,会导致所有的consumer被关闭,还是那句话:猪一样的队友,害了整个团队。

 

    2) 抛弃旧消息(仅对Topic有效,仅对nondurable订阅者有效)

 

 

<policyEntry topic=">" producerFlowControl="true" memoryLimit="512mb">             
	<pendingMessageLimitStrategy>
	  <!-- 对于慢速消费者,只保留最近100条未消费的消息,仅对topic有效 -->
	  <constantPendingMessageLimitStrategy limit="100"/>
	  <!-- 值保留 2.5 * prefetchSize条消息
	  <prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>
	  -->
	</pendingMessageLimitStrategy>
</policyEntry>

 

     pendingMessageLimitStrategy仅对Topic有效,可以保证慢速消费者不会拖累太久。不过,如果滞留的消息大于了pendingMessageLimit,我们需要使用“移除策略”来移除多余的消息。

<policyEntry topic=">" producerFlowControl="true" memoryLimit="512mb">             
	<messageEvictionStrategy>
		<oldestMessageEvictionStrategy />
	</messageEvictionStrategy>
</policyEntry>

 

    topic支持支持3种移除策略:

    (1) oldestMessageEvictionStrategy表示移除最旧的消息

    (2) uniquePropertyMessageEvictionStrategy表示移除根据属性值筛选消息并移除最旧的

    (3) OldestMessageWithLowestPriorityEvictionStrategy表示在旧消息中移除权重最低的。

 

    3) 写入临时文件

 

//如果期望非持久化数据写入文件,必须将broker的持久性设置为true,(默认值为true),即
<broker persistent="true" brokerName="server1">
    <policyEntry queue=">" producerFlowControl="true" memoryLimit="512mb">             
	<pendingQueuePolicy>
		<storeCursor />
        </pendingQueuePolicy>
    </policyEntry>
	<persistenceAdapter>
		<!--
		<kahaDB directory="${activemq.data}/kahadb"/>
		-->
		<levelDB directory="${activemq.data}/leveldb"/>
	</persistenceAdapter>
	<!-- 临时文件存储,默认不存储任何临时文件 -->
	<tempDataStore>
		<!--
		<pListStoreImpl directory="${activemq.data}/tmp"/>
		-->
		<levelDB directory="${activemq.data}/leveldb/tmp"/>
	</tempDataStore>
</broker>

 

    对于Queue而言,支持storeCursor,vmQueueCursor , fileQueueCursor。其中storeCursor是一个“综合”策略,持久化消息使用fileQueueCurosr支持,非持久化消息使用vmQueueCursor支持。vmQueueCursor基于内存,fileQueueCursor表示将数据写入本地临时文件(由tempDataStore决定)。(具体参见:[ActiveMQ策略])

 

<policyEntry topic=">" producerFlowControl="false" memoryLimit="10mb">
	<pendingSubscriberPolicy>
		<!-- 对于非耐久的订阅者,非持久化消息: vmCursor,fileCursor -->
		 <fileCursor/>
	</pendingSubscriberPolicy>
	<pendingDurableSubscriberPolicy>
		<!-- 对于耐久的订阅者,非持久化消息 -->
		<!-- storeDurableSubscriberCursor -->
		<!-- vmDurableCursor -->
		<!-- fileDurableSubscriberCursor -->
		<storeDurableSubscriberCursor/>
	</pendingDurableSubscriberPolicy>
</policyEntry>

 

    对于Topic而言,我们也可以根据订阅者的类型,来决定如果处理那些滞留的非持久化消息。

 

    4) offlineDurableSubscriberTimeout

    对于durable订阅者,如果订阅者“离线”,那么Broker将会一直保存属于它的消息,因此消息也会以为它而不能删除,导致积压。通常,订阅者离线的时间是无法预估的,有可能此订阅者永远都不会再上线(可能因为durable订阅者本来应该cancel,但是开发者却忘记了),这对broker来说是致命的。我们需要限制durable订阅者离线的时间,如果超过时间,那么订阅者将会被移除,消息也会因此而删除。

 

<broker name="localhost" offlineDurableSubscriberTimeout="86400000" offlineDurableSubscriberTaskSchedule="3600000">

 

 

17. 过期消息

    当消息到达consumer端时,会检测消息是否过期,对于过期的消息将不会传递到listener.onMessage方法或者通过receive返回,这些消息将会被直接ACK。

 

    判断一个消息是否过期的方式很简单,就是比较消息的Expiration时间戳和当前时间戳。

 

18. 重复消息(Duplicate)

    因为网络问题或者consumer在集群中迁移的问题(迁移到其他broker中)等,有可能导致consumer接收到重复的消息,所谓“重复消息”就是当前consumer已经“遇到”过的消息被再次接收到。consumer在将消息消费之前,都会检测消息是否重复,对于重复消息,将不会消费而是直接发送poisonAck(毒丸),broker端会将消息直接删除。(具体源码,请参见ConnectionAudit.isDuplicate(),使用了bitSet来标记某个消息ID是否出现过)

 

19.ActiveMQ Consumer与spring代码样例

    1) spring配置

 

<bean id="activeMQConnectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">  
    <property name="brokerURL" value="tcp://localhost:61616"/>  
    <property name="userName" value=""/>  
    <property name="password" value=""/>  
    <property name="sendTimeout" value="3000" />  
</bean>

<!-- order -->
<bean id="orderDestination" class="org.apache.activemq.command.ActiveMQQueue">
	<property name="physicalName" value="order.queue"></property>
</bean>
<bean id="orderListener" class="com.test.service.listener.OrderListener"/>
<bean id="orderConsumer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="activeMQConnectionFactory" />
	<property name="destination" ref="orderDestination" />
	<property name="messageListener" ref="orderListener" />
	<property name="concurrentConsumers" value="10" />
	<property name="maxConcurrentConsumers" value="20" />
</bean>
<!--
<bean id="orderMessageExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">  
    <property name="corePoolSize" value="2" />  
    <property name="maxPoolSize" value="5" />  
    <property name="daemon" value="true" />  
    <property name="keepAliveSeconds" value="120" />  
</bean>  
-->

 

    在上述配置中DefaultMessageListenerContainer可以指定很多其他选项,如下为主要的属性列表:

    messageListener: 消息侦听器,必选属性。

 

    taskExecutor: 任务调度器,可以使用线程池开并发的消费消息。如果开发者不指定,spring将会采用默认的TaskExecutor(SimpleAsyncTaskExecutor,类似于CachedThreadPool)。

 

    concurrentConsumers: 消费者的最大个数,因为在spring中messageListener实例是单例的,比如上文中的orderListener(备注:它实现了MessageListener接口),所以spring-jms不能自作主张的创建多个messageListener实例来并发消费。所以spring在内部,创建了多个MessageConsumer实例,并使用consumer.receive()方法以阻塞的方式来获取消息,当获取消息后,在执行messageListener.onMessage()方法;concurrentConsumers属性就是为了指定spring内部可以创建MessageConsumer的最大个数;当messageConsumer实例被创建后,将会封装在一个Runner接口并交给taskExecutor来调度;如果consumer在一直没有收到消息,则会被置为“idle”并从consumer列表中移除;如果所有的consumer都处于active状态,则会创建新的consumer实例直到达到maxConcurrentConsumers个数上限。通常taskExecutor的线程池容量稍大于concurrentConsumer。

 

    maxMessagesPerTask: 每个consumer所消费的消息个数,因为每个consumer都会独占一个Thread[consumer.receive()是阻塞的],当consumer消费maxMessagesPerTask个消息后,它就会退出线程,由taskExecutor重新调度。

 

    receiveTimeout: 内部的consumer在receive方法中阻塞的时间。默认为1秒。

 

    recoveryInterval:  当消息消费时,底层connection异常而无法继续,listener需要等待恢复的时间间隔。默认为5000ms。

 

    concurrency: “concurrentConsumers”与“maxConcurrentConsumers”两个参数的简写方式,格式为“5-10”,则表示concurrentConsumers为5,maxConcurrentConsumers为10。

 

    sessionTransacted: Session是否为事务类型。默认为false。

 

    messageSelector: 消息选择器。如果你希望此listener只接受某种特性的消息,可以通过指定selector的方式来过滤消息。

 

    pubSubDomain: 此消费通道是否为Topic,默认为“false”。所有与Topic有关的属性,只有在pubSubDomain为true的情况下生效。

 

    pubSubNoLocal: 对于Topic而言,此消费者是否消费本地消息。所谓本地消息,就是当Consumer与Producer公用底层一个Connection时,那么Producer发送的消息,相对于此Consumer就是本地消息。在pubSubDomain为true时有效。

 

    subscriptionDurable: 是否为“耐久性”订阅者。在pubSubDomain为true时有效。默认为false。

 

    durableSubscriptionName: 耐久订阅者名称,每个clientId下可以有多个耐久订阅者,但是他们必须有不同的名字。默认为className。

 

    errorHandler: 当listener.onMessage方法抛出异常时,异常该如何处理。

 

    autoStartup: 消费者是否自动启动,默认为true,那么在messageContainer实例化后,将会启动consumer(即调用Connection.start());如果为false,那些开发者需要在合适的时机手动启动。

    clientId: 对于Topic订阅者而言,此参数必备。

 

    sessionAcknowledgeMode: ACK MODE,默认为AUTO。spring-jms做了一件非常遗憾的事情,如果指定了sessionTransacted为true,那么在调用listener.onMessage()方法之后,则会立即提交事务(session.commit()),即使开发者使用了sessionAwareMessageListener,所以开发者无法实现基于事务的“批量”确认机制。如果开发者指定为CLIENT_ACK,那么spring-JMS将会在onMessage方法返回后立即调用message.acknowlege()方法,所以开发者自己是否确认以及何时确认,将没有意义,如果不希望spring来确认消息,只能在onMessage方法中通过抛出异常的方式。 其中“1”表示AUTO_ACKNOWLEDGE,“2”为CLIENT_ACKNOWLEDGE = 2,“3”为 DUPS_OK_ACKNOWLEDGE = 3。  

    

    2) OrderListener.java

 

public class OrderListener implement MessageListener {
    public void onMessage(Message message) {
        if(!(message instanceof TextMessage)){
        	return;
        }
        logger.info(">>>>FollowListener,received:[" + message + "]");
        try {
			//
		} catch(Exception e){
			throw new RuntimeException(e);
		}
	}
} 

 

 

结语:

    本人通过阅读ActiveMQ的源码,得出本博文的某些结论,或许因为能力有限或者方法不够精确,某些内容尚且不足,请各位不吝赐教,欢迎批评斧正。

 

PolicyEntry属性详解: http://activemq.apache.org/per-destination-policies.html

MessageCursor:http://activemq.apache.org/message-cursors.html

消息转发特性:http://activemq.apache.org/message-dispatching-features.html

 

Slow Consumer:http://activemq.apache.org/slow-consumer-handling.html

  • 大小: 173 KB
分享到:
评论
3 楼 Rafael520 2017-03-21  
QING____ 写道
Rafael520 写道
楼主大牛,请问下 是如何有效的阅读源码呢,个人在阅读时,容易迷失重点

源码的量还是很大的,你可以先问自己几个为什么?然后着手看源码。

感谢,看你的博文受益匪浅,内容清晰,有深度
2 楼 QING____ 2017-03-18  
Rafael520 写道
楼主大牛,请问下 是如何有效的阅读源码呢,个人在阅读时,容易迷失重点

源码的量还是很大的,你可以先问自己几个为什么?然后着手看源码。
1 楼 Rafael520 2017-02-04  
楼主大牛,请问下 是如何有效的阅读源码呢,个人在阅读时,容易迷失重点

相关推荐

    spring配置activemq详解

    在"spring配置activemq详解"这个主题中,我们将探讨如何在Spring项目中配置和使用ActiveMQ。以下是对这个主题的详细说明: 1. **配置ActiveMQ**: - 首先,我们需要在项目中引入ActiveMQ的相关依赖,这通常通过在`...

    ActiveMQ+Spring完整详解例子

    通过以上内容,开发者应该能够理解ActiveMQ的基本原理,学会如何在Spring应用中配置和使用ActiveMQ,以及如何利用其特性来优化应用程序的性能和可靠性。在实际项目中,合理地运用消息中间件如ActiveMQ,可以显著提高...

    Apache ActiveMQ Queue Topic 详解

    ### Apache ActiveMQ Queue & Topic 详解 #### 一、特性及优势 Apache ActiveMQ 是一款高性能、功能丰富的消息中间件,具有以下显著特点: 1. **实现 JMS 1.1 规范**:支持 J2EE 1.4 及以上版本,这意味着它可以...

    ActiveMQ环境搭建及实例详解的源码

    在实际开发中,你可以根据业务需求利用ActiveMQ的高级特性,构建高效、可靠的分布式系统。在提供的`TestForActiveMQ`源码中,可能包含了更复杂的示例,比如消息的持久化、事务处理或者网络集群配置,通过学习这些...

    ActiveMQ 5.7源码API

    《ActiveMQ 5.7源码API详解》 Apache ActiveMQ是开源的、高性能的、功能丰富的消息中间件,它遵循JMS(Java Message Service)规范,为分布式系统提供了可靠的消息传递服务。ActiveMQ 5.7版本是其重要的一个里程碑...

    ActiveMQ完整项目示例

    **ActiveMQ完整项目示例详解** ActiveMQ是Apache软件基金会下的一个开源消息中间件,它遵循JMS(Java消息服务)规范,用于在分布式系统中传递消息。在本项目示例中,我们将深入探讨如何使用MyEclipse 10开发基于...

    ActiveMQ快速上手 PDF

    ### ActiveMQ 快速上手知识点详解 #### 一、ActiveMQ简介 - **定义**:ActiveMQ 是 Apache 软件基金会所研发的一款开源消息中间件,它完全支持 JMS 1.1 和 J2EE 1.4 规范,能够作为 JMS Provider 实现消息传递功能...

    linux版本ActiveMQ 5.15.8

    **Linux版ActiveMQ 5.15.8详解** ActiveMQ是Apache软件基金会下的一个开源消息中间件项目,它提供了一种可靠的消息传递机制,使得应用程序可以进行异步通信。在Linux环境中部署和使用ActiveMQ,能够有效地提高系统...

    activemq 入门示例代码

    **ActiveMQ 入门示例代码详解** ActiveMQ 是 Apache 开源组织开发的一款高效、可靠的开源消息中间件,它...同时,ActiveMQ 还提供了丰富的特性,如事务、消息确认、消息分页、网络连接等,等待你在后续的学习中探索。

    activemq的简单配置

    ### ActiveMQ基础配置与应用详解 #### 一、ActiveMQ简介 ActiveMQ是Apache软件基金会提供的一个开源消息中间件项目,其主要功能是作为一个消息的接收和转发容器,用于实现消息队列服务。ActiveMQ支持两种基本的角色...

    ActiveMQ案例含文档

    **ActiveMQ 案例与文档详解** ActiveMQ 是 Apache 软件基金会的一个开源项目,它是业界广泛使用的消息中间件,遵循 JMS(Java Message Service)标准,提供高效、可靠的异步通信服务。在本案例中,我们将深入探讨 ...

    C# ActiveMQ 实例

    **C# ActiveMQ实例详解** 在信息技术领域,消息中间件起着至关重要的作用,它能够帮助应用程序之间进行异步通信,提高系统的可扩展性和可靠性。ActiveMQ是Apache软件基金会开发的一款开源消息代理,支持多种协议,...

    【BAT必备】activeMQ面试题

    ### ActiveMQ面试题详解 #### 一、ActiveMQ基础概念 **1.1 什么是ActiveMQ?** ActiveMQ是Apache出品的一款开源的消息中间件,它实现了JMS(Java消息服务)1.1和J2EE 1.4规范,完全支持JMS API,允许任何JMS...

    消息队列activeMQ

    **消息队列ActiveMQ详解** ActiveMQ是Apache软件基金会下的一个开源项目,它是一个功能强大的消息中间件,广泛应用于分布式系统中的异步处理和解耦。消息队列ActiveMQ允许应用程序通过发送和接收消息来进行通信,而...

    ActiveMQ实例Demo

    **ActiveMQ实例Demo详解** Apache ActiveMQ是一款开源的消息中间件,它是Java消息服务(JMS)的实现,广泛应用于分布式系统中的异步通信。ActiveMQ以其高性能、高可靠性以及丰富的特性,成为许多企业级应用首选的...

    activeMQ实例

    【ActiveMQ实例详解】 在IT行业中,消息队列(Message Queue,简称MQ)是一种重要的中间件,用于在分布式系统中解耦组件之间的通信,提高系统的响应速度和可扩展性。ActiveMQ是Apache软件基金会开发的一款开源消息...

    ActiveMQ例子

    **Apache ActiveMQ 详解** Apache ActiveMQ 是一款开源的消息中间件,它遵循Java Message Service (JMS) 规范,提供了高效、可靠的异步消息传递功能。ActiveMQ 的设计目标是提供灵活、高性能且易用的消息传递解决...

    ActiveMQ实例

    **ActiveMQ实例详解** ActiveMQ,作为Apache软件基金会的一个开源项目,是业界广泛使用的消息中间件,它遵循Java Message Service(JMS)标准,提供高效、可靠的异步通信能力。在分布式系统中,ActiveMQ扮演着至关...

    Spring+ActiveMQ整合实例代码工程

    **Spring与ActiveMQ整合详解** 在Java开发中,Spring框架是极为重要的应用基础,而ActiveMQ作为Apache出品的一款开源消息中间件,常被用于实现应用间的异步通信和解耦。本实例代码工程"Spring+ActiveMQ整合实例代码...

Global site tag (gtag.js) - Google Analytics