作者 | Nico Krube
译者 | 王强
在之前的[文章](https://yq.aliyun.com/articles/706462?spm=a2c4e.11155435.0.0.6b5b3fa4Xo8uHh)中,我们从高级抽象到底层细节各个层面全面介绍了 Flink 网络栈的工作机制。作为这一系列的第二篇文章,本文将在第一篇的基础上更进一步,主要探讨如何监视与网络相关的指标,从而识别背压等因素带来的影响,或找出吞吐量和延迟的瓶颈所在。本文将简要介绍处理背压的手段,而之后的文章将进一步研究网络栈微调的话题。如果你不是很熟悉网络栈的知识,强烈建议先阅读本系列的第一篇文章 《[原理解析 | 深入了解 Apache Flink 的网络协议栈](https://yq.aliyun.com/articles/706462?spm=a2c4e.11155435.0.0.6b5b3fa4Xo8uHh)》。
监控
--
网络监控工作中最重要的环节可能就是监控背压了,所谓背压是指系统接收数据的速率高于其处理速度 \[1\]。这种现象将给发送者带来压力,而导致它的原因可能有两种情况:
* 接收器很慢。
这可能是因为接收器本身就遇到了背压,所以无法以与发送方相同的速率继续处理数据;也有可能是接收器因为垃圾回收工作、缺少系统资源或 I/O 瓶颈而暂时卡住了。
* 网络通道很慢。
这种情况可能和接收器没有(直接)关系,我们说这时是发送器遇到了背压,因为在同一台机器上运行的所有子任务共享的网络带宽可能供不应求了。请注意,除了 Flink 的网络栈之外可能还有其他网络用户,例如源(source)和汇(sink)、分布式文件系统(检查点、网络附加存储)、日志记录和指标监测等。我们之前的一篇关于容量规划的文章([https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines](https://yq.aliyun.com/go/articleRenderRedirect?url=https%3A%2F%2Fwww.ververica.com%2Fblog%2Fhow-to-size-your-apache-flink-cluster-general-guidelines))介绍了更多相关内容。
\[1\] 如果你不熟悉背压,不了解它与 Flink 的交互方式,建议阅读我们在 2015 年发表的关于背压的文章([https://www.ververica.com/blog/how-flink-handles-backpressure](https://yq.aliyun.com/go/articleRenderRedirect?url=https%3A%2F%2Fwww.ververica.com%2Fblog%2Fhow-flink-handles-backpressure))。
当背压出现时,它将一路向上游传导并最终到达你的源,还会减慢它们的速度。这本身并不是一件坏事,只是表明你缺乏足够的资源处理当前的负载。但你可能想要做一些改进,在不动用更多资源的前提下处理更高的负载。为此你需要找到(1)瓶颈在哪里(位于哪个任务 / 操作符)和(2)产生瓶颈的原因。Flink 提供了两种识别瓶颈的机制:
* 直接通过 Flink 的 Web UI 及其背压监视器识别
* 间接通过一些网络指标识别。
Flink 的 Web UI 大概是快速排除故障时的首选,但它存在一些缺点,我们将在下面解释。另一方面,Flink 的网络指标更适合持续监控和推断是哪些瓶颈导致了背压,并分析这些瓶颈的本质属性。我们将在下文中具体介绍这两个部分。在这两种情况下,你都需要从所有的源和汇中找出背压的根源。调查工作的起点一般来说是最后一个承受背压的操作符;而且最后这个操作符很可能就是背压产生的源头。
背压监视器
-----
背压监视器只暴露在 Flink 的 WebUI\[2\] 中。由于它是仅在请求时才会触发的活动组件,因此目前无法通过监控指标来提供给用户。背压监视器通过 Thread.getStackTrace() 对 TaskManager 上运行的所有任务线程采样,并计算缓存请求中阻塞任务的样本数。这些任务之所以会阻塞,要么是因为它们无法按照网络缓冲区生成的速率发送这些缓存,要么就是下游任务处理它们的速度很慢,无法保证发送的速率。背压监视器将显示阻塞请求与总请求的比率。由于某些背压被认为是正常 / 临时的,所以监视器将显示以下状态:
* OK,比率 ≤ 0.10
* LOW,0.10 < 比率 ≤ 0.5
* HIGH,0.5 < 比率 ≤ 1
虽说你也可以调整刷新间隔、样本数或样本之间的延迟等参数,但通常情况下这些参数用不着你来调整,因为默认值提供的结果已经够好了。
![1](https://yqfile.alicdn.com/1b02abba3547cec588a66af1fb4668a4b874fc60.jpeg)
\[2\] 你还可以通过 REST API 访问背压监视器:/jobs/:jobid/vertices/:vertexid/backpressure
背压监视器可以帮助你找到背压源自何处(位于哪个任务 / 操作符)。但你没法用它进一步推断背压产生的原因。此外,对于较大的作业或较高的并行度来说,背压监视器显示的信息就太乱了,很难分析,还可能要花些时间才能完整收集来自 TaskManager 的数据。另请注意,采样工作可能还会影响你当前作业的性能。
网络指标
----
网络指标和任务 I/O 指标比背压监视器更轻量一些,而且会针对当前运行的每个作业不断更新。我们可以利用这些指标获得更多信息,收集到的信息除了用来监测背压外还有其他用途。和用户关系最大的指标有:
* **Flink 1.8 及更早版本**:outPoolUsage、inPoolUsage。它们是对各个本地缓冲池中已用缓存与可用缓存的比率估计。在使用基于信用的流控制解析 Flink 1.5-1.8 中的 inPoolUsage 时,请注意它只与浮动缓存有关(独占缓存不算在缓冲池里)。
* **Flink 1.9 及更新版本**:outPoolUsage、inPoolUsage、floatingBuffersUsage、exclusiveBuffersUsage
它们是对各个本地缓冲池中已用缓存与可用缓存的比率估计。从 Flink 1.9 开始,inPoolUsage 是 floatingBuffersUsage 和 exclusiveBuffersUsage 的总和。
* numRecordsOut、numRecordsIn。这两个指标都带有两个作用域:一个是运算符,另一个是子任务。网络监视使用的是子任务作用域指标,并显示它已发送 / 接收的记录总数。你可能需要进一步研究这些数字来找出特定时间跨度内的记录数量,或使用等效的 PerSecond 指标。
* numBytesOut、numBytesInLocal、numBytesInRemote。表示这个子任务从本地 / 远程源发出或读取的字节总数。也可以通过 PerSecond 指标获取。
* numBuffersOut、numBuffersInLocal、numBuffersInRemote。与 numBytes 类似,但这里计算的是网络缓冲区的数量。
> 警告:为了完整起见,我们将简要介绍 outputQueueLength 和 inputQueueLength 这两个指标。它们有点像 \[out、in\] PoolUsage 指标,但这两个指标分别显示的是发送方子任务的输出队列和接收方子任务的输入队列中的缓存数量。但想要推断缓存的准确数量是很难的,而且本地通道也有一个很微妙的特殊问题:由于本地输入通道没有自己的队列(它直接使用输出队列),因此通道的这个值始终为 0(参见 FLINK-12576,[https://issues.apache.org/jira/browse/FLINK-12576](https://yq.aliyun.com/go/articleRenderRedirect?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FFLINK-12576));在只有本地输入通道的情况下 inputQueueLength = 0。
总的来说,我们不鼓励使用 outputQueueLength 和 inputQueueLength,因为它们的解析很大程度上取决于运算符当前的并行度以及独占缓存和浮动缓存的配置数量。相比之下,我们建议使用各种 \*PoolUsage 指标,它们会为用户提供更详尽的信息。
> 注意:如果你要推断缓存的使用率,请记住以下几点:
>
> 任何至少使用过一次的传出通道总是占用一个缓存(Flink 1.5 及更高版本)。
>
> Flink 1.8 及较早版本:这个缓存(即使是空的!)总是在 backlog 中计 1,因此接收器试图为它保留一个浮动缓存区。
>
> Flink 1.9 及以上版本:只有当一个缓存已准备好消费时才在 backlog 中计数,比如说它已满或已刷新时(请参阅 FLINK-11082)。
>
> 接收器仅在反序列化其中的最后一条记录后才释放接收的缓存。
后文会综合运用这些指标,以了解背压和资源的使用率 / 效率与吞吐量的关系。后面还会有一个独立的部分具体介绍与延迟相关的指标。
背压
--
有两组指标可以用来监测背压:它们分别是(本地)缓冲池使用率和输入 / 输出队列长度。这两种指标的粒度粗细各异,可惜都不够全面,怎样解读这些指标也有很多说法。由于队列长度指标解读起来有一些先天困难,我们将重点关注输入和输出池的使用率指标,该指标也提供了更多细节信息。
* **如果一项子任务的 outPoolUsage 为 100%,则它正在经受背压**。子任务是已经阻塞了,还是仍在将记录写入网络缓冲区,取决于 RecordWriter 当前正在写入的缓存有没有写满。这与背压监视器显示的结果是不一样的!
* 当 inPoolUsage 为 100%时表示所有浮动缓存都分配给了通道,背压最终将传递到上游。这些浮动缓存处于以下任一状态中:由于一个独占缓存正被占用(远程输入通道一直在尝试维护 #exclusive buffer 的信用),这些浮动缓存被保留下来供将来在通道上使用;它们为一个发送器的 backlog 保留下来等待数据;它们可能包含数据并在输入通道中排队;或者它们可能包含数据并正由接收器的子任务读取(一次一个记录)。
* **Flink 1.8 及更早的版本**:根据 FLINK-11082([https://issues.apache.org/jira/browse/FLINK-11082](https://yq.aliyun.com/go/articleRenderRedirect?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FFLINK-11082)),即使在正常情况下 100% 的 inPoolUsage 也很常见。
* **Flink 1.9 及以上版本**:如果 inPoolUsage 持续在 100%左右,这就是出现上游背压的强烈信号。
下表总结了所有组合及其解释。但请记住,背压可能是次要的的或临时的(也就是无需查看),或者只出现在特定通道上,或是由特定 TaskManager 上的其他 JVM 进程(例如 GC、同步、I/O、资源短缺等)引起的,源头不是某个子任务。
outPoolUsage low
outPoolUsage high
inPoolUsage low
正常
注意(产生背压,当前状态:上游暂未出现背压或已经解除背压)
inPoolUsage high (Flink 1.9+)
如果所有上游任务的 outPoolUsage 都很低,则只需要注意(可能最终会产生背压); 如果任何上游任务的 outPoolUsage 变高,则问题(可能在上游导致背压,还可能是背压的源头)
问题(下游任务或网络出现背压,可能会向上游传递)
我们甚至可以通过查看两个连续任务的子任务的网络指标来深入了解背压产生的原因:
* 如果接收器任务的所有子任务的 inPoolUsage 值都很低,并且有任一上游子任务的 outPoolUsage 较高,则可能是网络瓶颈导致了背压。由于网络是 TaskManager 的所有子任务共享的资源,因此瓶颈可能不是直接源自这个子任务,而是来自于各种并发操作,例如检查点、其他流、外部连接或同一台计算机上的其他 TaskManager/ 进程。
* 背压也可以由一个任务的所有并行实例或单个任务实例引起。
第一种情况通常是因为任务正在执行一些应用到所有输入分区的耗时操作;后者通常是某种偏差的结果,可能是数据偏斜或资源可用性 / 分配偏差。后文的“如何处理背压”一节中会介绍这两种情况下的应对措施。
Flink 1.9 及以上版本
---------------
* 如果 floatingBuffersUsage 没到 100%,那么就不太可能存在背压。如果它达到了 100% 且所有上游任务都在承受背压,说明这个输入正在单个、部分或全部输入通道上承受背压。你可以使用 exclusiveBuffersUsage 来区分这三种情况:
假设 floatingBuffersUsage 接近 100%,则 exclusiveBuffersUsage 越高,输入通道承受的背压越大。在 exclusiveBuffersUsage 接近 100%的极端情况下,所有通道都在承受背压。
* * 下表总结了 exclusiveBuffersUsage、floatingBuffersUsage 和上游任务的 outPoolUsage 之间的关系,还比上表多了一个 inPoolUsage = floatingBuffersUsage + exclusiveBuffersUsage:
exclusiveBuffersUsage low
exclusiveBuffersUsage high
floatingBuffersUsage low + 所有上游 outPoolUsage low
正常
\[3\]
floatingBuffersUsage low + 任一上游 outPoolUsage high
问题(可能是网络瓶颈)
\[3\]
floatingBuffersUsage high + 所有上游 outPoolUsage low
注意(最终只有一些输入通道出现背压)
注意(最终多数或全部输入通道出现背压)
floatingBuffersUsage high + 任一上游 outPoolUsage high
问题(只有一些输入通道在承受背压)
问题(多数或全部输入通道都在承受背压)
\[3\] 不应该出现这种情况
资源使用率 / 吞吐量
-----------
除了上面提到的各个指标的单独用法外,还有一些组合用法可以用来探究网络栈的深层状况:
* 吞吐量较低时 outPoolUsage 值却经常接近 100%,但同时所有接收器的 inPoolUsage 都很低,这表明我们的信用通知的往返时间(取决于你的网络延迟)太久,导致默认的独占缓存数量无法充分利用你的带宽。可以考虑增加每通道缓存参数或尝试禁用基于信用的流量控制。
* numRecordsOut 和 numBytesOut 这个组合可以用来确定序列化记录的平均大小,进而帮助你针对峰值场景做容量规划。
* 如果要了解缓存填充率和输出刷新器的影响,可以考察 numBytesInRemote 与 numBuffersInRemote 的组合。在调整吞吐量(而不是延迟!)时,较低的缓存填充率可能意味着网络效率较低。在这种情况下请考虑增加缓存超时时间。请注意,在 Flink 1.8 和 1.9 中,numBuffersOut 仅在缓存快填满或某事件停用某缓存(例如一个检查点屏障)时才会增加,这个动作还可能滞后。还请注意,由于缓存是针对远程信道的优化技术,对本地信道影响有限,因此不需要在本地信道上考察缓存填充率。
* 你还可以使用 numBytesInLocal 和 numBytesInRemote 的组合区分本地与远程流量,但在大多数情况下没这个必要。
如何处理背压?
-------
假设你确定了背压的来源,也就是瓶颈所在,下一步就是分析为什么会发生这种情况。下面我们按照从基本到复杂的顺序列出了导致背压的一些潜在成因。我们建议首先检查基本成因,然后再深入研究更复杂的成因,否则就可能得出一些错误的结论。
另外回想一下,背压可能是暂时的,可能是由于负载高峰、检查点或作业重启时数据 backlog 待处理导致的结果。如果背压是暂时的,那么忽略它就行了。此外还要记住,分析和解决问题的过程可能会受到瓶颈本身的影响。话虽如此,这里还是有几件事需要检查一下。
### 系统资源
首先,你应该检查受控机器的基本资源使用情况,如 CPU、网络或磁盘 I/O 等指标。如果某些资源在被全部或大量占用,你可以执行以下操作:
1. 尝试优化你的代码。此时代码分析器是很有用的。
2. 调整这项资源的 Flink。
3. 通过增加并行度和 / 或增加群集中的计算机数量来扩展资源。
### 垃圾收集
一般来说,长时间的垃圾回收工作会引发性能问题。你可以打印 GC 调试日志(通过 -XX: +PrintGCDetails)或使用某些内存 /GC 分析器来验证你是否处于这种状况下。由于 GC 问题的处理与应用程序高度相关,并且独立于 Flink,因此我们不会在此详细介绍(可参考 Oracle 的垃圾收集调整指南,[https://docs.oracle.com/javase/8/docs/technotes/guides/vm/gctuning/index.html](https://yq.aliyun.com/go/articleRenderRedirect?url=https%3A%2F%2Fdocs.oracle.com%2Fjavase%2F8%2Fdocs%2Ftechnotes%2Fguides%2Fvm%2Fgctuning%2Findex.html) 或 Plumbr 的 Java 垃圾回收手册,[https://plumbr.io/java-garbage-collection-handbook](https://yq.aliyun.com/go/articleRenderRedirect?url=https%3A%2F%2Fplumbr.io%2Fjava-garbage-collection-handbook))。
### CPU/ 线程瓶颈
如果 CPU 瓶颈来自于一个或几个线程,而整台机器的 CPU 使用率仍然相对较低,则 CPU 瓶颈可能就很难被发现了。例如,48 核计算机上的单个 CPU 线程瓶颈只会带来 2%的 CPU 使用率。可以考虑使用代码分析器,因为它们可以显示每个线程的 CPU 使用情况,这样就能识别出热线程。
### 线程争用
与上面的 CPU/ 线程瓶颈问题类似,共享资源上较高的线程争用率可能会导致子任务瓶颈。还是要请出 CPU 分析器,考虑查找用户代码中的同步开销 / 锁争用——虽然我们应该避免在用户代码中添加同步性,这可能很危险!还可以考虑调查共享系统资源。例如,默认 JVM 的 SSL 实现可以从共享的 /dev/urandom 资源周围获取数据。
### 加载不均衡
如果你的瓶颈是由数据偏差引起的,可以尝试将数据分区更改为几个独立的重键,或实现本地 / 预聚合来清除偏差或减轻其影响。
除此之外还有很多情况。一般来说,为了削弱瓶颈从而减少背压,首先要分析它发生的位置,然后找出原因。最好从检查哪些资源处于充分利用状态开始入手。
延迟追踪
----
追踪各个可能环节出现的延迟是一个独立的话题。在本节中,我们将重点关注 Flink 网络栈中的记录的等待时间——包括系统网络连接的情况。在吞吐量较低时,这些延迟会直接受输出刷新器的缓存超时参数的影响,或间接受任何应用程序代码延迟的影响。处理记录的时间比预期的要长或者(多个)计时器同时触发——并阻止接收器处理传入的记录——时,网络栈内后续记录的等待时间会大大延长。我们强烈建议你将自己的指标添加到 Flink 作业中,以便更好地跟踪作业组件中的延迟,并更全面地了解延迟产生的原因。
Flink 为追踪通过系统(用户代码之外)的记录延迟提供了一些支持。但默认情况下此功能被禁用(原因参见下文!),必须用 metrics.latency.interval 或 ExecutionConfig #setLatencyTrackingInterval() 在 Flink 的配置中设置延迟追踪间隔才能启用此功能。启用后,Flink 将根据 metrics.latency.granularity 定义的粒度生成延迟直方图:
* single:每个操作符子任务有一个直方图
* operator(默认值):源任务和操作符子任务的每个组合有一个直方图
* subtask:源子任务和操作符子任务的每个组合有一个直方图(并行度翻了两番!)
这些指标通过特殊的“延迟标记”收集:每个源子任务将定期发出包含其创建时间戳的特殊记录。然后,延迟标记与正常记录一起流动,不会在线路上或缓存队列中超过正常记录。但是,延迟标记不会进入应用程序逻辑,并会在那里超过正常记录。因此,延迟标记仅测量用户代码之间的等待时间,而不是完整的“端到端”延迟。但用户代码会间接影响这些等待时间!
由于 LatencyMarker 就像普通记录一样位于网络缓冲区中,它们也会因缓存已满而等待,或因缓存超时而刷新。当信道处于高负载时,网络缓冲区数据不会增加延迟。但是只要一个信道处于低负载状态,记录和延迟标记就会承受最多 buffer\_timeout/2 的平均延迟。这个延迟会加到每个连接子任务的网络连接上,在分析子任务的延迟指标时应该考虑这一点。
只要查看每个子任务暴露的延迟追踪指标,例如在第 95 百分位,你就应该能识别出是哪些子任务在显著影响源到汇延迟,然后对其做针对性优化。
[原文链接](https://yq.aliyun.com/articles/724059?utm_content=g_1000086188)
本文为云栖社区原创内容,未经允许不得转载。
分享到:
相关推荐
- FLIP-6部署及处理模型演进:介绍Flink 1.5版本对部署模型和处理模型的主要变更。 通过学习这些知识点,读者可以对Flink有一个全面的了解,从基础架构到数据处理流程,再到容错机制和优化策略。对于希望深入研究...
Flink作为大数据领域的重要工具,它以其高效、低延迟和高容错性著称,尤其适用于处理海量的实时数据,如道路监控系统中的视频流和传感器数据。 大数据技术之Flink-修改版.doc可能是对Flink的基本概念、核心特性和...
3. **事件时间和处理延迟**:理解Flink中的事件时间处理机制,它是如何确保数据的正确处理顺序,以及如何通过水印(Watermark)来处理乱序事件。 4. **大数据在交通监控中的应用**:研究如何利用大数据技术分析交通...
Flink技术栈及其适用场景
Apache Flink 进阶(二):时间属性深度解析 18 Apache Flink 进阶(三):Checkpoint 原理剖析与应用实践 30 Apache Flink 进阶(四):Flink on Yarn/K8s 原理剖析及实践 41 Apache Flink 进阶(五):数据类型和...
flink 介绍,描述flink 场景使用及不足。flink 技术栈包含内容
7. **Flink的调度系统**:了解Flink的分布式执行模型,包括任务调度、资源管理和故障恢复策略。 8. **实时数据处理**:讨论实时数据处理的挑战,如延迟、数据倾斜以及如何在Flink中解决这些问题。 9. **案例研究**...
1. **DataStream API**:面向无界和有界数据流,支持事件时间和处理时间的概念,提供了窗口、状态和事件时间触发器等功能。 2. **DataSet API**:主要用于批处理,但其运算逻辑可被DataStream API重用,实现批流...
2. FLINK-1.12.2.jar:这是Flink的核心库文件,包含了Flink运行时所需的所有Java类和库,用于执行数据流处理任务。 3. manifest.json:在CDH中,manifest.json文件是parcel的元数据文件,包含了关于该软件包的信息,...
1. Flink监控:Flink提供了监控功能,可以实时监控任务执行情况和资源使用情况。 2. Flink日志:Flink提供了日志功能,可以记录任务执行情况和资源使用情况。 七、Flink与其他技术集成 1. Flink与HDFS集成:Flink...
6. **监控和管理**:通过Flink的Web UI或CDH的管理界面监控作业状态,进行作业管理和故障排查。 总结,Flink 1.13.1在CDH 6.3.2上的部署是一个集成了Parcel分发、YARN资源管理和Flink配置的过程。正确配置和使用...
1. **什么是Flink**:Flink是一个用于处理无界和有界数据的流处理框架,支持批处理和流处理两种模式,提供事件时间窗口和处理时间窗口的概念。 2. **Flink的核心组件**:DataStream API和DataSet API是Flink的两个...
【华为大数据认证:Flink流计算处理和批处理平台】 华为大数据认证的Flink部分主要聚焦于理解Flink的核心原理、关键特性和在FusionInsight HD平台中的集成情况。Flink是一个强大的计算框架,它将批处理和流处理集成...
Flink技术栈及其适用场景.pdf 这里介绍了flink组建的技术栈和使用场景,适合给使用Flink的同学进一步熟悉Flink
Table API 提供了一个关系型数据库样的接口,开发者可以使用 SQL 语句来查询和处理数据流。 4. Flink的应用场景:Flink可以应用于各种实时数据处理场景,如实时统计、实时监控、实时警报等。 5. Kafka 和 Flink 的...
Flink有一个非常重要的...课程内容包括了Flink安装部署,入门实战案例,Flink原理初探,流处理的教学,Flink高级API和Flink-Table-SQL-案例,Flink高级特性和新特性,Flink多语言开发,Flink监控与优化。 视频大小:4G
* 都是大数据处理框架,但 Flink 更加注重实时数据处理和流处理。 * Flink 能够更好地处理大规模数据,具有更高的性能和可扩展性。 Flink 的特点包括: * 高性能:Flink 能够实时处理大规模数据,具有高性能和低...
Apache Flink 是一个开源的流处理和批处理框架,它为实时数据处理提供了高效、可扩展和容错的解决方案。...通过深入理解这些概念和操作,开发者能够充分利用 Flink 构建高可用、高性能的数据处理应用。
总之,Flink-CDC是实现实时数据源监控和变更数据捕获的强大工具,其高效、稳定且具有广泛生态支持的特点,使得它在大数据实时处理领域具有广泛的应用前景。对于需要实现实时数据同步和ETL的企业来说,Flink-CDC是一...