`
manzhizhen
  • 浏览: 293340 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论
阅读更多

       本文以ActiveMQ最新的5.10版本为准。

       大家知道,JMS规范中,Message消息头接口中有setJMSRedelivered(boolean redelivered)和getJMSRedelivered()方法,用于设置和获取消息的重发标志,当然set方法主要是MOM来调用的,我们客户端使用的是get方法。

       还记得当时阿里的电话面试曾问过我,你知道ActiveMQ中的消息重发时间间隔和重发次数吗?我当时尴尬了,只知道会重发,还真没去了解过其中的细节,所以最后被完美的“淘汰了”。

       后来有时间了就去网上看了下官方的文档,所以现在把ActiveMQ中的重发机制和大家一起分享一下。

       首先,我们得大概了解下,在哪些情况下,ActiveMQ服务器会将消息重发给消费者,这里为简单起见,假定采用的消息发送模式为队列(即消息发送者和消息接收者)。

       1.如果消息接收者在处理完一条消息的处理过程后没有对MOM进行应答,则该消息将由MOM重发。需要注意的是,如果采用非事务持久化消息加Session.CLIENT_ACKNOWLEDGE应答模式,当消费者在处理完消息后没有主动调用Message#acknowledge()方法时,MOM不会主动重发,如果这时候MOM宕机了,当重启MOM后,将消费者机器也重启后MOM才会重发消息,但此时的消息不会有重发标记,因为MOM都不记得自己有宕机过,也不知道这些消息被发送过。

       2.如果我们队某个队列设置了预读参数(consumer.prefetchSize),如果消息接收者在处理第一条消息时(没向MOM发送消息接收确认)就宕机了,则预读数量的所有消息都将被重发。

       3.如果Session是事务的,则只要消息接收者有一条消息没有确认,或发送消息期间MOM或客户端某一方突然宕机了,则该事务范围中的所有消息MOM都将重发。

       说到这里,大家可能会有疑问,ActiveMQ消息服务器怎么知道消费者客户端到底是消息正在处理中还没来得急对消息进行应答还是已经处理完成了没有应答或是宕机了根本没机会应答呢?其实在所有的客户端机器上,内存中都运行着一套客户端的ActiveMQ环境,该环境负责缓存发来的消息,负责维持着和ActiveMQ服务器的消息通讯,负责失效转移(fail-over)等,所有的判断和处理都是由这套客户端环境来完成的。

       我们可以来对ActiveMQ的重发策略(Redelivery Policy)来进行自定义配置,其中的配置参数主要有以下几个:

       a) collisionAvoidanceFactor :碰撞躲避因数,默认值是0.15,这个参数是为了躲避高并发的重发带来的问题,我们查看org.apache.activemq.RedeliveryPolicy类的源代码,   

     // +/-15% for a 30% spread -cgs
    protected double collisionAvoidanceFactor = 0.15d;
    protected long initialRedeliveryDelay = 1000L;

可以发现,该默认值带来的变动范围是正负百分之15,也就是有30%的范围,也就是说,如果延迟发送时间(也就是initialRedeliveryDelay 默认值)是1000毫秒,则该条消息第一次有可能被拖延850毫秒到1150毫秒之间后被发送,如果有第二次重发,基数就不是1000毫秒了,而是以上一次重发拖延时间为基础来算。源代码如下:

    public long getNextRedeliveryDelay(long previousDelay) {
        long nextDelay = redeliveryDelay;

        if (previousDelay > 0 && useExponentialBackOff && backOffMultiplier > 1) {
            nextDelay = (long) (previousDelay * backOffMultiplier);
            if(maximumRedeliveryDelay != -1 && nextDelay > maximumRedeliveryDelay) {
                // in case the user made max redelivery delay less than redelivery delay for some reason.
                nextDelay = Math.max(maximumRedeliveryDelay, redeliveryDelay);
            }
        }

        if (useCollisionAvoidance) {
            /*
             * First random determines +/-, second random determines how far to
             * go in that direction. -cgs
             */
            Random random = getRandomNumberGenerator();
            double variance = (random.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * random.nextDouble();
            nextDelay += nextDelay * variance;
        }

        return nextDelay;
    }

       b)maximumRedeliveries :最大重发次数,默认值是6,如果你想不限次数重发,可以设置成-1。同样是org.apache.activemq.RedeliveryPolicy类中的代码:

    public static final int NO_MAXIMUM_REDELIVERIES = -1;
    public static final int DEFAULT_MAXIMUM_REDELIVERIES = 6;

    protected int maximumRedeliveries = DEFAULT_MAXIMUM_REDELIVERIES;

我们探究一下maximumRedeliveries 的get方法,可以发现有org.apache.activemq.ActiveMQSession和org.apache.activemq.ActiveMQMessageConsumer两个类中有用到:

 其中ActiveMQSession中的代码如下:

           // Figure out how long we should wait to resend this message.
           long redeliveryDelay = redeliveryPolicy.getInitialRedeliveryDelay();
           for (int i = 0; i < redeliveryCounter; i++) {

                    // 每次重发拖延时间都是以上一次重发拖延时间来算,所以这里for循环来取得最新的拖延时间
                    redeliveryDelay = redeliveryPolicy.getNextRedeliveryDelay(redeliveryDelay);
            }

            // 交给定时任务重发
            connection.getScheduler().executeAfterDelay(new Runnable() {

                     public void run() {
                              ((ActiveMQDispatcher)md.getConsumer()).dispatch(md);
                     }
             }, redeliveryDelay);

 ActiveMQMessageConsumer中的代码类似。

 

       c)maximumRedeliveryDelay :重发最大拖延时间,默认为-1,表示没有最大拖延时间,此参数只有当useExponentialBackOff 为true时起效。同样是RedeliveryPolicy中的代码:

    protected long maximumRedeliveryDelay = -1;

    public long getNextRedeliveryDelay(long previousDelay) {
        long nextDelay = redeliveryDelay;

        if (previousDelay > 0 && useExponentialBackOff && backOffMultiplier > 1) {
            nextDelay = (long) (previousDelay * backOffMultiplier);
            if(maximumRedeliveryDelay != -1 && nextDelay > maximumRedeliveryDelay) {
                // in case the user made max redelivery delay less than redelivery delay for some reason.
                nextDelay = Math.max(maximumRedeliveryDelay, redeliveryDelay);
            }
        }

        。。。。。   

     }

看源代码就显而易见了。

 

      d)initialRedeliveryDelay :第一次重发的拖延时间基础,默认是1000,单位为毫秒,前面讲collisionAvoidanceFactor 属性时已经提到过,这里不再多说。

    

      e)redeliveryDelay :如果initialRedeliveryDelay 为0,则使用redeliveryDelay ,默认也是1000。RedeliveryPolicy中源代码如下:

    protected long initialRedeliveryDelay = 1000L;
    protected long redeliveryDelay = initialRedeliveryDelay;

 

      f)useCollisionAvoidance :消息重发时是否采用前面提到的碰撞避免collisionAvoidanceFactor 参数,默认是false,不采用。源代码上面也给出了,这里不再多说。

 

      g)useCollisionAvoidance :是否使用成倍增加拖延,默认为false,如果我们希望重发的拖延时间一次比一次大很多,则可以设置它为true。上面已经给出过源代码,这里再次给出:

    protected boolean useExponentialBackOff;
    protected double backOffMultiplier = 5.0;

    public long getNextRedeliveryDelay(long previousDelay) {
        long nextDelay = redeliveryDelay;

        if (previousDelay > 0 && useExponentialBackOff && backOffMultiplier > 1) {
            nextDelay = (long) (previousDelay * backOffMultiplier);
            if(maximumRedeliveryDelay != -1 && nextDelay > maximumRedeliveryDelay) {
                // in case the user made max redelivery delay less than redelivery delay for some reason.
                nextDelay = Math.max(maximumRedeliveryDelay, redeliveryDelay);
            }
        }

        if (useCollisionAvoidance) {
            /*
             * First random determines +/-, second random determines how far to
             * go in that direction. -cgs
             */
            Random random = getRandomNumberGenerator();
            double variance = (random.nextBoolean() ? collisionAvoidanceFactor : -collisionAvoidanceFactor) * random.nextDouble();
            nextDelay += nextDelay * variance;
        }

        return nextDelay;
    }
可以看出,成倍拖延是将上一次拖延时间乘以backOffMultiplier来实现的,而 backOffMultiplier默认为5.

 

        h)backOffMultiplier :成倍拖延时间的倍率,默认为5,上面已经提到了,这里不再多说。

 

        那么接下来我们讨论下该如何配置上面所说的几项,我们可以通过Java代码,也就是JMS API来配置,也可以通过Spring来配置,当然也可以通过连接器的URL来配置:

        如果直接使用JMS API来使用ActiveMQ,我们可以如下配置(代码来自ActiveMQ的官方说明):

 ActiveMQConnection connection ...  // Create a connection
 RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
 queuePolicy.setInitialRedeliveryDelay(0);
 queuePolicy.setRedeliveryDelay(1000);
 queuePolicy.setUseExponentialBackOff(false);
 queuePolicy.setMaximumRedeliveries(2);
 
 RedeliveryPolicy topicPolicy = new RedeliveryPolicy();
 topicPolicy.setInitialRedeliveryDelay(0);
 topicPolicy.setRedeliveryDelay(1000);
 topicPolicy.setUseExponentialBackOff(false);
 topicPolicy.setMaximumRedeliveries(3);
 
 // Receive a message with the JMS API
 RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap();
 map.put(new ActiveMQTopic("topic1"), topicPolicy);
 map.put(new ActiveMQQueue("queue1"), queuePolicy);
注意,从ActiveMQ5.7开始,我们可以给每个目的地(Destination)配置不同的重发策略。
ActiveMQConnection类中有一个成员变量    private RedeliveryPolicyMap redeliveryPolicyMap;,用来给不同的目的地配置不同的重发策略。
至于如何在连接器的URL上配置,可以参考官方文档:http://activemq.apache.org/connection-configuration-uri.html

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    springboot整合activemq 消费者 ACK手动确认 &消息重发

    springboot整合 activeMq 消费者 消费接收消息 包含队列模式点对点发 以及 主题模式一对多 这是消费者的demo consumer 。...里面有消息重发机制,手动确认ACK模式。 配合 producer 生产者demo使用。

    springboot整合activeMq的使用,队列,主题,消息手动确认,重发机制

    在本文中,我们将深入探讨如何将Spring Boot与ActiveMQ整合,以及如何利用它来实现队列、主题、消息手动确认和重发机制。ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它允许应用程序通过发布/订阅(pub/...

    ActiveMQ消息传送机制以及ACK机制详解

    此外,Consumer还可以利用“重发策略”等机制,确保在异常情况下仍能接收到未被确认的消息。 3. Broker:作为消息中心,Broker接收来自Producer的消息,存储并转发给Consumer。它在Producer和Consumer之间起到了解...

    0927分布式消息通信-ActiveMQ1

    1. **ActiveMQ重发机制**:消息重发通常发生在消息确认失败或者需要确保消息至少被消费一次的情况下,如网络中断、消费者异常等。 2. **注册流程完善**:在用户注册流程中,可以通过ActiveMQ发送邮件验证用户邮箱,...

    ActiveMQ快速上手 PDF

    - **消息重递策略**:定义消息重发的条件和次数。 - **慢消费者处理**:处理因消费者处理速度过慢而导致的问题。 #### 十三、杂项技术 - **监控和管理Broker**:使用 ActiveMQ 提供的监控工具来监控 Broker 的运行...

    RabbitMQ、RocketMQ、Kafka、ActiveMQ消息中间件常见的面试题目

    RabbitMQ、RocketMQ、Kafka、ActiveMQ 消息中间件常见的面试题目 了解消息中间件的使用场景是非常重要的,面试官可能会问你为什么使用消息队列,消息队列都有哪些使用场景,然后你项目里具体是什么场景,说说你在这...

    ActiveMQ-Artemis .pdf

    11. **消息重发和未交付消息处理**:解释了如何处理消息重发和未交付的消息。 12. **消息过期和大型消息处理**:阐明了消息过期的处理机制和大型消息的分页处理。 13. **客户端重连和会话重新附加**:描述了客户端...

    ActiveMQ使用手册(中文版)

    - **定义:** ActiveMQ默认的消息存储机制。 - **特点:** 使用简单的文件系统结构来存储消息。 **6.2 Kaha Message Store:** - **定义:** 提供更高级别的消息存储支持。 - **特点:** 更好的性能和可扩展性。 **6.3...

    ActiveMQ in Action

    书中详述了安全认证、授权、创建自定义安全插件以及基于证书的安全授权机制等多个方面,旨在帮助开发者构建安全可靠的消息系统。 ActiveMQ在实际应用开发中扮演着多种角色。作者介绍了如何在Java程序中嵌入ActiveMQ...

    AcitveMQ消息队列.pdf

    如果遇到突发情况,如断电,定时任务可以检查并重发状态为0的消息。 使用ActiveMQ的好处包括: 1. 减轻服务器压力:通过消息队列,服务器可以将非实时任务异步处理,避免了因处理大量并发请求导致的服务器过载。 2....

    Java思维导图xmind文件+导出图片

    ActiveMQ消息确认及重发策略 ActiveMQ基于Spring完成分布式消息队列实战 Kafka Kafka基于Zookeeper搭建高可用集群实战 kafka消息处理过程剖析 Java客户端实现Kafka生产者与消费者实例 kafka的副本机制及选举...

    Java面试MQ(Message Queue)消息队列.pdf

    - 使用ACK机制确认消息已成功处理,未收到确认则重发消息。 - 对于Kafka,可以通过设置合适的副本因子(replication factor)来保证消息的冗余存储。 #### 八、如何保证消息的顺序性? - 在某些应用场景下,需要保证...

    java面试题,180多页,绝对良心制作,欢迎点评,涵盖各种知识点,排版优美,阅读舒心

    【消息队列】ActiveMQ中的消息重发时间间隔和重发次数吗? 164 【Dubbo】dubbo介绍 166 Dubbo 是什么 166 Dubbo 架构流程图 167 调用流程 167 注册中心 168 Dubbo常识 168 【Dubbo】dubbo运行时,突然所有的zookeeper...

    RBAC.docx

    为确保RabbitMQ消息的可靠传输,可采取以下措施:1) 使用确认机制,生产者发送消息后等待ACK确认,确保消息已到达队列;2) 消息持久化,即使服务器重启,已持久化的消息也不会丢失。为了防止消息重复消费,可以在...

    PHP+MySQL实现消息队列的方法分析

    消息队列是一种用于处理进程间通信的机制,允许消息在进程间传递,各个进程可以根据自己的速度进行处理,实现了应用的解耦合以及服务的异步处理。常见的消息队列有ActiveMQ、RabbitMQ和Kafka等。 二、构建消息队列...

    2023一线互联网大厂Java面试题集

    15. **消息重发策略**: - 包括定时重试、死信队列、确认机制(如ACK)等,确保消息正确传递。 以上只是题集的部分内容,完整版将包含更多关于并发编程、Spring框架以及其他高级Java技术的题目,帮助求职者全面...

Global site tag (gtag.js) - Google Analytics