`

metaq源码解读之consumer的balance策略

 
阅读更多

        说到metaq的消费者balance策略,不得不说一下分区的有关信息。一个topic可以划分为n个分区。每个分区是一个有序的、不可变的、顺序递增的队列。



 

        分区一方面是为了增大消息的容量(可以分布在多个分区上存,而不会限制在单台机器存储大小里),二方面可以类似看成一种并行度。

        消费者的负载均衡与topic的分区数据紧密相关,需要考虑几种情况:

            1、单个分组内的消费者数目如果比总得分区数目多的话,则多出来的消费者不参与消费。每个分区针对每个消费者group只挂一个消费者,同一个group的多余消费者不参与消费。

            2、如果分组内的消费者数目比分区数目小,则有部分消费者要额外承担消息的消费任务。当分区数目n大于单个group的消费者数目m时,则有n%m个消费者需要额外承担1/n的消费任务。n足够大的时候可以认为负载平均分配。

        综上所述,单个分组内的消费者集群的负载均衡策略如下:

            ①每个分区针对一个group只挂载一个消费者

            ②如果同一个group的消费者数目大于分区数目,则多出来的消费者不参与消费

            ③如果同一个group的消费者数目小于分区数目,则有部分消费者需要额外承担消费任务。

        meta客户端处理消费者的负载均衡方式:将消费者列表和分区列表分别排序,然后按照上述规则做合理的挂载。如果某个消费者故障,其他消费者会感知到这一变化,然后重新进行负载均衡,保证所有分区都有消费者进行消费。

        Consumer的balance策略实现在metaq中提供了两种:ConsisHashStrategy和DefaultLoadBalanceStrategy。

        首先,来看DefaultLoadBalanceStrategy的实现:

public List<String> getPartitions(final String topic, final String consumerId, final List<String> curConsumers,
            final List<String> curPartitions) {
        // 每个订阅者平均挂载的partition数目
        final int nPartsPerConsumer = curPartitions.size() / curConsumers.size();
        // 挂载到额外partition的consumer数目
        final int nConsumersWithExtraPart = curPartitions.size() % curConsumers.size();

        log.info("Consumer " + consumerId + " rebalancing the following partitions: " + curPartitions + " for topic "
                + topic + " with consumers: " + curConsumers);
        final int myConsumerPosition = curConsumers.indexOf(consumerId);
        if (myConsumerPosition < 0) {
            log.warn("No broker partions consumed by consumer " + consumerId + " for topic " + topic);
            return Collections.emptyList();
        }
        assert myConsumerPosition >= 0;
        // 计算当前consumer挂载的分区起点
        final int startPart =
                nPartsPerConsumer * myConsumerPosition + Math.min(myConsumerPosition, nConsumersWithExtraPart);
        //计算当前consumer共挂载的分区数=每个consumer的挂载数+额外承担的分区数
        final int nParts = nPartsPerConsumer + (myConsumerPosition + 1 > nConsumersWithExtraPart ? 0 : 1);

        if (nParts <= 0) {
            log.warn("No broker partions consumed by consumer " + consumerId + " for topic " + topic);
            return Collections.emptyList();
        }
        final List<String> rt = new ArrayList<String>();
        for (int i = startPart; i < startPart + nParts; i++) {
            final String partition = curPartitions.get(i);
            rt.add(partition);
        }
        return rt;
    }

 

       

  • 大小: 4.1 KB
分享到:
评论

相关推荐

    metamorphosis(metaq)

    2. 消息存储:MetaQ服务器接收到消息后,会根据分区策略将其持久化到硬盘上,确保消息不会丢失。 3. 消费者:消费者订阅感兴趣的Topic,从MetaQ服务器拉取或接收消息。消费者可以设置为推拉模式,推模式下,服务器...

    Metaq原理与应用

    Metaq 是一种高性能、高可用的消息中间件,其设计灵感来源于 Kafka,但并不严格遵循任何特定的规范,如 JMS(Java Message Service)或 CORBA Notification 规范。Metaq 提供了丰富的特性来解决 Messaging System 中...

    metaQ向spark传数据

    在大数据处理领域,MetaQ和Spark是两个非常关键的组件。MetaQ是腾讯开源的一款分布式消息中间件,常用于实时数据处理系统中的消息传递。而Spark则是一个强大的、通用的并行计算框架,专为大数据分析设计,尤其擅长...

    metaq-server-1.4.6.2.tar.gz

    MetaQ Server的架构主要包括Producer、Consumer、Broker和Controller四个主要部分: 1. Producer:生产者负责将消息发送到MetaQ Server,通过API接口与Server进行通信。 2. Consumer:消费者从MetaQ Server订阅并...

    Metaq在JDk 7下的异常及解决方案

    《Metaq在JDK 7下的异常及其解决策略》 Metaq是一款高性能的消息中间件,广泛应用于分布式系统中,提供高效、稳定的消息传递服务。然而,在JDK 7环境下,Metaq可能会遇到一些运行异常,其中最常见的就是与物理文件...

    metaq-server-1.4.6.2.zip 和原版一样就是换了个名字

    Client则分为Producer和Consumer,Producer负责发送消息,Consumer负责接收和消费消息。 在性能优化方面,MetaQ 1.4.6.2版本可能包括了对网络IO、多线程并行处理和内存管理等方面的改进。例如,可能会采用零拷贝...

    metaq-server-1.4.6.2客户端+服务端

    MetaQ是阿里巴巴开源的一款分布式消息中间件,它主要用于在大规模分布式系统中提供高效、可靠的消息传递服务。MetaQ Server 1.4.6.2版本是这个中间件的一个特定发行版,包含了服务端和客户端的组件,以及相关的...

    MetaQ 分布式消息服务中间件.pdf

    在这一模型中,发布者(Producer)将消息发布到MetaQ,MetaQ会储存这些消息,而订阅者(Consumer)则通过pull方式来消费这些消息。具体而言,消费者主动从MetaQ拉取数据,解析成消息并进行消费。MetaQ的架构设计中,...

    Metaq详细手册.docx

    《Metaq详细手册》 Metaq,源自LinkedIn的开源消息中间件Kafka的Java实现——Memorphosis,针对淘宝内部的应用需求进行了定制和优化。它遵循一系列设计原则,旨在提供高效、可靠且灵活的消息传递服务。 1. **消息...

    metaQ的安装包

    MetaQ,全称为“Meta Message Queue”,是阿里巴巴开源的一款分布式消息中间件,主要用于解决大规模分布式系统中的消息传递问题。MetaQ 提供了高可用、高可靠的消息服务,支持多种消息模型,如点对点(Point-to-...

    metaq消息中间件服务端、客户端资源汇集

    Metamorphosis是淘宝开源的一个Java消息中间件,他类似apache-kafka,但不是一个简单的山寨拷贝,而是做了很多改进和优化,项目的主页在淘蝌蚪上。服务端、客户端、javadoc都包含在内。

    阿里rocketMQ

    - **高可用性**:通过主备切换和多副本策略确保服务的高可用。 - **高吞吐量**:采用批量发送、预读取和批量拉取等优化手段,提升消息处理速度。 - **低延迟**:通过优化网络协议和数据结构,降低消息传输延迟。 - *...

    支付宝之所以牛逼的原因:来看内部架构剖析

    Metamorphosis(MetaQ)是一个高性能、高可用、可扩展的分布式消息中间件,类似于LinkedIn的Kafka,具有消息存储顺序写、吞吐量大和支持本地和XA事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景,在...

    阿里云ons指南

    阿里云ONS(消息队列服务)是基于阿里云提供的分布式的、高可靠性的消息服务产品,它来源于阿里内部广泛使用的消息中间件MetaQ,亦即后来的开源项目RocketMQ。ONS支持海量消息的生产与消费,以无单点故障、高可扩展...

    RocketMQ最全介绍与实战.pdf

    RocketMQ 的前世今生是 Metaq,Metaq 在阿里巴巴集团内部、蚂蚁金服、菜鸟等各业务中被广泛使用,接入了上万个应用系统中。 RocketMQ 的使用场景包括应用解耦、流量削峰等。应用解耦系统的耦合性越高,容错性就越低...

    支付宝钱包系统架构内部剖析(架构图)

    - **纯Java实现**:无论是通信层还是存储层,MetaQ均使用Java语言实现,这对于支付宝这样的大型企业而言非常重要,因为Java是业界广泛使用的编程语言之一,这意味着更容易找到熟练掌握该语言的开发人员。 - **事务...

    消息中间件 rocketmq原理解析

    RocketMQ是一个高性能、高可靠性、可伸缩、...通过阅读和理解RocketMQ的源码,开发者可以更深入地了解消息中间件的原理,并在实践中更加熟练地使用它。希望以上内容能够帮助大家在学习和使用RocketMQ的过程中有所收获。

    kafka学习文档

    数据消费者,consumer 的用法:《consumer 的用法》 迓有些零碎的,关亍通信段的源码览读:《net 包源码览读》、《broker 配置》 扩展的阅读迓有下面返些: 关亍 kafka 和 jafka 的相关博客,特删好,有徆多问题也都...

    rocketmq-note.pdf

    Producer和Consumer在发送和接收消息时,可以通过负载均衡策略选择合适的Broker,保证消息的高效传递。 在物理架构中,NameServer首先启动,接着Broker注册到NameServer,Producer在发送消息前获取Broker列表,然后...

Global site tag (gtag.js) - Google Analytics