`
大涛学长
  • 浏览: 110648 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

如何分析及处理 Flink 反压?

阅读更多
反压(backpressure)是实时计算应用开发中,特别是流式计算中,十分常见的问题。反压意味着数据管道中某个节点成为瓶颈,处理速率跟不上上游发送数据的速率,而需要对上游进行限速。由于实时计算应用通常使用消息队列来进行生产端和消费端的解耦,消费端数据源是 pull-based 的,所以反压通常是从某个节点传导至数据源并降低数据源(比如 Kafka consumer)的摄入速率。

关于 Flink 的反压机制,网上已经有不少博客介绍,中文博客推荐这两篇1。简单来说,Flink 拓扑中每个节点(Task)间的数据都以阻塞队列的方式传输,下游来不及消费导致队列被占满后,上游的生产也会被阻塞,最终导致数据源的摄入被阻塞。而本文将着重结合官方的博客\[4\]分享笔者在实践中分析和处理 Flink 反压的经验。

反压的影响
-----

反压并不会直接影响作业的可用性,它表明作业处于亚健康的状态,有潜在的性能瓶颈并可能导致更大的数据处理延迟。通常来说,对于一些对延迟要求不太高或者数据量比较小的应用来说,反压的影响可能并不明显,然而对于规模比较大的 Flink 作业来说反压可能会导致严重的问题。

这是因为 Flink 的 checkpoint 机制,反压还会影响到两项指标: checkpoint 时长和 state 大小。

*   前者是因为 checkpoint barrier 是不会越过普通数据的,数据处理被阻塞也会导致 checkpoint barrier 流经整个数据管道的时长变长,因而 checkpoint 总体时间(End to End Duration)变长。
*   后者是因为为保证 EOS(Exactly-Once-Semantics,准确一次),对于有两个以上输入管道的 Operator,checkpoint barrier 需要对齐(Alignment),接受到较快的输入管道的 barrier 后,它后面数据会被缓存起来但不处理,直到较慢的输入管道的 barrier 也到达,这些被缓存的数据会被放到state 里面,导致 checkpoint 变大。

这两个影响对于生产环境的作业来说是十分危险的,因为 checkpoint 是保证数据一致性的关键,checkpoint 时间变长有可能导致 checkpoint 超时失败,而 state 大小同样可能拖慢 checkpoint 甚至导致 OOM (使用 Heap-based StateBackend)或者物理内存使用超出容器资源(使用 RocksDBStateBackend)的稳定性问题。

因此,我们在生产中要尽量避免出现反压的情况(顺带一提,为了缓解反压给 checkpoint 造成的压力,社区提出了 FLIP-76: Unaligned Checkpoints\[4\] 来解耦反压和 checkpoint)。

定位反压节点
------

要解决反压首先要做的是定位到造成反压的节点,这主要有两种办法:

1.  通过 Flink Web UI 自带的反压监控面板;
2.  通过 Flink Task Metrics。

前者比较容易上手,适合简单分析,后者则提供了更加丰富的信息,适合用于监控系统。因为反压会向上游传导,这两种方式都要求我们从 Source 节点到 Sink 的逐一排查,直到找到造成反压的根源原因\[4\]。下面分别介绍这两种办法。

### 反压监控面板

Flink Web UI 的反压监控提供了 SubTask 级别的反压监控,原理是通过周期性对 Task 线程的栈信息采样,得到线程被阻塞在请求 Buffer(意味着被下游队列阻塞)的频率来判断该节点是否处于反压状态。默认配置下,这个频率在 0.1 以下则为 OK,0.1 至 0.5 为 LOW,而超过 0.5 则为 HIGH。

![_1](https://yqfile.alicdn.com/ee2f6bf9b2a4dd1d6915c6c0310a6f973a837ce6.png)

图1. Flink 1.8 的 Web UI 反压面板(来自官方博客)

如果处于反压状态,那么有两种可能性:

1.  该节点的发送速率跟不上它的产生数据速率。这一般会发生在一条输入多条输出的 Operator(比如 flatmap)。
2.  下游的节点接受速率较慢,通过反压机制限制了该节点的发送速率。

如果是第一种状况,那么该节点则为反压的根源节点,它是从 Source Task 到 Sink Task 的第一个出现反压的节点。如果是第二种情况,则需要继续排查下游节点。

**值得注意的是,反压的根源节点并不一定会在反压面板体现出高反压**,因为反压面板监控的是发送端,如果某个节点是性能瓶颈并不会导致它本身出现高反压,而是导致它的上游出现高反压。总体来看,如果我们找到第一个出现反压的节点,那么反压根源要么是就这个节点,要么是它紧接着的下游节点。

那么如果区分这两种状态呢?很遗憾只通过反压面板是无法直接判断的,我们还需要结合 Metrics 或者其他监控手段来定位。此外如果作业的节点数很多或者并行度很大,由于要采集所有 Task 的栈信息,反压面板的压力也会很大甚至不可用。

### Task Metrics

Flink 提供的 Task Metrics 是更好的反压监控手段,但也要求更加丰富的背景知识。

首先我们简单回顾下 Flink 1.5 以后的网路栈,熟悉的读者可以直接跳过。

**TaskManager 传输数据时**,不同的 TaskManager 上的两个 Subtask 间通常根据 key 的数量有多个 Channel,这些 Channel 会复用同一个 TaskManager 级别的 TCP 链接,并且共享接收端 Subtask 级别的 Buffer Pool。

**在接收端**,每个 Channel 在初始阶段会被分配固定数量的 Exclusive Buffer,这些 Buffer 会被用于存储接受到的数据,交给 Operator 使用后再次被释放。Channel 接收端空闲的 Buffer 数量称为 Credit,Credit 会被定时同步给发送端被后者用于决定发送多少个 Buffer 的数据。

**在流量较大时**,Channel 的 Exclusive Buffer 可能会被写满,此时 Flink 会向 Buffer Pool 申请剩余的 Floating Buffer。这些 Floating Buffer 属于备用 Buffer,哪个 Channel 需要就去哪里。而在 Channel 发送端,一个 Subtask 所有的 Channel 会共享同一个 Buffer Pool,这边就没有区分 Exclusive Buffer 和 Floating Buffer。

![_2](https://yqfile.alicdn.com/f142c4f0f14e6b5e73f64f09b992e61f9b56b17e.png)

图2. Flink Credit-Based 网络

我们在监控反压时会用到的 Metrics 主要和 Channel 接受端的 Buffer 使用率有关,最为有用的是以下几个 Metrics:

Metris

描述

outPoolUsage

发送端 Buffer 的使用率

inPoolUsage

接收端 Buffer 的使用率

floatingBuffersUsage(1.9 以上)

接收端 Floating Buffer 的使用率

exclusiveBuffersUsage (1.9 以上)

接收端 Exclusive Buffer 的使用率

其中 inPoolUsage 等于 floatingBuffersUsage 与 exclusiveBuffersUsage 的总和。

**分析反压的大致思路是:**如果一个 Subtask 的发送端 Buffer 占用率很高,则表明它被下游反压限速了;如果一个 Subtask 的接受端 Buffer 占用很高,则表明它将反压传导至上游。反压情况可以根据以下表格进行对号入座(图片来自官网):

![_3](https://yqfile.alicdn.com/7681467a110b64d494b9da56925e25d76e83d96f.png)

图3. 反压分析表

outPoolUsage 和 inPoolUsage 同为低或同为高分别表明当前 Subtask 正常或处于被下游反压,这应该没有太多疑问。而比较有趣的是当 outPoolUsage 和 inPoolUsage 表现不同时,这可能是出于反压传导的中间状态或者表明该 Subtask 就是反压的根源。

如果一个 Subtask 的 outPoolUsage 是高,通常是被下游 Task 所影响,所以可以排查它本身是反压根源的可能性。如果一个 Subtask 的 outPoolUsage 是低,但其 inPoolUsage 是高,则表明它有可能是反压的根源。因为通常反压会传导至其上游,导致上游某些 Subtask 的 outPoolUsage 为高,我们可以根据这点来进一步判断。值得注意的是,反压有时是短暂的且影响不大,比如来自某个 Channel 的短暂网络延迟或者 TaskManager 的正常 GC,这种情况下我们可以不用处理。

对于 Flink 1.9 及以上版本,除了上述的表格,我们还可以根据 floatingBuffersUsage/exclusiveBuffersUsage 以及其上游 Task 的 outPoolUsage 来进行进一步的分析一个 Subtask 和其上游 Subtask 的数据传输。

![_4](https://yqfile.alicdn.com/3d6a29a6fb130d6337e6f07baf565e02ef3cf80f.png)

图4. Flink 1.9 反压分析表

通常来说,floatingBuffersUsage 为高则表明反压正在传导至上游,而 exclusiveBuffersUsage 则表明了反压是否存在倾斜(floatingBuffersUsage 高、exclusiveBuffersUsage 低为有倾斜,因为少数 channel 占用了大部分的 Floating Buffer)。

至此,我们已经有比较丰富的手段定位反压的根源是出现在哪个节点,但是具体的原因还没有办法找到。另外基于网络的反压 metrics 并不能定位到具体的 Operator,只能定位到 Task。特别是 embarrassingly parallel(易并行)的作业(所有的 Operator 会被放入一个 Task,因此只有一个节点),反压 metrics 则派不上用场。

分析具体原因及处理
---------

定位到反压节点后,分析造成原因的办法和我们分析一个普通程序的性能瓶颈的办法是十分类似的,可能还要更简单一点,因为我们要观察的主要是 Task Thread。

**在实践中,很多情况下的反压是由于数据倾斜造成的**,这点我们可以通过 Web UI 各个 SubTask 的 Records Sent 和 Record Received 来确认,另外 Checkpoint detail 里不同 SubTask 的 State size 也是一个分析数据倾斜的有用指标。

**此外,最常见的问题可能是用户代码的执行效率问题(频繁被阻塞或者性能问题)**。最有用的办法就是对 TaskManager 进行 CPU profile,从中我们可以分析到 Task Thread 是否跑满一个 CPU 核:如果是的话要分析 CPU 主要花费在哪些函数里面,比如我们生产环境中就偶尔遇到卡在 Regex 的用户函数(ReDoS);如果不是的话要看 Task Thread 阻塞在哪里,可能是用户函数本身有些同步的调用,可能是 checkpoint 或者 GC 等系统活动导致的暂时系统暂停。

当然,性能分析的结果也可能是正常的,只是作业申请的资源不足而导致了反压,这就通常要求拓展并行度。值得一提的,在未来的版本 Flink 将会直接在 WebUI 提供 JVM 的 CPU 火焰图\[5\],这将大大简化性能瓶颈的分析。

**另外 TaskManager 的内存以及 GC 问题也可能会导致反压**,包括 TaskManager JVM 各区内存不合理导致的频繁 Full GC 甚至失联。推荐可以通过给 TaskManager 启用 G1 垃圾回收器来优化 GC,并加上 -XX:+PrintGCDetails 来打印 GC 日志的方式来观察 GC 的问题。

总结
--

反压是 Flink 应用运维中常见的问题,它不仅意味着性能瓶颈还可能导致作业的不稳定性。定位反压可以从 Web UI 的反压监控面板和 Task Metric 两者入手,前者方便简单分析,后者适合深入挖掘。定位到反压节点后我们可以通过数据分布、CPU Profile 和 GC 指标日志等手段来进一步分析反压背后的具体原因并进行针对性的优化。

 

 

[原文链接](https://yq.aliyun.com/articles/727389?utm_content=g_1000090281)

本文为云栖社区原创内容,未经允许不得转载。
分享到:
评论

相关推荐

    flink反压现象模拟与分析

    在Apache Flink中,反压(Back Pressure)是一种常见的性能问题,它发生在数据处理系统中,当下游处理速度无法跟上上游数据生产速度时,上游会逐渐积累未处理的数据,导致整个流水线的效率降低。本文将深入探讨Flink...

    flink介绍PPT

    Flink能够对实时数据进行分析,并能够存储和查询历史数据,这使得Flink不仅能够处理大规模的批处理任务,也能处理复杂的数据流应用。它支持实时处理数据流,能够实现有状态的数据处理,这对于复杂的流处理应用尤为...

    Flink总结.docx

    1. **流处理支持**:Flink能够处理连续的数据流,支持事件时间窗口操作,允许开发者根据事件的发生时间而非处理时间进行分析。它还支持有状态计算,确保在处理过程中可以存储和管理中间结果。 2. **Exactly-once...

    Apache Flink 十大技术难点实战.pdf

    为了分析及处理Flink反压,需要关注数据的流入和流出情况,通过查看监控指标或者配置日志,分析出问题所在并采取相应措施。 在YARN环境中部署Flink作业时,会遇到一些常见的问题。Flink on YARN的部署涉及了基础...

    flink课堂讲义.docx

    Flink 附随了一些产生 DataSet 或 DataStream API 程序的类库和 API,包括 Table 机器学习的 FlinkML,图像处理的 Gelly,事件处理的 CEP 等。这些类库和 API 都是 Flink 生态系统的重要组成部分,共同构成了 Flink ...

    Flink 调优介绍,包括大状态、数据倾斜、反压等监控以及处理方式

    本文将重点探讨Flink作业中的常见调优策略,包括CPU与内存配置、大状态调优、反压处理及数据倾斜优化等方面。 #### 二、CPU与内存配置调优 1. **内存配置** - `-Djobmanager.memory.process.size= 2048 mb`:...

    flink流式处理框架的架构与应用

    Flink的应用场景丰富多样,包括实时优化电子商务搜索结果、为数据科学团队提供流式处理服务、网络和传感器监控及错误检测,以及业务智能基础设施中的ETL处理。Flink的实时处理能力使这些应用能够快速响应数据变化,...

    从0到1学Flink,入门教程典范

    通过FlinkSQL,用户可以使用SQL进行复杂的数据处理和分析任务。 ### Python API应用实践 Flink也提供了Python API,使得Python开发者能够利用Flink的强大流处理能力。Python API提供了与Java API相似的功能,使得...

    Apache Flink-实时计算正当时.pdf

    为了回答这一问题,Flink 引入了描述作业繁忙(即在处理数据)与反压(由于下游算子不能及时处理结果而无法继续输出)程度的指标。应用中可能的瓶颈是那些繁忙并且上游被反压的算子。Flink 1.13 优化了反压检测的...

    Flink技术独家解读

    **2.6 Flink反压** 反压机制是Flink在流处理中的一项重要特性,用于解决数据处理过程中上下游处理速度不匹配的问题。通过反压机制,下游处理较慢的任务可以向上游发送信号,从而控制上游数据的生产速度,避免数据...

    Flink异常.docx

    Flink 异常处理 Flink 是一个流行的开源流处理引擎,但是在使用过程中...Flink 异常处理需要根据实际情况进行分析和解决。在处理 Flink 异常时,需要检查程序的资源使用情况、配置和输出,确保程序的正确性和稳定性。

    藏经阁-零基础入门:从0到1学会 Apache Flink-204.pdf

    《藏经阁-零基础入门:从0到1学会 Apache Flink》是一份全面...通过这些深入讲解,读者可以逐步掌握Apache Flink的基础知识,从零开始构建实时数据处理系统,无论是用于数据管道、实时分析还是机器学习,都能得心应手。

    基于Flink的流计算平台

    Apache Flink是一个开源的流处理框架,被广泛应用于实时数据分析。基于Flink构建的流计算平台,如阿里巴巴的StreamCompute(也称Alibaba Blink),旨在解决传统流计算开发和运维中的诸多痛点。 1. **开发挑战**: ...

    智能流计算Flink_Spark在华为云CloudStream中的应用实践-时金魁1

    流计算是一种实时处理正在发生的流数据的技术,它对数据进行逐条分析或算法运算,尤其适合处理源源不断的数据流。在华为云CloudStream中,Flink和Spark被广泛应用于智能流计算,以应对各种实时处理需求。 Flink的...

    Flink 流数据批量写入数据库

    在实际应用中,Flink 往往需要将处理后的流数据写入数据库,以供进一步分析或实时查询。然而,如果直接使用默认的单条写入方式,可能会导致数据库压力过大,写入速度慢,进而引发反压问题。因此,本文将详细介绍如何...

    最热门的大数据面试题汇总

    8. **处理Flink作业频繁重启**:分析日志找出失败原因,优化代码,调整资源分配,提高容错能力,如设置合适的重试策略和backoff时间。 9. **广播流与普通流**:在Flink中,广播流将同一份数据复制到每个并行实例,...

    上海校区大数据企业面试真题V1.docx

    5. **Kylin**: Kylin是开源的在线分析处理(OLAP)系统,用于构建Cube,提供快速查询。构建Cube涉及数据预处理、多维模型设计和Cube构建。 6. **资源管理和反压**: Spark的任务资源设置根据业务需求和集群资源调整...

    挑战双11实时数据洪峰的流计算实践.pdf

    流计算是一种处理连续数据流的技术,它能够实时分析和处理大量数据,为业务决策提供及时的洞察。 阿里巴巴的流计算平台主要包括以下几个部分: 1. **数据来源**:业务系统产生的日志数据通过DRC(Data Recovery ...

Global site tag (gtag.js) - Google Analytics