`

ActiveMQ Redelivery Policy(消息重发策略)

阅读更多

转载自 http://windows9834.blog.163.com/blog/static/2734500420131195224532/

 

官方文档:http://activemq.apache.org/redelivery-policy.html
在事务控制里抛出异常,txManager会进行rollback处理.(在activeMQ里,消息默认会redelivery到客户端6次,如果继续异常,消息会放到deadletter queue里(ActiveMQ.DLQ))。

在以下三种情况中,消息会被再次传送给消费者:

1.在使用事务的Session中,调用rollback()方法;

2.在使用事务的Session中,调用commit()方法之前就关闭了Session;

3.在Session中使用CLIENT_ACKNOWLEDGE签收模式,并且调用了recover()方法。

可以通过设置ActiveMQConnectionFactory和ActiveMQConnection来定制想要的再次传送策略。

当消息被再次投递到客户端时的详细信息在 Message Redelivery and DLQ Handling 章节能找到详细的说明。
你可以在ActiveMQConnectionFactory 或ActiveMQConnection类中配置RedeliveryPolicy属性,用来定义重传策略的具体细节。
您可以使用Java代码,Spring配置或配置URI字符串来定义重传策略。

可用的属性

 属性 默认值 说明 
 collisionAvoidanceFactor  0.15  设置防止冲突范围的正负百分比,只有启用useCollisionAvoidance参数时才生效。
 maximumRedeliveries  6  最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。
 maximumRedeliveryDelay  -1  最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。
 initialRedeliveryDelay  1000L  初始重发延迟时间
 redeliveryDelay  1000L  重发延迟时间,当initialRedeliveryDelay=0时生效(v5.4)
 useCollisionAvoidance  false  启用防止冲突功能,因为消息接收时是可以使用多线程并发处理的,应该是为了重发的安全性,避开所有并发线程都在同一个时间点进行消息接收处理。所有线程在同一个时间点处理时会发生什么问题呢?应该没有问题,只是为了平衡broker处理性能,不会有时很忙,有时很空闲。
 useExponentialBackOff  false  启用指数倍数递增的方式增加延迟时间。
 backOffMultiplier  5  重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。


官方文档有些参数功能说得不是很清楚,如collisionAvoidanceFactor,那只能查看源代码了,
可以查看org.apache.activemq.ActiveMQMessageConsumer类中的rollback()方法:

publicvoid rollback()throwsJMSException{

......

finalint currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
if(currentRedeliveryCount >0){
redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
}else{
redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
}

......

}

先获取重发次数,如果是第一次重发则延迟时间为initialRedeliveryDelay,如果不是第一次重发则延迟时间为redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);这里的redeliveryDelay参数是前一次重发的延迟时间。

publiclong getNextRedeliveryDelay(long previousDelay){
long nextDelay;
if(previousDelay ==0){
nextDelay = redeliveryDelay;
}elseif(useExponentialBackOff && backOffMultiplier >1){
nextDelay =(long)(previousDelay * backOffMultiplier);
if(maximumRedeliveryDelay !=-1&& nextDelay > maximumRedeliveryDelay){
// in case the user made max redelivery delay less than redelivery

delay for some reason.
nextDelay =Math.max(maximumRedeliveryDelay, redeliveryDelay);
}
}else{
nextDelay = previousDelay;
}
if(useCollisionAvoidance){
/*
* First random determines +/-, second random determines how far to
* go in that direction. -cgs
*/
Random random = getRandomNumberGenerator();
double variance =(random.nextBoolean()? collisionAvoidanceFactor :

-collisionAvoidanceFactor)* random.nextDouble();
nextDelay += nextDelay * variance;
}
return nextDelay;
}

计算下一次重发延迟时间时,当启用指数倍数增长方式useExponentialBackOff=true,且增长倍数backOffMultiplier > 1时,下一次延迟时间为previousDelay * backOffMultiplier,这里的previousDelay为前一次延迟重发的时间。

collisionAvoidanceFactor这个参数spring中是不能直接注入的,注入方法为:

publicvoid setCollisionAvoidancePercent(short collisionAvoidancePercent){
this.collisionAvoidanceFactor = collisionAvoidancePercent *0.01d;
}

这里的collisionAvoidancePercent是个短整形,比如15,则方法直接计算为15%。在spring中注入方式为:

<propertyname="collisionAvoidancePercent"value="15"/>

如果useCollisionAvoidance=true,则nextDelay += nextDelay * variance;这里的variance是一个随机的正负值,Random.nextDouble()的范围是:0.0d到1.0d,当redeliveryDelay=1000L,collisionAvoidanceFactor值为0.15时,那么正负variance的范围就是:-0.15到+0.15,那么最后nextDelay的值范围就是:1000+1000*-15%~1000+1000*15%,即在850~1150范围内波动。

分享到:
评论

相关推荐

    springboot整合activeMq的使用,队列,主题,消息手动确认,重发机制

    ActiveMQ提供了一种基于Redelivery Policy的重试策略。你可以在ActiveMQ的配置中定义重试次数、间隔时间等参数。在代码中,如果消息处理失败,可以抛出异常,ActiveMQ会根据重试策略进行消息的重新发送。 ```xml &lt;!...

    springboot整合activemq 消费者 ACK手动确认 &消息重发

    springboot整合 activeMq 消费者 消费接收消息 包含队列模式点对点发 以及 主题模式一对多 这是消费者的demo consumer 。 里面有消息重发机制,手动确认ACK模式。 配合 producer 生产者demo使用。

    ActiveMQ队列消息过期时间设置和自动清除解决方案.docx

    ActiveMQ 队列消息过期时间设置和自动清除解决方案 ActiveMQ 是一个开源的消息队列系统,用于实现分布式系统之间的异步通信。在使用 ActiveMQ 时,消息过期时间设置和自动清除是一个非常重要的问题。本文将介绍 ...

    ActiveMQ消息过期时间设置和自动清除解决方案

    - [ActiveMQ官方文档 - 消息重试和DLQ处理](http://activemq.apache.org/message-redelivery-and-dlq-handling.html) #### 6. 当前方案 根据上述配置和测试步骤,当前采用的方案如下: 1. **消息设置过期**:通过...

    activeMQ发送消息返回消息

    ActiveMQ是Apache软件基金会开发的一款开源消息中间件,它基于Java消息服务(JMS)规范,用于在分布式系统中提供高效、可靠的消息传递。在ActiveMQ中,发送和接收消息是一个核心功能,它允许应用程序之间进行异步...

    ActiveMQ RabbitMQ RokcetMQ 消息队列中间件视频教程

    作为消息中间件的MQ在java开发中起着举足轻重的地位,无论是ActiveMQ、RabbitMQ、还是RokcetMQ至少要会一个,否则别说自己是java程序员。Java自学网整理了目前行业最常用的消息中间件视频供大家学习。

    用C#实现的ActiveMQ发布/订阅消息传送

    在本场景中,我们关注的是如何使用C#编程语言结合ActiveMQ来实现发布/订阅模式的消息传送。ActiveMQ是Apache软件基金会开发的一个开源消息传递平台,支持多种协议,包括NMS(.NET Messaging Service),它是专门为...

    SpringBoot+ActiveMq+MQTT实现消息的发送和接收

    在本文中,我们将深入探讨如何使用SpringBoot、ActiveMQ和MQTT来实现消息的发送与接收。这是一个典型的分布式系统中的消息通信场景,其中SpringBoot作为应用程序框架,ActiveMQ作为消息中间件,而MQTT(Message ...

    Springboot整合ActiveMQ,实现消息的发送接收功能源码

    在本文中,我们将深入探讨如何使用SpringBoot框架与Apache ActiveMQ集成,以便...随着你对ActiveMQ和SpringBoot的进一步理解,可以探索更高级的特性,如事务消息、消息确认和消息路由策略,以优化你的消息处理系统。

    ActiveMQ实现的消息收发案例

    ActiveMQ是Apache软件基金会开发的一款开源消息代理,它实现了Java消息服务(JMS,Java Message Service),为分布式系统提供可靠且高性能的消息传递功能。本案例将探讨如何使用ActiveMQ实现简单消息的发送与接收。 ...

    ActiveMQ学习笔记之九--发送消息到队列中

    在IT行业中,Apache ActiveMQ是一个广泛使用的开源消息代理和队列服务器,它是Java Message Service (JMS) 的实现,能够处理各种消息传递模式,包括点对点和发布/订阅。这篇"ActiveMQ学习笔记之九--发送消息到队列中...

    Spring+ActiveMQ消息队列+前台接收消息

    在本教程中,我们将探讨如何整合Spring框架与ActiveMQ消息队列,实现前后台的消息传递。这有助于提升系统的可扩展性和响应速度,降低不同组件之间的耦合度。 首先,Spring框架是Java企业级应用开发的事实标准,它...

    springboot集成activemq实现消息接收demo

    而ActiveMQ是Apache出品的一款开源消息中间件,它遵循JMS(Java Message Service)规范,用于处理应用程序之间的异步通信。本教程将详细介绍如何在Spring Boot项目中集成ActiveMQ,实现消息接收的Demo。 首先,我们...

    ActiveMQ(包括消息生成端和andorid消息接受端)

    同时,对于Android端,还需要考虑电池效率和网络条件下的消息同步策略。 总之,这个项目展示了如何在两端(消息生成端和Android消息接受端)实现ActiveMQ的完整工作流程,包括点对点通信、广播和离线推送。开发者...

    一个jms activemq Topic 消息实例

    一个jms activemq Topic 消息实例 关于jms JMS 是接口,相当于jdbc ,要真正使用它需要某些厂商进行实现 ,即jms provider 常见的jms provider 有 ActiveMQ JBoss 社区所研发的 HornetQ (在jboss6 中默认即可以...

    ActiveMQ路由配置方式

    ActiveMQ路由配置方式 ActiveMQ路由配置是Apache ActiveMQ项目中的一种重要配置方式,它依赖另一个Apache项目Camel。ActiveMQ集成了Camel,启动时同时会启动Camel。通过Camel Web Console可以进行Routing配置。 ...

    7道消息队列ActiveMQ面试题!

    ActiveMQ是一款非常流行的开源消息队列中间件,它实现了JMS(Java Message Service,Java消息服务)1.1规范,面向消息的中间件(Message Oriented Middleware,MOM)是指利用高效可靠的消息传递机制进行与平台无关的...

    activemq消息持久化所需Jar包

    3. **数据库连接驱动**:ActiveMQ支持多种持久化策略,其中一种是使用关系型数据库(如MySQL、PostgreSQL)来存储消息。因此,根据你选择的数据库,需要引入相应的数据库驱动Jar包,例如`mysql-connector-java.jar`...

    使用WebSocket协议接收ActiveMQ消息

    另外,如果WebSocket连接意外断开,客户端需要有一个重连策略,以恢复与ActiveMQ的通信。 至于压缩包文件"activemq_ws_接收消息",其中可能包含示例代码、配置文件或者文档,帮助用户更好地理解如何在ActiveMQ中...

    ActiveMQ消息服务器 v6.0.1.zip

    3. 路由和过滤:ActiveMQ提供丰富的消息路由和过滤选项,如主题(Topics)、队列(Queues)、虚拟主题(Virtual Topics)等,可以根据业务需求定制消息的分发策略。 4. 事务和持久化:ActiveMQ支持事务性消息处理,...

Global site tag (gtag.js) - Google Analytics