ActiveMQ中提供了众多的“策略”(policy),它们可以在broker端为每个通道“定制”消息的管理方式。本文将简单描述主要的几种Policy。
一. DispatchPolcicy: 转发策略(Topic)
此策略表明broker端消息转发给多个Consumer时,消息被发送的顺序性,这个顺序通常指Consumer的顺序,只对Topic有效,它有3种常用的类型:
1) RoundRobinDispatchPolicy: “轮询”,消息将依次发送给每个“订阅者”。“订阅者”列表默认按照订阅的先后顺序排列,在转发消息时,对于匹配消息的第一个订阅者,将会被移动到“订阅者”列表的尾部,这也意味着“下一条”消息,将会较晚的转发给它。
2) StrictOrderDispatchPolicy: 严格有序,消息依次发送给每个订阅者,按照“订阅者”订阅的时间先后。它和RoundRobin最大的区别是,没有移动“订阅者”顺序的操作。
3) PriorityDispatchPolicy: 基于“property”权重对“订阅者”排序。它要求开发者首先需要对每个订阅者指定priority,默认每个consumer的权重都一样。
4) SimpleDispatchPolicy: 默认值,按照当前“订阅者”列表的顺序。其中PriorityDispatchPolicy是其子类。
"轮询"是比较常用的策略。
<policyEntry topic=">"> <dispatchPolicy> <roundRobinDispatchPolicy/> </dispatchPolicy> </policyEntry>
二.SubscriptionRecoveryPolicy: 恢复策略(Topic)
在非耐久订阅者“失效”期间或一个新的Topic,broker可以保留的可追溯的消息量。前提是Topic必须是“retroactive”,我们可以在distination地址中指定此属性,例如:"order.topic?consumer.retroactive=true"。默认情况下,订阅者只能获取“订阅”开始之后的消息,如果Retroactive=true,那么订阅者就可以获取其创建之前的消息列表。此Policy就是用来控制“retroactive”的消息量。
1) FixedSizedSubscriptionRecoveryPolicy: 保存一定size的消息,broker将为此Topic开辟定额的RAM用来保存最新的消息。
<!-- 1K --> <fixedSizedSubscriptionRecoveryPolicy maximumSize="1024"/>
2) FixedCountSubscriptionRecoveryPolicy: 保存一定条数的消息。
<!-- 100条 --> <fixedCountSubscriptionRecoveryPolicy maximumSize="100"/>
3) LastImageSubscriptionRecoveryPolicy: 只保留最新的一条数据
4) QueryBasedSubscriptionRecoveryPolicy: 符合置顶selector的消息都将被保存,具体能够“恢复”多少消息,由底层存储机制决定;比如对于非持久化消息,只要内存中还存在,则都可以恢复。
5) TimedSubscriptionRecoveryPolicy: 保留最近一段时间的消息。
<!-- 可追溯最近1分钟的消息--> <timedSubscriptionRecoveryPolicy recoverDuration="60000" />
6) NoSubscriptionRecoveryPolicy: 关闭“恢复机制”。默认值。
<policyEntry topic=">"> <subscriptionRecoveryPolicy> <fixedCountSubscriptionRecoveryPolicy maximumSize="100"/> </subscriptionRecoveryPolicy> </policyEntry
三. DeadLetterStrategy: “死信”策略
Broker将如何管理“死信”。当消息过期后,或者“重发”几次之后仍然不能被正常消息,那么此消息将会被移除到DeadLetter队列中。此后,我们可以通过侦听死信队列,来获取相关通知或者对消息做额外的操作。
1) IndividualDeadLetterStrategy: 把DeadLetter放入各自的死信通道中,对于Queue而言,死信通道的前缀默认为“ActiveMQ.DLQ.Queue.”,Topic为“ActiveMQ.DLQ.Topic.”;比如队列Order,那么它对应的死信通道为“ActiveMQ.DLQ.Queue.Order”。我们使用“queuePrefix”“topicPrefix”来指定上述前缀。
默认情况下,无论是Topic还是Queue,broker将使用Queue来保存DeadLeader,即死信通道通常为Queue;不过开发者也可以指定为Topic。
<policyEntry queue="order"> <deadLetterStrategy> <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="false" /> </deadLetterStrategy> </policyEntry>
上述将队列Order中出现的DeadLetter保存在DLQ.Order中,不过此时DLQ.Order为Topic。individualDeadLetterStrategy还有一个属性为“useQueueForTopicMessages”,此值表示是否将Topic的DeadLetter保存在Queue中,默认为true。
2) SharedDeadLetterStrategy: 将所有的DeadLetter保存在一个共享的队列中,这是ActiveMQ broker端默认的策略。共享队列默认为“ActiveMQ.DLQ”,可以通过“deadLetterQueue”属性来设定。
<deadLetterStrategy> <sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/> </deadLetterStrategy>
3) DiscardingDeadLetterStrategy: broker将直接抛弃DeadLeatter。如果开发者不需要关心DeadLetter,可以使用此策略。AcitveMQ提供了一个便捷的插件:DiscardingDLQBrokerPlugin,来抛弃DeadLetter。
<broker> <plugins> <discardingDLQBrokerPlugin dropAll="true" dropTemporaryTopics="true" dropTemporaryQueues="true" /> </plugins> </broker>
对于上述三种策略,还有2个很重要的可选参数,“processExpired”表示是否将过期消息放入死信队列,默认为true;“processNonPersistent”表示是否将“非持久化”消息放入死信队列,默认为false。
四. PendingMessageLimitStrategy: 消息限制策略(面向Slow Consumer)
此策略只对Topic有效,只对nondurable订阅者有效,当通道中有大量的消息积压时,broker可以保留的消息量。为了防止Topic中有慢速消费者,导致整个通道消息积压。(对于Topic而言,一条消息只有所有的订阅者都消费才会被删除)
1) ConstantPendingMessageLimitStrategy: 保留固定条数的消息,如果消息量超过limit,将使用“MessageEvictionStrategy”移除消息(参见下文)。
<policyEntry topic="PRICES.>"> <!-- lets force old messages to be discarded for slow consumers --> <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="50"/> </pendingMessageLimitStrategy> </policyEntry>
2) PrefetchRatePendingMessageLimitStrategy: 保留prefetchSize倍数条消息。
<!-- 如果prefetchSize为100,则保留2.5 * 100条消息 --> <prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>
五. MessageEvictionStrategy: 消息剔除策略(面向Slow Consumer)
配合PendingMessageLimitStrategy,只对Topic有效,只对nondurable订阅者有效。当PendingMessage的数量超过限制时,broker该如何剔除多余的消息。当Topic接收到信息消息后,会将消息“Copy”给每个订阅者,在保存这个消息时(保存策略参见【八】),将会检测pendingMessages的数量是否超过限制(由【四】来检测),如果超过限制,将会在pendingMessages中使用MessageEvicationStrategy移除多余的消息,此后将新消息保存在PendingMessages中。
1) OldestMessageEvictionStrategy: 移除旧消息,默认策略。
2) OldestMessageWithLowestPriorityEvictionStrategy: 旧数据中权重较低的消息,将会被移除。(message.getPriority())
3) UniquePropertyMessageEvictionStrategy: 移除具有指定property的旧消息。开发者可以指定property的名称,从此属性值相同的消息列表中移除较旧的(根据消息的创建时间)。
<policyEntry topic="SKU.PRICE.>"> <!-- lets force old messages to be discarded for slow consumers --> <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="10000"/> </pendingMessageLimitStrategy> <messageEvictionStrategy> <uniquePropertyMessageEvictionStrategy propertyName="SKU" /> </messageEvictionStrategy> </policyEntry>
上述配置,针对SKU.PRICE通道中,只保留10000个最新的消息,当容量达到阀值时,将SKU值相同的消息列表中较旧的消息移除(只保留最新的一条消息)。比如在每条消息中,都封装一个SKU(商品ID)以及其最新价格,那么通过这种策略,值保留相同SKU的最新的一个消息。
六. SlowConsumerStrategy: 慢速消费者策略
Broker将如何处理慢消费者。Broker将会启动一个后台线程用来检测所有的慢速消费者,并定期关闭关闭它们。
1) AbortSlowConsumerStrategy: 中断慢速消费者,慢速消费将会被关闭。
<slowConsumerStrategy> <abortSlowConsumerStrategy abortConnection="false"/><!-- 不关闭底层链接 --> </slowConsumerStrategy>
2) AbortSlowConsumerStrategy: 如果慢速消费者最后一个ACK距离现在的时间间隔超过阀值,则中断慢速消费者。
<slowConsumerStrategy> <abortSlowConsumerStrategy maxTimeSinceLastAck="30000"/><!-- 30秒滞后 --> </slowConsumerStrategy>
七. PendingQueueMessageStoragePolicy: 待消费消息策略
当通道中有大量Slow Consumer时,Broker该如何优化消息的转发,以及在此情况下,“非持久化”消息达到内存限制时该如何处理。
当Broker接受到消息后,通常将最新的消息写入内存以提高消息转发的效率,提高消息ACK的效率,减少对对底层Store的操作;如果Consumer非常快速,那么消息将会立即转发给Consumer,不需要额外的操作;但当遇到Slow Consumer时,情况似乎并没有那么美好。
持久化消息,通常为:写入Store-->线程轮询,从Store中pageIn数据到PendingStorage-->转发给Consumer-->从PendingStorage中移除-->消息ACK后从Store中移除。
对于非持久化数据,通常为:写入内存-->如果内存足够,则PendingStorage直接以内存中的消息转发-->如果内存不足,则将内存中的消息swap到临时文件中-->从临时文件中pageIn到内存,转发给Consumer。
AcitveMQ提供了几个的Cursor机制,它就是用来保存Pending Messages。
1) vmQueueCursor: 将待转发消息保存在额外的内存(JVM linkeList)的存储结构中。是“非持久化消息”的默认设置,如果Broker不支持Persistent,它是任何类型消息的默认设置。有OOM风险。
2) fileQueueCursor: 将消息保存到临时文件中。文件存储方式有broker的tempDataStore属性决定。是“持久化消息”的默认设置。
3) storeCursor: “综合”设置,对于非持久化消息,将采用vmQueueCursor存储,对于持久化消息采用fileQueueCursor。这是强烈推荐的策略,也是效率最好的策略。
<!-- persistent为true时,才能将“溢出”的非持久化消息保存为临时文件 --> <broker persistent="true"> <persistenceAdapter> <!-- <kahaDB directory="${activemq.data}/kahadb"/> --> <levelDB directory="${activemq.data}/leveldb"/> </persistenceAdapter> <!-- 临时文件存储,默认不存储任何临时文件 --> <tempDataStore> <!-- <pListStoreImpl directory="${activemq.data}/tmp"/> --> <levelDB directory="${activemq.data}/leveldb/tmp"/> </tempDataStore> <!-- 内存限制为512M,如果超过阀值,则转存 --> <policyEntry queue=">" producerFlowControl="true" memoryLimit="512mb"> <pendingQueuePolicy> <storeCursor> <nonPersistent> <fileQueueCursor/> </nonPersistent> </storeCursor> </pendingQueuePolicy> </policyEntry> <systemUsage> <systemUsage sendFailIfNoSpace="true"> <memoryUsage> <!-- 所有的通道缓存消息的总内存大小,memoryLimit作为其子模块 --> <memoryUsage limit="6gb"/> </memoryUsage> </systemUsage> </systemUsage> </broker>
八. PendingSubscriberMessageStoragePolicy:(Topic)
针对“非耐久”订阅者。概念和(七)一样,支持三种策略:storeCursor, vmCursor和fileCursor。
九. PendingDurableSubscriberMessageStoragePolicy: (Topic)
针对“耐久”订阅者,支持三种策略:storeDurableSubscriberCursor, vmDurableCursor和 fileDurableSubscriberCursor。
<policyEntry topic=">" producerFlowControl="false" memoryLimit="32mb"> <pendingSubscriberPolicy> <!-- 对于非耐久的订阅者,非持久化消息: vmCursor,fileCursor --> <fileCursor/> </pendingSubscriberPolicy> <pendingDurableSubscriberPolicy> <!-- 对于耐久的订阅者,非持久化消息 --> <!-- storeDurableSubscriberCursor --> <!-- vmDurableCursor --> <!-- fileDurableSubscriberCursor --> <storeDurableSubscriberCursor/> </pendingDurableSubscriberPolicy> </policyEntry>
十. ForcePersistencyModeBrokerPlugin: 消息传输模式强制转换插件
将broker接收的消息强制转换成"PERSISTENT"或者"NOT_PERSISTENT"。
<broker> <plugins> <!-- 将所有消息的传输模式,修改为"PERSISTENT" --> <forcePersistencyModeBrokerPlugin persistenceFlag="true"/> </plugins> </broker>
相关推荐
为了确保消息处理的高效性,ActiveMQ提供了慢消费者策略来处理那些处理消息速度过慢的消费者。该策略可以通过定期检查所有慢速消费者并在达到一定阈值时中断它们,以提高系统的整体性能。 ##### 配置示例 ```xml ...
ActiveMQ 队列消息过期时间设置和自动清除解决方案 ActiveMQ 是一个开源的消息队列系统,用于实现分布式系统之间的异步通信。在使用 ActiveMQ 时,消息过期时间设置和自动清除是一个非常重要的问题。本文将介绍 ...
另外,如果WebSocket连接意外断开,客户端需要有一个重连策略,以恢复与ActiveMQ的通信。 至于压缩包文件"activemq_ws_接收消息",其中可能包含示例代码、配置文件或者文档,帮助用户更好地理解如何在ActiveMQ中...
ActiveMQ中的生产者(Producer)发送消息到队列或主题(Topic),而消费者(Consumer)则从这些队列或主题中接收消息。队列遵循FIFO(先进先出)原则,每个消息只能被一个消费者接收;主题则支持多播,多个订阅者...
3. 路由和过滤:ActiveMQ提供丰富的消息路由和过滤选项,如主题(Topics)、队列(Queues)、虚拟主题(Virtual Topics)等,可以根据业务需求定制消息的分发策略。 4. 事务和持久化:ActiveMQ支持事务性消息处理,...
3. **数据库连接驱动**:ActiveMQ支持多种持久化策略,其中一种是使用关系型数据库(如MySQL、PostgreSQL)来存储消息。因此,根据你选择的数据库,需要引入相应的数据库驱动Jar包,例如`mysql-connector-java.jar`...
通过理解服务器宕机时的数据存储策略,处理丢消息的策略,优化持久化消息的发送,调整prefetch机制以确保消息公平分配,以及利用死信队列来处理异常情况,我们可以更好地利用ActiveMQ构建健壮的分布式系统。
- **XML配置**:ActiveMQ的核心配置文件位于`conf/activemq.xml`中,这里可以设置各种参数,比如Broker的名称、监听端口、持久化策略等。 - **Java配置**:除了传统的XML配置外,还可以通过Java代码的方式进行配置。...
3. **高可用性**:通过集群和复制策略,ActiveMQ可以实现故障转移和负载均衡,保证服务的连续性。 4. **消息优先级与时间戳**:消息可以根据优先级进行处理,同时,系统会根据时间戳自动清理过期消息,以保持高效...
7. **安全性与网络配置**:介绍如何设置用户权限,保护ActiveMQ服务器的安全,以及网络配置,如集群和多数据中心的部署策略。 8. **监控与管理**:讲解如何使用ActiveMQ的Web控制台进行实时监控,以及通过JMX进行...
.NET 封装的ActiveMQ消息队列是一种在.NET环境中对Apache ActiveMQ消息中间件进行抽象和简化的方法,以便开发者能够更方便地在.NET应用中使用ActiveMQ的功能。ActiveMQ是业界广泛使用的开源消息代理,它遵循Java消息...
Apache ActiveMQ是一个开源的消息中间件,它属于Apache软件基金会。...通过这些知识点,我们可以更好地理解ActiveMQ的工作原理及其在不同场景下的应用和配置策略,为进行消息中间件面试或实际应用提供有价值的参考。
9. **插件体系**:ActiveMQ拥有丰富的插件体系,用户可以根据需求自定义消息过滤、路由策略等,增强了其灵活性和可扩展性。 对于计算机案例和模板建站,ActiveMQ可以作为后台服务,提供异步通信能力,处理大量并发...
### ActiveMQ消息服务配置详解 #### 一、ActiveMQ配置概览 ActiveMQ是一款非常流行的开源消息中间件,它基于Java开发,支持多种消息传递模式,如点对点(P2P)、发布/订阅(Pub/Sub)等。本文将详细介绍ActiveMQ的配置...
另外,可以使用分片(Sharding)策略来分散消息。 10. **如何监控 ActiveMQ 的运行状态?** ActiveMQ 提供了管理控制台和 JMX(Java Management Extensions)接口,可以监控 Broker 的运行情况,如消息队列长度、...
在代码中,如果消息处理失败,可以抛出异常,ActiveMQ会根据重试策略进行消息的重新发送。 ```xml <!-- activemq.xml配置文件 --> ``` 此外,你还可以通过监听消息的Acknowledgement来控制重发,例如,在消费...
ActiveMQ是中国开源软件Apache下的一个项目,它是Java消息服务(JMS)的实现,作为一个高效、灵活且可扩展的消息中间件,它在分布式系统中扮演着至关重要的角色。本资料包"ActiveMQ消息中间件.zip"包含了关于...
**ActiveMQ**是Apache软件基金会开发的一款开源消息代理,它实现了JMS规范,为应用程序提供了一个中间件,允许应用程序之间进行异步的消息通信。ActiveMQ支持多种协议,如OpenWire、AMQP、STOMP、MQTT等,适用于多种...