`

activemq 持久化topic处理过程及其消息游标轮转问题的解决方案

 
阅读更多

    如果消息是持久化的,activemq收到消息后会存储在持久性cursor中。对于非持久化消息,会存储在File Cursor中。从名称上File Cursor是持久性cursor,实际上activemq把FilePendingMessageCursor作为非持久性cursor。File Cursor首先在内存中保存消息的引用,如果内存使用量达到上限,那么会把消息引用保存到临时文件中,这样就不会因为有大量消息没有被消费而导致OOM。当消息consumer启动时,mq收到消息会激活broker上的DurableTopicSubscrition,该subscription会检查当前的消息cursor里是否有消息,如果有就会dispatch这些消息,这样consumer就能收到之前发送的消息了。

 

    由于同一个topic会有多个消息cursor,所以判断cursor是否有消息时会轮询cursor队列,具体代码:

StoreDurableSubscriberCursor 

 

public synchronized boolean hasNext() {
        boolean result = true;
        if (result) {
            try {
                currentCursor = getNextCursor();    //A
            } catch (Exception e) {
                LOG.error("Failed to get current cursor ", e);
                throw new RuntimeException(e);
            }
            result = currentCursor != null ? currentCursor.hasNext() : false;
        }
        return result;
    }
 

 

代码逻辑分析:得到cursor队列中下一个cursor,然后判断该cursor是否有消息。下面看getNextCursor的代码:

 

 

protected synchronized PendingMessageCursor getNextCursor() throws Exception {
        if (currentCursor == null || currentCursor.isEmpty()) {  // E
            currentCursor = null;
            for (PendingMessageCursor tsp : storePrefetches) {
                if (tsp.hasNext()) {                              
                    currentCursor = tsp;
                    break;
                }
            }
            // round-robin
            if (storePrefetches.size()>1) {
                PendingMessageCursor first = storePrefetches.remove(0);
                storePrefetches.add(first);
            }
        }
        return currentCursor;
    }
 

 

代码逻辑:判断cursor是否为空,如果为null或者empty,如果是,则从cursor队列中获得下一个PendingMessageCursor;如果该cursor的hasNext()为true,则跳出循环。下面会对cursor队列做Load balance处理,将第一个cursor放到队列尾部。这样设计的目的是保证所有的cursor都会被访问到。

 

下面看isEmpty()的实现:

 

AbstractStoreCursor

 public final synchronized boolean isEmpty() {

        return size == 0;
    }

 

下面看hasNext()的实现: 

AbstractStoreCursor

public final synchronized boolean hasNext() {
        if (batchList.isEmpty()) {
            try {
                fillBatch();
            } catch (Exception e) {
                LOG.error(this + " - Failed to fill batch", e);
                throw new RuntimeException(e);
            }
        }
        ensureIterator();
        return this.iterator.hasNext();
    }

 

 从实现来看,isEmpty和hasNext的判断方式不同,在极端情况下两者的返回值可能会不同,可能isEmpty为true而hasNext为false。

 

在激活DurableTopicSubscrition的时候,执行顺序是先检查cursor中消息数量,然后dispatch消息。由于在代码E处可能发现当前cursor不是empty,于是将该cursor返回,这样在代码A处会获得持久cursor,该cursor中可能会没有消息,不会继续dispatch消息。这样就导致在cursor队列中的非持久cursor始终无法获得执行的机会,导致非持久消息不会被dispatch。

 

解决方案,在getNextCursor()中增加hasNext()的判断,保证cursor在null或者empty或者没有元素的情况下会轮转到下一cursor:

 

protected synchronized PendingMessageCursor getNextCursor() throws Exception {
        if (currentCursor == null || currentCursor.isEmpty() || !currentCursor.hasNext()) {
            ……
        }
        return currentCursor;
    }

 

 

 

分享到:
评论

相关推荐

    ActiveMQ中Topic持久化Demo

    总结起来,ActiveMQ中的Topic持久化涉及到消息和订阅的持久化,通过合理的配置和编程接口,我们可以确保在系统故障后,消息传递的连续性和完整性。在实际应用中,了解和掌握这部分知识对于构建可靠和容错的分布式...

    activemq持久化jdbc所需jar包.zip

    标题中的"activemq持久化jdbc所需jar包.zip"指的是Apache ActiveMQ消息中间件在使用JDBC(Java Database Connectivity)进行消息持久化时所需的库文件集合。ActiveMQ是一款开源、高性能、跨语言的企业级消息代理,它...

    activeMQ mysql 持久化

    标题中的“ActiveMQ MySQL 持久化”指的是在使用ActiveMQ消息中间件时,将消息数据存储到MySQL数据库中以实现数据的持久化。ActiveMQ是Apache软件基金会的一个开源项目,它是一个功能丰富的消息代理,支持多种消息...

    linux环境下ActiveMQ持久化、集群环境搭建详解

    ActiveMQ 持久化是指将消息队列持久化到数据库或文件中,以便在断电或崩溃后恢复消息队列。可以使用 Apache ActiveMQ 的持久化机制,例如使用 KahaDB 或 AMQP 等。 集群环境 ActiveMQ 集群环境是指多个 ActiveMQ ...

    spring+activemq topic持久化订阅

    spring +activemq topic消息持久化订阅实例,整个项目中有activemq和spring的整合的所有实例,topic的持久化配置是在ApplicationContext3C、ApplicationContext3C2以及ApplicationContext3P三个中,消息生产者:...

    ActiveMQ订阅模式持久化实现

    这通常涉及修改`activemq.xml`,设置`<destinationPolicy>`元素中的`<policyEntry>`,将`persistent`属性设为`true`,以确保消息在存储和传输过程中被持久化。 2. **创建订阅者**:在Java代码中,消费者需要通过`...

    activemq消息持久化所需Jar包

    本主题主要围绕“activemq消息持久化所需Jar包”展开,将深入探讨ActiveMQ的消息持久化机制及其相关依赖。 首先,了解消息持久化的概念至关重要。在分布式系统中,消息持久化是指当消息代理(如ActiveMQ)接收到...

    spring集成activemq演示queue和topic 持久化

    在本示例中,我们将深入探讨如何将Spring框架与ActiveMQ集成,以便实现消息队列(Queue)和主题(Topic)的功能,并确保消息的持久化。ActiveMQ是Apache软件基金会开发的一个开源消息中间件,它支持多种消息协议,如...

    ActiveMQ队列消息过期时间设置和自动清除解决方案.docx

    ActiveMQ 队列消息过期时间设置和自动清除解决方案 ActiveMQ 是一个开源的消息队列系统,用于实现分布式系统之间的异步通信。在使用 ActiveMQ 时,消息过期时间设置和自动清除是一个非常重要的问题。本文将介绍 ...

    ActiveMQ消息过期时间设置和自动清除解决方案

    ### ActiveMQ 消息过期时间设置与自动清除解决方案 #### 概述 在消息队列的场景下,为了防止消息长时间滞留在队列中占用资源或者为了满足业务上对消息时效性的需求,通常需要对消息设定过期时间。本文档详细介绍了...

    ActiveMQ配置Mysql8为持久化方式所需Jar包.rar

    本主题主要探讨如何将ActiveMQ配置为使用MySQL 8作为其持久化存储方式,以及在这个过程中所需的Jar包。 1. **ActiveMQ与持久化**: - ActiveMQ允许用户选择不同的持久化机制,包括文件系统(KahaDB)和关系数据库...

    activemq-5.15.15 JDBC持久化mysql8.0+的activemq.xml.pdf

    总结来说,这个配置文件展示了如何配置ActiveMQ 5.15.15使用JDBC和MySQL 8.0+进行消息持久化,以及解决XML中特殊字符转义的问题,以确保ActiveMQ能够正确地连接并使用MySQL数据库存储和检索消息。这种配置适用于需要...

    7道消息队列ActiveMQ面试题!

    在面试中,面试官可能会问到关于ActiveMQ的一些基础和深入的问题,比如ActiveMQ的特性、消息传递机制、故障处理、消息持久化、性能调优以及消息消费等方面的知识。 1. ActiveMQ的核心概念和功能 ActiveMQ提供了多种...

    ActiveMQ持久化机制代码实例

    在讨论ActiveMQ的持久化机制时,一个常见的问题是:如果服务器断电重启,未被消费的消息会怎样处理?如前所述,对于非持久化消息,这些消息将丢失;而对于持久化消息,它们将在服务器重启后继续被消费。 在提供的...

    一个jms activemq Topic 消息实例

    一个jms activemq Topic 消息实例 关于jms JMS 是接口,相当于jdbc ,要真正使用它需要某些厂商进行实现 ,即jms provider 常见的jms provider 有 ActiveMQ JBoss 社区所研发的 HornetQ (在jboss6 中默认即可以...

    ActiveMQ高并发处理方案

    然而,在处理大量并发连接和消息时,ActiveMQ可能会遇到性能瓶颈,例如连接资源耗尽、消息处理不均等问题。本文将详细介绍ActiveMQ在高并发环境下的优化策略,包括异常处理、连接池使用、消费者公平调度以及系统整体...

    消息队列:ActiveMQ:ActiveMQ消息持久化机制.docx

    消息队列:ActiveMQ:ActiveMQ消息持久化机制.docx

    activemq 虚拟topic与路由功能

    ### ActiveMQ虚拟Topic与路由功能详解 在ActiveMQ消息中间件中,为了更好地实现消息的分发和管理,...通过对上述内容的理解,相信读者已经能够掌握如何在实际项目中有效利用虚拟Topic和复合队列来优化消息处理流程。

    ActiveMQ Topic 实例

    在分布式系统中,消息队列(Message Queue)作为一种中间件,起到了解耦、异步处理以及负载均衡...在探索ActiveMQ Topic的过程中,记得关注性能优化、安全性设置以及故障恢复策略,这些都是构建健壮消息系统的关键点。

Global site tag (gtag.js) - Google Analytics