`
1028826685
  • 浏览: 940636 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类

Consumer rebalance失败问题定位和解决思路

 
阅读更多

背景

最近在公司使用Kafka的Consumer高级API出现一些问题,问题描述如下: 
大象push推送队列,在(大约每天8点到10点间)发送消息高峰期,消费节点负载处于高位、jvm内存占用大于%80。这时候JVM会频繁持续FullGC而卡住异步线程(stop the world),心跳等异步线程就没法正常收发数据包。这种情况下导致zk会话过期触发Consumer rebalance,zk会话过期很频繁会发rebalance,rebalance过程中因为写入临时节点冲突,Consumer rebalance失败而无法分配分片数据(即Consumer掉线)。

rebalance失败官方解释: 
consumer rebalancing fails (you will see ConsumerRebalanceFailedException): This is due to conflicts when two consumers are trying to own the same topic partition. The log will show you what caused the conflict (search for “conflict in “). 
If your consumer subscribes to many topics and your ZK server is busy, this could be caused by consumers not having enough time to see a consistent view of all consumers in the same group. If this is the case, try Increasing rebalance.max.retries and rebalance.backoff.ms. 
Another reason could be that one of the consumers is hard killed. Other consumers during rebalancing won’t realize that consumer is gone after zookeeper.session.timeout.ms time. In the case, make sure that rebalance.max.retries * rebalance.backoff.ms > zookeeper.session.timeout.ms.

如下配置会导致rebalance失败 
rebalance.max.retries * rebalance.backoff.ms < zookeeper.session.timeout.ms

为什么大象push推送队列jvm内存占用高?

消费端会将消息异步推送到下游系统,异步发出的请求放到内存map当中。如果收到下游的ack,会将请求从内存map删除,或者请求超过5s未收到ack,自动删除。当前流控会在内存map超过2000请求时触发 
大象jvm内存高问题原因梳理: 
大象mafka推送客户端内存占用高导致mafka client掉线。内存占用高初步定为问题为推送端推送量大于下游处理能力,导致推送端请求堆积在netty内存buffer。这个问题可以从高峰期请求推送量(17k/s)和ack量(10k/s)的巨大差距得到印证

目标

保证活跃消费节点能接管或分配到失败消费节点的数据分片,失败消费节点释放握有的数据分片资源。 
完善报警机制,监控和收集rebalance异常情况。

分析

如下图创建一个topicXXX 6分区,消费组中各个消费节点分配情况如下 
这里写图片描述
Consumers在zk上存储结构 
这里写图片描述

Conumer控制逻辑

目前Consumer rebalance的控制策略是由每一个Consumer通过Zookeeper完成的。 
1.每个Consumer客户端启动时,会注册自身到zk自己的Consumer group下. 
2.Watcher Consumer group下Consumers的变化 
3.Watcher Topic变化 
4.Wacher zk session timeout变化

Consumer逻辑说明

a.同一个Consumer Group中的Consumers,Kafka将相应Topic中的每个消息只发送给其中一个Consumer。 
b.Consumer Group中的每个Consumer读取Topic的一个或多个Partitions,并且是唯一的Consumer; 
c.一个Consumer group的多个consumer的所有线程依次有序地消费一个topic的所有partitions,如果Consumer group中所有consumer总线程大于partitions数量

Consumer rebalacne算法

当一个group中,有consumer加入或者离开时,会触发partitions均衡.均衡的最终目的,是提升topic的并发消费能力. 
1) 假如topic1,具有如下partitions: P0,P1,P2,P3 
2) 加入group中,有如下consumer: C0,C1 
3) 首先根据partition索引号对partitions排序: P0,P1,P2,P3 
4) 根据(consumer.id + ‘-‘+ thread序号)排序: C0,C1 
5) 计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整) 
6) 然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)] 
在这种策略下,每一个consumer或者broker的增加或者减少都会触发consumer rebalance。因为每个consumer只负责调整自己所消费的partition,为了保证整个consumer group的一致性,所以当一个consumer触发了rebalance时,该consumer group内的其它所有consumer也应该同时触发rebalance。

解决思路

为什么活跃消费节点无法分配到下线消费节点的分片数据?

下面分析下原因:目前有三种方式会触发rebalance,其一Topic与partition映射表发生变化;其二同一订阅组中消费节点数发生变化,其三zk会话过期 
前两种直接就做rebalance操作了,最后一种消费节点重新注册临时znode到zk上,然后再做rebalance操作,rebalance失败会导致消费节点下线,其他活跃消费节点也无法分配到分片数据,其实质原因是消费节点rebalance失败下线时并没有从zk的Consumer Group下删除自身临时节点,而每个消费节点的分片数据是根据Consumer Group下数量按照计算规则分配的,所以活跃消费节点无法分配到分片数据。 
消费节点分配分片数据简单计算规则:每个消费节点分片数 =总体分片数/消费节点数

会话过期重新注册消费节点问题:消费端无限重试直到注册成功为止,每次重试会休眠一定时间。

rebalance问题:有限次重试(设置阀值)内无法分配分片,则分配分片失败。

经过多次测试和验证,发现如果rebalance失败,是因为多个消费节点在zk注册临时节点产生冲突,另外zk同步数据延时性时间拉长远大于(rebalance.max.retries * rebalance.backoff.ms),rebalance就会失败,如果再次触发rebalance且成功,其分片数据会被分配并消费,所以rebalance失败,其实是一个中间件状态。

解决思路-方案1

1.每当消费节点rebalance失败下线时,删除自身注册的临时节点,再次触发rebalance直到分片数据成功分配为止,如分配不成功报警。 
2.当前两种Watcher触发rebalance时,检查并重新注册消费节点到Consumer Group下,重新获得分片数据。

注明

像这种通过zk来协调做rebalance并不是一个最佳方案,会存在脑裂情况。根据Kafka官方文档,Kafka作者正在开发0.9.x版本中使用中心协调器。大体思想是选举出一个broker作为coordinator,由它watch Zookeeper,从而判断是否有分区或者consumer的增减,然后生成rebalance命令,并检查是否这些rebalance在所有相关的Consumer中被执行成功,如果不成功则重试,若成功则认为此次rebalance成功

分享到:
评论

相关推荐

    phpkafkaconsumer是一个kafkaconsumer库支持group和rebalance

    【标题】"phpkafkaconsumer"是一个专门为PHP设计的Kafka消费者库,它不仅提供了基本的Kafka消息消费功能,还特别支持了消费者组(Consumer Group)和再平衡(Rebalance)机制。Kafka是一种分布式流处理平台,常用于...

    php-kafka-consumer.zip

    在此基础上实现了 rebalance 功能以及 group 功能。 经过简单的压力测试,单个进程的消费能力能达到每秒钟7.8W条,压测详细内容见压力测试。依赖php_zookeeperphp_rdkafka (建议使用1.0.0版本)librdkafka(建议...

    kafka rebalance日志

    kafka rebalance期间日志

    rebalance-villagers:Minecraft Bukkit插件。 详情请参阅以下内容

    "rebalance-villagers" 是一个特定的 Bukkit 插件,专注于调整村民(Villager)的行为和交易,以实现更好的游戏平衡。下面将详细探讨该插件的相关知识点以及可能涉及的 Java 编程技术。 1. **Bukkit 插件系统**:...

    consumer.start.pdf

    本文档详细介绍了 RocketMQ 消费者(Consumer)的核心组件及功能,旨在帮助开发者更好地理解和掌握 RocketMQ 的消费者机制。通过深入分析 `consumer.start.pdf` 文件中的关键知识点,我们不仅了解了消费者的工作原理...

    Flink异常.docx

    解决方法是使用 rebalance 解决数据倾斜的问题,但是需要注意引入反压问题。同时,可以使用多种方式来解决数据倾斜的问题,例如,使用 keyBy(0) 按照过车时间对过车数据进行分流操作。 六、Timeout expired while ...

    ORACLE ASM添加磁盘操作步骤

    在实际操作过程中可能会遇到一些常见的错误和异常情况,下面列举了一些典型的问题及其解决方法。 ##### 7.1 处理ORA-15260错误 ORA-15260错误通常发生在尝试执行某些ASM操作时,可能是由于系统资源不足或者ASM实例...

    Kafka高可用性实现原理

    Kafka 通过 Zookeeper 来管理集群配置、选举 leader 以及在 Consumer Group 发生变化时进行 rebalance。Zookeeper 保证了 Kafka 集群的高可用性和高吞吐量。 Kafka 的高可用性实现原理是基于分布式系统架构、消息...

    Kafka高频面试题系列之一(30道)

    这些面试题涵盖了 Kafka 中的一些核心问题,包括消息传输的可靠性、Consumer Group 的工作原理以及系统配置对数据一致性和可用性的影响。理解和掌握这些概念对于在实际工作中有效利用 Kafka 至关重要。

    kafka开发运维实战分享.pptx

    综上所述,Kafka 开发运维涉及众多细节,从选择合适版本到优化配置,再到解决 Rebalance 和分区分配问题,都需要全面考虑。通过深入了解 Kafka 的工作原理和实践经验,可以更好地在生产环境中部署和管理 Kafka 系统...

    Kafka常见问题1

    在Kafka中,有几个核心概念和机制,它们在日常使用中常常引发问题。本文将深入探讨这些关键点,包括Kafka的分区复制策略、消费者的offset管理以及如何处理已消费数据的再次消费。 1. **分区分配算法** Kafka在分配...

    rebalance:基于 Ember CLI 的投资组合再平衡计算器可在 https 获取

    在 Ember CLI 项目中,我们通常会找到诸如 `app`、`config`、`public`、`tests` 和 `node_modules` 等目录,分别包含应用的源码、配置、静态资源、测试用例和依赖库。 **相关知识点扩展:** 1. **投资组合再平衡**...

    kafka实战pdf

    了解Offset管理和Consumer rebalance机制。 6. **Kafka Streams**:了解Kafka提供的流处理库Kafka Streams,用于构建实时数据管道和复杂的应用程序。掌握其核心概念,如状态存储和窗口操作。 7. **连接器...

    NM_rebalance

    在这样的场景下,"rebalance"通常意味着调整资源分布,以确保系统的高效运行和公平性。 在Python中,实现这样的功能可能会用到以下知识点: 1. **文件操作**:Python提供了强大的文件操作库,如`os`和`shutil`,...

    consumer:消费者回购

    消费者回购(Consumer Rebalance)是指当消费者组内的成员发生变化时,如新消费者加入或已有消费者离开,Kafka会自动重新分配主题分区给消费者组内的成员,以保持所有分区的消费进度。这个过程确保了即使在动态变化...

    一文理解Kafka重复消费的原因和解决方案.docx

    **GroupCoordinator**机制是Kafka 0.9版本之后引入的一个关键特性,用于解决基于Zookeeper的消费者组重平衡(Rebalance)中存在的羊群效应和脑裂问题。羊群效应指的是大量节点同时访问同一Zookeeper节点时,可能会...

    分布式存储Ceph解决方案.pptx

    分布式存储 Ceph 解决方案 Ceph 是一个统一的、分布式存储系统,旨在提供优秀的性能、可靠性和可扩展性。Ceph 唯一地提供对象、块和文件存储在一个统一的系统中。 Ceph 的发展历程 * 2004 年,Sage Weil 开始了...

    02-VIP-kafka设计原理详解1

    - 消费者将消费的offset提交到内部topic `__consumer_offsets`,key为consumerGroupId、topic和分区号,value为当前offset值。 - 为了应对高并发,`__consumer_offsets`默认配置为50个分区,可通过`offsets.topic....

    kafka-manage-2.0.0.2

    它还能展示集群的整体健康状况,便于及时发现和解决问题。 2. Topic管理:用户可以通过Kafka Manager创建、修改和删除Topic,调整Partition数量,以及进行Rebalance操作。此外,它还支持查看Topic的配置信息,帮助...

Global site tag (gtag.js) - Google Analytics