- 浏览: 105473 次
- 性别:
- 来自: 北京
最新评论
作者:张俊
整理:张友亮(Apache Flink 社区志愿者)
> 本文共 4745字,预计阅读时间 15min。
本文根据 Apache Flink 系列直播整理而成,由 Apache Flink Contributor、OPPO 大数据平台研发负责人张俊老师分享。主要内容如下:
* 网络流控的概念与背景
* TCP的流控机制
* Flink TCP-based 反压机制(before V1.5)
* Flink Credit-based 反压机制 (since V1.5)
* 总结与思考
网络流控的概念与背景
----------
### 为什么需要网络流控
![1_04](https://yqfile.alicdn.com/f55df72c2c254caf9a92fbe3890df48583dfae4d.png)
首先我们可以看下这张最精简的网络流控的图,Producer 的吞吐率是 2MB/s,Consumer 是 1MB/s,这个时候我们就会发现在网络通信的时候我们的 Producer 的速度是比 Consumer 要快的,有 1MB/s 的这样的速度差,假定我们两端都有一个 Buffer,Producer 端有一个发送用的 Send Buffer,Consumer 端有一个接收用的 Receive Buffer,在网络端的吞吐率是 2MB/s,过了 5s 后我们的 Receive Buffer 可能就撑不住了,这时候会面临两种情况:
* 1.如果 Receive Buffer 是有界的,这时候新到达的数据就只能被丢弃掉了。
* 2.如果 Receive Buffer 是无界的,Receive Buffer 会持续的扩张,最终会导致 Consumer 的内存耗尽。
### 网络流控的实现:静态限速
![2_05](https://yqfile.alicdn.com/52aca194fd13dc63d0ec1811e3535ab470a91e49.png)
为了解决这个问题,我们就需要网络流控来解决上下游速度差的问题,传统的做法可以在 Producer 端实现一个类似 Rate Limiter 这样的静态限流,Producer 的发送速率是 2MB/s,但是经过限流这一层后,往 Send Buffer 去传数据的时候就会降到 1MB/s 了,这样的话 Producer 端的发送速率跟 Consumer 端的处理速率就可以匹配起来了,就不会导致上述问题。但是这个解决方案有两点限制:
* 1、事先无法预估 Consumer 到底能承受多大的速率
* 2、 Consumer 的承受能力通常会动态地波动
### 网络流控的实现:动态反馈/自动反压
![3_06](https://yqfile.alicdn.com/5d3f0b587792ae53f5842bcf0e0253a7e52f0e4f.png)
针对静态限速的问题我们就演进到了动态反馈(自动反压)的机制,我们需要 Consumer 能够及时的给 Producer 做一个 feedback,即告知 Producer 能够承受的速率是多少。动态反馈分为两种:
* 1、负反馈:接受速率小于发送速率时发生,告知 Producer 降低发送速率
* 2、正反馈:发送速率小于接收速率时发生,告知 Producer 可以把发送速率提上来
让我们来看几个经典案例
### 案例一:Storm 反压实现
![4_07](https://yqfile.alicdn.com/7dad1c84b9210e1d3c99591c633b586438de9227.png)
上图就是 Storm 里实现的反压机制,可以看到 Storm 在每一个 Bolt 都会有一个监测反压的线程(Backpressure Thread),这个线程一但检测到 Bolt 里的接收队列(recv queue)出现了严重阻塞就会把这个情况写到 ZooKeeper 里,ZooKeeper 会一直被 Spout 监听,监听到有反压的情况就会停止发送,通过这样的方式匹配上下游的发送接收速率。
### 案例二:Spark Streaming 反压实现
![5_08](https://yqfile.alicdn.com/e7b4e415ccb4311ab95f455ce7f1c1b432206da1.png)
Spark Streaming 里也有做类似这样的 feedback 机制,上图 Fecher 会实时的从 Buffer、Processing 这样的节点收集一些指标然后通过 Controller 把速度接收的情况再反馈到 Receiver,实现速率的匹配。
### 疑问:为什么 Flink(before V1.5)里没有用类似的方式实现 feedback 机制?
首先在解决这个疑问之前我们需要先了解一下 Flink 的网络传输是一个什么样的架构。
![6_10](https://yqfile.alicdn.com/58006efbd154eebbf8110f7787b4d94bec451326.png)
这张图就体现了 Flink 在做网络传输的时候基本的数据的流向,发送端在发送网络数据前要经历自己内部的一个流程,会有一个自己的 Network Buffer,在底层用 Netty 去做通信,Netty 这一层又有属于自己的 ChannelOutbound Buffer,因为最终是要通过 Socket 做网络请求的发送,所以在 Socket 也有自己的 Send Buffer,同样在接收端也有对应的三级 Buffer。学过计算机网络的时候我们应该了解到,TCP 是自带流量控制的。实际上 Flink (before V1.5)就是通过 TCP 的流控机制来实现 feedback 的。
TCP 流控机制
--------
根据下图我们来简单的回顾一下 TCP 包的格式结构。首先,他有 Sequence number 这样一个机制给每个数据包做一个编号,还有 ACK number 这样一个机制来确保 TCP 的数据传输是可靠的,除此之外还有一个很重要的部分就是 Window Size,接收端在回复消息的时候会通过 Window Size 告诉发送端还可以发送多少数据。
![7_13](https://yqfile.alicdn.com/0b4bc72d3d02bb6f1c1b24ec08e0259602651fb3.png)
接下来我们来简单看一下这个过程。
### TCP 流控:滑动窗口
![8_14](https://yqfile.alicdn.com/7efa6c17cab155ed971fe0712d55ea5efb2465f5.png)
TCP 的流控就是基于滑动窗口的机制,现在我们有一个 Socket 的发送端和一个 Socket 的接收端,目前我们的发送端的速率是我们接收端的 3 倍,这样会发生什么样的一个情况呢?假定初始的时候我们发送的 window 大小是 3,然后我们接收端的 window 大小是固定的,就是接收端的 Buffer 大小为 5。
![9_15](https://yqfile.alicdn.com/f7e349d298e1e2c98a7f37f60023bd56922c6b08.png)
首先,发送端会一次性发 3 个 packets,将 1,2,3 发送给接收端,接收端接收到后会将这 3 个 packets 放到 Buffer 里去。
![10_16](https://yqfile.alicdn.com/df81be90f206d36610ac86784e3d069ba8443f59.png)
接收端一次消费 1 个 packet,这时候 1 就已经被消费了,然后我们看到接收端的滑动窗口会往前滑动一格,这时候 2,3 还在 Buffer 当中 而 4,5,6 是空出来的,所以接收端会给发送端发送 ACK = 4 ,代表发送端可以从 4 开始发送,同时会将 window 设置为 3 (Buffer 的大小 5 减去已经存下的 2 和 3),发送端接收到回应后也会将他的滑动窗口向前移动到 4,5,6。
![11_17](https://yqfile.alicdn.com/47bdef8db4a32262325a45ad0c46a20fec126887.png)
这时候发送端将 4,5,6 发送,接收端也能成功的接收到 Buffer 中去。
![12_18](https://yqfile.alicdn.com/58bf284147fecc3415ab75b935b927c093cb2b2b.png)
到这一阶段后,接收端就消费到 2 了,同样他的窗口也会向前滑动一个,这时候他的 Buffer 就只剩一个了,于是向发送端发送 ACK = 7、window = 1。发送端收到之后滑动窗口也向前移,但是这个时候就不能移动 3 格了,虽然发送端的速度允许发 3 个 packets 但是 window 传值已经告知只能接收一个,所以他的滑动窗口就只能往前移一格到 7 ,这样就达到了限流的效果,发送端的发送速度从 3 降到 1。
![13_19](https://yqfile.alicdn.com/c69da59e1f5ba7f1c3371018de0c42ad4456912d.png)
![14_20](https://yqfile.alicdn.com/a9a28aba06629691eaa7f577e3b2b5f9bc3b2eb0.png)
我们再看一下这种情况,这时候发送端将 7 发送后,接收端接收到,但是由于接收端的消费出现问题,一直没有从 Buffer 中去取,这时候接收端向发送端发送 ACK = 8、window = 0 ,由于这个时候 window = 0,发送端是不能发送任何数据,也就会使发送端的发送速度降为 0。这个时候发送端不发送任何数据了,接收端也不进行任何的反馈了,那么如何知道消费端又开始消费了呢?
![15_21](https://yqfile.alicdn.com/101ebbd06f370649cc395438a96a98a8e54e137d.png)
![16_22](https://yqfile.alicdn.com/e636403b67a99c0b153fe0c2fe527904f271408b.png)
![17_23](https://yqfile.alicdn.com/50a7f6d0b4a9ff37262116ea716d666d8be96315.png)
TCP 当中有一个 ZeroWindowProbe 的机制,发送端会定期的发送 1 个字节的探测消息,这时候接收端就会把 window 的大小进行反馈。当接收端的消费恢复了之后,接收到探测消息就可以将 window 反馈给发送端端了从而恢复整个流程。TCP 就是通过这样一个滑动窗口的机制实现 feedback。
Flink TCP-based 反压机制(before V1.5)
---------------------------------
### 示例:WindowWordCount
![18_25](https://yqfile.alicdn.com/140ac4a6865e579875d4cca8aa9c5002d3eae2df.png)
大体的逻辑就是从 Socket 里去接收数据,每 5s 去进行一次 WordCount,将这个代码提交后就进入到了编译阶段。
### 编译阶段:生成 JobGraph
![19_26](https://yqfile.alicdn.com/4d05a32a46d7db50325b8ce00e0be672ec1ba9a7.png)
这时候还没有向集群去提交任务,在 Client 端会将 StreamGraph 生成 JobGraph,JobGraph 就是做为向集群提交的最基本的单元。在生成 JobGrap 的时候会做一些优化,将一些没有 Shuffle 机制的节点进行合并。有了 JobGraph 后就会向集群进行提交,进入运行阶段。
### 运行阶段:调度 ExecutionGraph
![20_27](https://yqfile.alicdn.com/e916c83ad03e7e29ed5810974499adcce540524b.png)
JobGraph 提交到集群后会生成 ExecutionGraph ,这时候就已经具备基本的执行任务的雏形了,把每个任务拆解成了不同的 SubTask,上图 ExecutionGraph 中的 Intermediate Result Partition 就是用于发送数据的模块,最终会将 ExecutionGraph 交给 JobManager 的调度器,将整个 ExecutionGraph 调度起来。然后我们概念化这样一张物理执行图,可以看到每个 Task 在接收数据时都会通过这样一个 InputGate 可以认为是负责接收数据的,再往前有这样一个 ResultPartition 负责发送数据,在 ResultPartition 又会去做分区跟下游的 Task 保持一致,就形成了 ResultSubPartition 和 InputChannel 的对应关系。这就是从逻辑层上来看的网络传输的通道,基于这么一个概念我们可以将反压的问题进行拆解。
### 问题拆解:反压传播两个阶段
![21_28](https://yqfile.alicdn.com/4a4a9e76dd9843819387b0e5ed25820029efde0e.png)
反压的传播实际上是分为两个阶段的,对应着上面的执行图,我们一共涉及 3 个 TaskManager,在每个 TaskManager 里面都有相应的 Task 在执行,还有负责接收数据的 InputGate,发送数据的 ResultPartition,这就是一个最基本的数据传输的通道。在这时候假设最下游的 Task (Sink)出现了问题,处理速度降了下来这时候是如何将这个压力反向传播回去呢?这时候就分为两种情况:
* 跨 TaskManager ,反压如何从 InputGate 传播到 ResultPartition
* TaskManager 内,反压如何从 ResultPartition 传播到 InputGate
### 跨 TaskManager 数据传输
![22_29](https://yqfile.alicdn.com/67ca07e2eaecd5190982c5ecc7e127960de0d83e.png)
前面提到,发送数据需要 ResultPartition,在每个 ResultPartition 里面会有分区 ResultSubPartition,中间还会有一些关于内存管理的 Buffer。
对于一个 TaskManager 来说会有一个统一的 Network BufferPool 被所有的 Task 共享,在初始化时会从 Off-heap Memory 中申请内存,申请到内存的后续内存管理就是同步 Network BufferPool 来进行的,不需要依赖 JVM GC 的机制去释放。有了 Network BufferPool 之后可以为每一个 ResultSubPartition 创建 Local BufferPool 。
如上图左边的 TaskManager 的 Record Writer 写了 <1,2> 这个两个数据进来,因为 ResultSubPartition 初始化的时候为空,没有 Buffer 用来接收,就会向 Local BufferPool 申请内存,这时 Local BufferPool 也没有足够的内存于是将请求转到 Network BufferPool,最终将申请到的 Buffer 按原链路返还给 ResultSubPartition,<1,2> 这个两个数据就可以被写入了。之后会将 ResultSubPartition 的 Buffer 拷贝到 Netty 的 Buffer 当中最终拷贝到 Socket 的 Buffer 将消息发送出去。然后接收端按照类似的机制去处理将消息消费掉。
接下来我们来模拟上下游处理速度不匹配的场景,发送端的速率为 2,接收端的速率为 1,看一下反压的过程是怎样的。
### 跨 TaskManager 反压过程
![23_30](https://yqfile.alicdn.com/72a01465aa1d9a5bf269d3c0efb69daeeda14f16.png)
因为速度不匹配就会导致一段时间后 InputChannel 的 Buffer 被用尽,于是他会向 Local BufferPool 申请新的 Buffer ,这时候可以看到 Local BufferPool 中的一个 Buffer 就会被标记为 Used。
![24_31](https://yqfile.alicdn.com/c579c55399d69082e7261a8ee6b8f36d570d8159.png)
发送端还在持续以不匹配的速度发送数据,然后就会导致 InputChannel 向 Local BufferPool 申请 Buffer 的时候发现没有可用的 Buffer 了,这时候就只能向 Network BufferPool 去申请,当然每个 Local BufferPool 都有最大的可用的 Buffer,防止一个 Local BufferPool 把 Network BufferPool 耗尽。这时候看到 Network BufferPool 还是有可用的 Buffer 可以向其申请。
![25_32](https://yqfile.alicdn.com/58fcb2180a4308d4d19c63726917501ea0dbe691.png)
一段时间后,发现 Network BufferPool 没有可用的 Buffer,或是 Local BufferPool 的最大可用 Buffer 到了上限无法向 Network BufferPool 申请,没有办法去读取新的数据,这时 Netty AutoRead 就会被禁掉,Netty 就不会从 Socket 的 Buffer 中读取数据了。
![26_33](https://yqfile.alicdn.com/4a01bb1aabecd29f39d9199024590c42cb5ae3c1.png)
显然,再过不久 Socket 的 Buffer 也被用尽,这时就会将 Window = 0 发送给发送端(前文提到的 TCP 滑动窗口的机制)。这时发送端的 Socket 就会停止发送。
![27_34](https://yqfile.alicdn.com/151369e71d92a1c82baa27b4396043c88fa90541.png)
很快发送端的 Socket 的 Buffer 也被用尽,Netty 检测到 Socket 无法写了之后就会停止向 Socket 写数据。
![28_35](https://yqfile.alicdn.com/6c57a0adb9d3ab71f8f4b7c5a2324f0e370d2bfe.png)
Netty 停止写了之后,所有的数据就会阻塞在 Netty 的 Buffer 当中了,但是 Netty 的 Buffer 是无界的,可以通过 Netty 的水位机制中的 high watermark 控制他的上界。当超过了 high watermark,Netty 就会将其 channel 置为不可写,ResultSubPartition 在写之前都会检测 Netty 是否可写,发现不可写就会停止向 Netty 写数据。
![29_36](https://yqfile.alicdn.com/0de9eb334c31c720c472e5e24cc206c00d9c72e0.png)
这时候所有的压力都来到了 ResultSubPartition,和接收端一样他会不断的向 Local BufferPool 和 Network BufferPool 申请内存。
![30_38](https://yqfile.alicdn.com/898b316178ec08b7c5b2736294337272046040f8.png)
Local BufferPool 和 Network BufferPool 都用尽后整个 Operator 就会停止写数据,达到跨 TaskManager 的反压。
### TaskManager 内反压过程
了解了跨 TaskManager 反压过程后再来看 TaskManager 内反压过程就更好理解了,下游的 TaskManager 反压导致本 TaskManager 的 ResultSubPartition 无法继续写入数据,于是 Record Writer 的写也被阻塞住了,因为 Operator 需要有输入才能有计算后的输出,输入跟输出都是在同一线程执行, Record Writer 阻塞了,Record Reader 也停止从 InputChannel 读数据,这时上游的 TaskManager 还在不断地发送数据,最终将这个 TaskManager 的 Buffer 耗尽。具体流程可以参考下图,这就是 TaskManager 内的反压过程。
![31_39](https://yqfile.alicdn.com/f6669644615c925f089e2fb6d3d20fabb2439a89.png)
![32_40](https://yqfile.alicdn.com/4463f83549c16ffa47902f91c44b1e47200061c4.png)
![33_41](https://yqfile.alicdn.com/a4816d7bdcfed08e89430ccd919943f1b8a341ea.png)
![34_42](https://yqfile.alicdn.com/0bc094c7fb61f41c3b2f66f02029c2d527ec89bd.png)
Flink Credit-based 反压机制(since V1.5)
-----------------------------------
### TCP-based 反压的弊端
![35_44](https://yqfile.alicdn.com/570debeba11e71ffac5f5b78f04e2bc88179e852.png)
在介绍 Credit-based 反压机制之前,先分析下 TCP 反压有哪些弊端。
* 在一个 TaskManager 中可能要执行多个 Task,如果多个 Task 的数据最终都要传输到下游的同一个 TaskManager 就会复用同一个 Socket 进行传输,这个时候如果单个 Task 产生反压,就会导致复用的 Socket 阻塞,其余的 Task 也无法使用传输,checkpoint barrier 也无法发出导致下游执行 checkpoint 的延迟增大。
* 依赖最底层的 TCP 去做流控,会导致反压传播路径太长,导致生效的延迟比较大。
### 引入 Credit-based 反压
这个机制简单的理解起来就是在 Flink 层面实现类似 TCP 流控的反压机制来解决上述的弊端,Credit 可以类比为 TCP 的 Window 机制。
### Credit-based 反压过程
![36_46](https://yqfile.alicdn.com/c61481b75ec8d5ae62af8ce4ea45bd9d171e9dce.png)
如图所示在 Flink 层面实现反压机制,就是每一次 ResultSubPartition 向 InputChannel 发送消息的时候都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息,算完之后如果有充足的 Buffer 就会返还给上游一个 Credit 告知他可以发送消息(图上两个 ResultSubPartition 和 InputChannel 之间是虚线是因为最终还是要通过 Netty 和 Socket 去通信),下面我们看一个具体示例。
![37_47](https://yqfile.alicdn.com/a9a211ea22afc8a07d55fc3feb168b69c37f3643.png)
假设我们上下游的速度不匹配,上游发送速率为 2,下游接收速率为 1,可以看到图上在 ResultSubPartition 中累积了两条消息,10 和 11, backlog 就为 2,这时就会将发送的数据 <8,9> 和 backlog = 2 一同发送给下游。下游收到了之后就会去计算是否有 2 个 Buffer 去接收,可以看到 InputChannel 中已经不足了这时就会从 Local BufferPool 和 Network BufferPool 申请,好在这个时候 Buffer 还是可以申请到的。
![38_48](https://yqfile.alicdn.com/be73146308876ade5e8b24e97413e0a7a71d3c8f.png)
过了一段时间后由于上游的发送速率要大于下游的接受速率,下游的 TaskManager 的 Buffer 已经到达了申请上限,这时候下游就会向上游返回 Credit = 0,ResultSubPartition 接收到之后就不会向 Netty 去传输数据,上游 TaskManager 的 Buffer 也很快耗尽,达到反压的效果,这样在 ResultSubPartition 层就能感知到反压,不用通过 Socket 和 Netty 一层层地向上反馈,降低了反压生效的延迟。同时也不会将 Socket 去阻塞,解决了由于一个 Task 反压导致 TaskManager 和 TaskManager 之间的 Socket 阻塞的问题。
总结与思考
-----
### 总结
* 网络流控是为了在上下游速度不匹配的情况下,防止下游出现过载
* 网络流控有静态限速和动态反压两种手段
* Flink 1.5 之前是基于 TCP 流控 + bounded buffer 实现反压
* Flink 1.5 之后实现了自己托管的 credit - based 流控机制,在应用层模拟 TCP 的流控机制
### 思考
有了动态反压,静态限速是不是完全没有作用了?
![39_52](https://yqfile.alicdn.com/fe6df2957ec9255f8b2958d9a78feb4719b4915a.png)
实际上动态反压不是万能的,我们流计算的结果最终是要输出到一个外部的存储(Storage),外部数据存储到 Sink 端的反压是不一定会触发的,这要取决于外部存储的实现,像 Kafka 这样是实现了限流限速的消息中间件可以通过协议将反压反馈给 Sink 端,但是像 ES 无法将反压进行传播反馈给 Sink 端,这种情况下为了防止外部存储在大的数据量下被打爆,我们就可以通过静态限速的方式在 Source 端去做限流。所以说动态反压并不能完全替代静态限速的,需要根据合适的场景去选择处理方案。
[原文链接](https://yq.aliyun.com/articles/725982?utm_content=g_1000087745)
本文为云栖社区原创内容,未经允许不得转载。
整理:张友亮(Apache Flink 社区志愿者)
> 本文共 4745字,预计阅读时间 15min。
本文根据 Apache Flink 系列直播整理而成,由 Apache Flink Contributor、OPPO 大数据平台研发负责人张俊老师分享。主要内容如下:
* 网络流控的概念与背景
* TCP的流控机制
* Flink TCP-based 反压机制(before V1.5)
* Flink Credit-based 反压机制 (since V1.5)
* 总结与思考
网络流控的概念与背景
----------
### 为什么需要网络流控
![1_04](https://yqfile.alicdn.com/f55df72c2c254caf9a92fbe3890df48583dfae4d.png)
首先我们可以看下这张最精简的网络流控的图,Producer 的吞吐率是 2MB/s,Consumer 是 1MB/s,这个时候我们就会发现在网络通信的时候我们的 Producer 的速度是比 Consumer 要快的,有 1MB/s 的这样的速度差,假定我们两端都有一个 Buffer,Producer 端有一个发送用的 Send Buffer,Consumer 端有一个接收用的 Receive Buffer,在网络端的吞吐率是 2MB/s,过了 5s 后我们的 Receive Buffer 可能就撑不住了,这时候会面临两种情况:
* 1.如果 Receive Buffer 是有界的,这时候新到达的数据就只能被丢弃掉了。
* 2.如果 Receive Buffer 是无界的,Receive Buffer 会持续的扩张,最终会导致 Consumer 的内存耗尽。
### 网络流控的实现:静态限速
![2_05](https://yqfile.alicdn.com/52aca194fd13dc63d0ec1811e3535ab470a91e49.png)
为了解决这个问题,我们就需要网络流控来解决上下游速度差的问题,传统的做法可以在 Producer 端实现一个类似 Rate Limiter 这样的静态限流,Producer 的发送速率是 2MB/s,但是经过限流这一层后,往 Send Buffer 去传数据的时候就会降到 1MB/s 了,这样的话 Producer 端的发送速率跟 Consumer 端的处理速率就可以匹配起来了,就不会导致上述问题。但是这个解决方案有两点限制:
* 1、事先无法预估 Consumer 到底能承受多大的速率
* 2、 Consumer 的承受能力通常会动态地波动
### 网络流控的实现:动态反馈/自动反压
![3_06](https://yqfile.alicdn.com/5d3f0b587792ae53f5842bcf0e0253a7e52f0e4f.png)
针对静态限速的问题我们就演进到了动态反馈(自动反压)的机制,我们需要 Consumer 能够及时的给 Producer 做一个 feedback,即告知 Producer 能够承受的速率是多少。动态反馈分为两种:
* 1、负反馈:接受速率小于发送速率时发生,告知 Producer 降低发送速率
* 2、正反馈:发送速率小于接收速率时发生,告知 Producer 可以把发送速率提上来
让我们来看几个经典案例
### 案例一:Storm 反压实现
![4_07](https://yqfile.alicdn.com/7dad1c84b9210e1d3c99591c633b586438de9227.png)
上图就是 Storm 里实现的反压机制,可以看到 Storm 在每一个 Bolt 都会有一个监测反压的线程(Backpressure Thread),这个线程一但检测到 Bolt 里的接收队列(recv queue)出现了严重阻塞就会把这个情况写到 ZooKeeper 里,ZooKeeper 会一直被 Spout 监听,监听到有反压的情况就会停止发送,通过这样的方式匹配上下游的发送接收速率。
### 案例二:Spark Streaming 反压实现
![5_08](https://yqfile.alicdn.com/e7b4e415ccb4311ab95f455ce7f1c1b432206da1.png)
Spark Streaming 里也有做类似这样的 feedback 机制,上图 Fecher 会实时的从 Buffer、Processing 这样的节点收集一些指标然后通过 Controller 把速度接收的情况再反馈到 Receiver,实现速率的匹配。
### 疑问:为什么 Flink(before V1.5)里没有用类似的方式实现 feedback 机制?
首先在解决这个疑问之前我们需要先了解一下 Flink 的网络传输是一个什么样的架构。
![6_10](https://yqfile.alicdn.com/58006efbd154eebbf8110f7787b4d94bec451326.png)
这张图就体现了 Flink 在做网络传输的时候基本的数据的流向,发送端在发送网络数据前要经历自己内部的一个流程,会有一个自己的 Network Buffer,在底层用 Netty 去做通信,Netty 这一层又有属于自己的 ChannelOutbound Buffer,因为最终是要通过 Socket 做网络请求的发送,所以在 Socket 也有自己的 Send Buffer,同样在接收端也有对应的三级 Buffer。学过计算机网络的时候我们应该了解到,TCP 是自带流量控制的。实际上 Flink (before V1.5)就是通过 TCP 的流控机制来实现 feedback 的。
TCP 流控机制
--------
根据下图我们来简单的回顾一下 TCP 包的格式结构。首先,他有 Sequence number 这样一个机制给每个数据包做一个编号,还有 ACK number 这样一个机制来确保 TCP 的数据传输是可靠的,除此之外还有一个很重要的部分就是 Window Size,接收端在回复消息的时候会通过 Window Size 告诉发送端还可以发送多少数据。
![7_13](https://yqfile.alicdn.com/0b4bc72d3d02bb6f1c1b24ec08e0259602651fb3.png)
接下来我们来简单看一下这个过程。
### TCP 流控:滑动窗口
![8_14](https://yqfile.alicdn.com/7efa6c17cab155ed971fe0712d55ea5efb2465f5.png)
TCP 的流控就是基于滑动窗口的机制,现在我们有一个 Socket 的发送端和一个 Socket 的接收端,目前我们的发送端的速率是我们接收端的 3 倍,这样会发生什么样的一个情况呢?假定初始的时候我们发送的 window 大小是 3,然后我们接收端的 window 大小是固定的,就是接收端的 Buffer 大小为 5。
![9_15](https://yqfile.alicdn.com/f7e349d298e1e2c98a7f37f60023bd56922c6b08.png)
首先,发送端会一次性发 3 个 packets,将 1,2,3 发送给接收端,接收端接收到后会将这 3 个 packets 放到 Buffer 里去。
![10_16](https://yqfile.alicdn.com/df81be90f206d36610ac86784e3d069ba8443f59.png)
接收端一次消费 1 个 packet,这时候 1 就已经被消费了,然后我们看到接收端的滑动窗口会往前滑动一格,这时候 2,3 还在 Buffer 当中 而 4,5,6 是空出来的,所以接收端会给发送端发送 ACK = 4 ,代表发送端可以从 4 开始发送,同时会将 window 设置为 3 (Buffer 的大小 5 减去已经存下的 2 和 3),发送端接收到回应后也会将他的滑动窗口向前移动到 4,5,6。
![11_17](https://yqfile.alicdn.com/47bdef8db4a32262325a45ad0c46a20fec126887.png)
这时候发送端将 4,5,6 发送,接收端也能成功的接收到 Buffer 中去。
![12_18](https://yqfile.alicdn.com/58bf284147fecc3415ab75b935b927c093cb2b2b.png)
到这一阶段后,接收端就消费到 2 了,同样他的窗口也会向前滑动一个,这时候他的 Buffer 就只剩一个了,于是向发送端发送 ACK = 7、window = 1。发送端收到之后滑动窗口也向前移,但是这个时候就不能移动 3 格了,虽然发送端的速度允许发 3 个 packets 但是 window 传值已经告知只能接收一个,所以他的滑动窗口就只能往前移一格到 7 ,这样就达到了限流的效果,发送端的发送速度从 3 降到 1。
![13_19](https://yqfile.alicdn.com/c69da59e1f5ba7f1c3371018de0c42ad4456912d.png)
![14_20](https://yqfile.alicdn.com/a9a28aba06629691eaa7f577e3b2b5f9bc3b2eb0.png)
我们再看一下这种情况,这时候发送端将 7 发送后,接收端接收到,但是由于接收端的消费出现问题,一直没有从 Buffer 中去取,这时候接收端向发送端发送 ACK = 8、window = 0 ,由于这个时候 window = 0,发送端是不能发送任何数据,也就会使发送端的发送速度降为 0。这个时候发送端不发送任何数据了,接收端也不进行任何的反馈了,那么如何知道消费端又开始消费了呢?
![15_21](https://yqfile.alicdn.com/101ebbd06f370649cc395438a96a98a8e54e137d.png)
![16_22](https://yqfile.alicdn.com/e636403b67a99c0b153fe0c2fe527904f271408b.png)
![17_23](https://yqfile.alicdn.com/50a7f6d0b4a9ff37262116ea716d666d8be96315.png)
TCP 当中有一个 ZeroWindowProbe 的机制,发送端会定期的发送 1 个字节的探测消息,这时候接收端就会把 window 的大小进行反馈。当接收端的消费恢复了之后,接收到探测消息就可以将 window 反馈给发送端端了从而恢复整个流程。TCP 就是通过这样一个滑动窗口的机制实现 feedback。
Flink TCP-based 反压机制(before V1.5)
---------------------------------
### 示例:WindowWordCount
![18_25](https://yqfile.alicdn.com/140ac4a6865e579875d4cca8aa9c5002d3eae2df.png)
大体的逻辑就是从 Socket 里去接收数据,每 5s 去进行一次 WordCount,将这个代码提交后就进入到了编译阶段。
### 编译阶段:生成 JobGraph
![19_26](https://yqfile.alicdn.com/4d05a32a46d7db50325b8ce00e0be672ec1ba9a7.png)
这时候还没有向集群去提交任务,在 Client 端会将 StreamGraph 生成 JobGraph,JobGraph 就是做为向集群提交的最基本的单元。在生成 JobGrap 的时候会做一些优化,将一些没有 Shuffle 机制的节点进行合并。有了 JobGraph 后就会向集群进行提交,进入运行阶段。
### 运行阶段:调度 ExecutionGraph
![20_27](https://yqfile.alicdn.com/e916c83ad03e7e29ed5810974499adcce540524b.png)
JobGraph 提交到集群后会生成 ExecutionGraph ,这时候就已经具备基本的执行任务的雏形了,把每个任务拆解成了不同的 SubTask,上图 ExecutionGraph 中的 Intermediate Result Partition 就是用于发送数据的模块,最终会将 ExecutionGraph 交给 JobManager 的调度器,将整个 ExecutionGraph 调度起来。然后我们概念化这样一张物理执行图,可以看到每个 Task 在接收数据时都会通过这样一个 InputGate 可以认为是负责接收数据的,再往前有这样一个 ResultPartition 负责发送数据,在 ResultPartition 又会去做分区跟下游的 Task 保持一致,就形成了 ResultSubPartition 和 InputChannel 的对应关系。这就是从逻辑层上来看的网络传输的通道,基于这么一个概念我们可以将反压的问题进行拆解。
### 问题拆解:反压传播两个阶段
![21_28](https://yqfile.alicdn.com/4a4a9e76dd9843819387b0e5ed25820029efde0e.png)
反压的传播实际上是分为两个阶段的,对应着上面的执行图,我们一共涉及 3 个 TaskManager,在每个 TaskManager 里面都有相应的 Task 在执行,还有负责接收数据的 InputGate,发送数据的 ResultPartition,这就是一个最基本的数据传输的通道。在这时候假设最下游的 Task (Sink)出现了问题,处理速度降了下来这时候是如何将这个压力反向传播回去呢?这时候就分为两种情况:
* 跨 TaskManager ,反压如何从 InputGate 传播到 ResultPartition
* TaskManager 内,反压如何从 ResultPartition 传播到 InputGate
### 跨 TaskManager 数据传输
![22_29](https://yqfile.alicdn.com/67ca07e2eaecd5190982c5ecc7e127960de0d83e.png)
前面提到,发送数据需要 ResultPartition,在每个 ResultPartition 里面会有分区 ResultSubPartition,中间还会有一些关于内存管理的 Buffer。
对于一个 TaskManager 来说会有一个统一的 Network BufferPool 被所有的 Task 共享,在初始化时会从 Off-heap Memory 中申请内存,申请到内存的后续内存管理就是同步 Network BufferPool 来进行的,不需要依赖 JVM GC 的机制去释放。有了 Network BufferPool 之后可以为每一个 ResultSubPartition 创建 Local BufferPool 。
如上图左边的 TaskManager 的 Record Writer 写了 <1,2> 这个两个数据进来,因为 ResultSubPartition 初始化的时候为空,没有 Buffer 用来接收,就会向 Local BufferPool 申请内存,这时 Local BufferPool 也没有足够的内存于是将请求转到 Network BufferPool,最终将申请到的 Buffer 按原链路返还给 ResultSubPartition,<1,2> 这个两个数据就可以被写入了。之后会将 ResultSubPartition 的 Buffer 拷贝到 Netty 的 Buffer 当中最终拷贝到 Socket 的 Buffer 将消息发送出去。然后接收端按照类似的机制去处理将消息消费掉。
接下来我们来模拟上下游处理速度不匹配的场景,发送端的速率为 2,接收端的速率为 1,看一下反压的过程是怎样的。
### 跨 TaskManager 反压过程
![23_30](https://yqfile.alicdn.com/72a01465aa1d9a5bf269d3c0efb69daeeda14f16.png)
因为速度不匹配就会导致一段时间后 InputChannel 的 Buffer 被用尽,于是他会向 Local BufferPool 申请新的 Buffer ,这时候可以看到 Local BufferPool 中的一个 Buffer 就会被标记为 Used。
![24_31](https://yqfile.alicdn.com/c579c55399d69082e7261a8ee6b8f36d570d8159.png)
发送端还在持续以不匹配的速度发送数据,然后就会导致 InputChannel 向 Local BufferPool 申请 Buffer 的时候发现没有可用的 Buffer 了,这时候就只能向 Network BufferPool 去申请,当然每个 Local BufferPool 都有最大的可用的 Buffer,防止一个 Local BufferPool 把 Network BufferPool 耗尽。这时候看到 Network BufferPool 还是有可用的 Buffer 可以向其申请。
![25_32](https://yqfile.alicdn.com/58fcb2180a4308d4d19c63726917501ea0dbe691.png)
一段时间后,发现 Network BufferPool 没有可用的 Buffer,或是 Local BufferPool 的最大可用 Buffer 到了上限无法向 Network BufferPool 申请,没有办法去读取新的数据,这时 Netty AutoRead 就会被禁掉,Netty 就不会从 Socket 的 Buffer 中读取数据了。
![26_33](https://yqfile.alicdn.com/4a01bb1aabecd29f39d9199024590c42cb5ae3c1.png)
显然,再过不久 Socket 的 Buffer 也被用尽,这时就会将 Window = 0 发送给发送端(前文提到的 TCP 滑动窗口的机制)。这时发送端的 Socket 就会停止发送。
![27_34](https://yqfile.alicdn.com/151369e71d92a1c82baa27b4396043c88fa90541.png)
很快发送端的 Socket 的 Buffer 也被用尽,Netty 检测到 Socket 无法写了之后就会停止向 Socket 写数据。
![28_35](https://yqfile.alicdn.com/6c57a0adb9d3ab71f8f4b7c5a2324f0e370d2bfe.png)
Netty 停止写了之后,所有的数据就会阻塞在 Netty 的 Buffer 当中了,但是 Netty 的 Buffer 是无界的,可以通过 Netty 的水位机制中的 high watermark 控制他的上界。当超过了 high watermark,Netty 就会将其 channel 置为不可写,ResultSubPartition 在写之前都会检测 Netty 是否可写,发现不可写就会停止向 Netty 写数据。
![29_36](https://yqfile.alicdn.com/0de9eb334c31c720c472e5e24cc206c00d9c72e0.png)
这时候所有的压力都来到了 ResultSubPartition,和接收端一样他会不断的向 Local BufferPool 和 Network BufferPool 申请内存。
![30_38](https://yqfile.alicdn.com/898b316178ec08b7c5b2736294337272046040f8.png)
Local BufferPool 和 Network BufferPool 都用尽后整个 Operator 就会停止写数据,达到跨 TaskManager 的反压。
### TaskManager 内反压过程
了解了跨 TaskManager 反压过程后再来看 TaskManager 内反压过程就更好理解了,下游的 TaskManager 反压导致本 TaskManager 的 ResultSubPartition 无法继续写入数据,于是 Record Writer 的写也被阻塞住了,因为 Operator 需要有输入才能有计算后的输出,输入跟输出都是在同一线程执行, Record Writer 阻塞了,Record Reader 也停止从 InputChannel 读数据,这时上游的 TaskManager 还在不断地发送数据,最终将这个 TaskManager 的 Buffer 耗尽。具体流程可以参考下图,这就是 TaskManager 内的反压过程。
![31_39](https://yqfile.alicdn.com/f6669644615c925f089e2fb6d3d20fabb2439a89.png)
![32_40](https://yqfile.alicdn.com/4463f83549c16ffa47902f91c44b1e47200061c4.png)
![33_41](https://yqfile.alicdn.com/a4816d7bdcfed08e89430ccd919943f1b8a341ea.png)
![34_42](https://yqfile.alicdn.com/0bc094c7fb61f41c3b2f66f02029c2d527ec89bd.png)
Flink Credit-based 反压机制(since V1.5)
-----------------------------------
### TCP-based 反压的弊端
![35_44](https://yqfile.alicdn.com/570debeba11e71ffac5f5b78f04e2bc88179e852.png)
在介绍 Credit-based 反压机制之前,先分析下 TCP 反压有哪些弊端。
* 在一个 TaskManager 中可能要执行多个 Task,如果多个 Task 的数据最终都要传输到下游的同一个 TaskManager 就会复用同一个 Socket 进行传输,这个时候如果单个 Task 产生反压,就会导致复用的 Socket 阻塞,其余的 Task 也无法使用传输,checkpoint barrier 也无法发出导致下游执行 checkpoint 的延迟增大。
* 依赖最底层的 TCP 去做流控,会导致反压传播路径太长,导致生效的延迟比较大。
### 引入 Credit-based 反压
这个机制简单的理解起来就是在 Flink 层面实现类似 TCP 流控的反压机制来解决上述的弊端,Credit 可以类比为 TCP 的 Window 机制。
### Credit-based 反压过程
![36_46](https://yqfile.alicdn.com/c61481b75ec8d5ae62af8ce4ea45bd9d171e9dce.png)
如图所示在 Flink 层面实现反压机制,就是每一次 ResultSubPartition 向 InputChannel 发送消息的时候都会发送一个 backlog size 告诉下游准备发送多少消息,下游就会去计算有多少的 Buffer 去接收消息,算完之后如果有充足的 Buffer 就会返还给上游一个 Credit 告知他可以发送消息(图上两个 ResultSubPartition 和 InputChannel 之间是虚线是因为最终还是要通过 Netty 和 Socket 去通信),下面我们看一个具体示例。
![37_47](https://yqfile.alicdn.com/a9a211ea22afc8a07d55fc3feb168b69c37f3643.png)
假设我们上下游的速度不匹配,上游发送速率为 2,下游接收速率为 1,可以看到图上在 ResultSubPartition 中累积了两条消息,10 和 11, backlog 就为 2,这时就会将发送的数据 <8,9> 和 backlog = 2 一同发送给下游。下游收到了之后就会去计算是否有 2 个 Buffer 去接收,可以看到 InputChannel 中已经不足了这时就会从 Local BufferPool 和 Network BufferPool 申请,好在这个时候 Buffer 还是可以申请到的。
![38_48](https://yqfile.alicdn.com/be73146308876ade5e8b24e97413e0a7a71d3c8f.png)
过了一段时间后由于上游的发送速率要大于下游的接受速率,下游的 TaskManager 的 Buffer 已经到达了申请上限,这时候下游就会向上游返回 Credit = 0,ResultSubPartition 接收到之后就不会向 Netty 去传输数据,上游 TaskManager 的 Buffer 也很快耗尽,达到反压的效果,这样在 ResultSubPartition 层就能感知到反压,不用通过 Socket 和 Netty 一层层地向上反馈,降低了反压生效的延迟。同时也不会将 Socket 去阻塞,解决了由于一个 Task 反压导致 TaskManager 和 TaskManager 之间的 Socket 阻塞的问题。
总结与思考
-----
### 总结
* 网络流控是为了在上下游速度不匹配的情况下,防止下游出现过载
* 网络流控有静态限速和动态反压两种手段
* Flink 1.5 之前是基于 TCP 流控 + bounded buffer 实现反压
* Flink 1.5 之后实现了自己托管的 credit - based 流控机制,在应用层模拟 TCP 的流控机制
### 思考
有了动态反压,静态限速是不是完全没有作用了?
![39_52](https://yqfile.alicdn.com/fe6df2957ec9255f8b2958d9a78feb4719b4915a.png)
实际上动态反压不是万能的,我们流计算的结果最终是要输出到一个外部的存储(Storage),外部数据存储到 Sink 端的反压是不一定会触发的,这要取决于外部存储的实现,像 Kafka 这样是实现了限流限速的消息中间件可以通过协议将反压反馈给 Sink 端,但是像 ES 无法将反压进行传播反馈给 Sink 端,这种情况下为了防止外部存储在大的数据量下被打爆,我们就可以通过静态限速的方式在 Source 端去做限流。所以说动态反压并不能完全替代静态限速的,需要根据合适的场景去选择处理方案。
[原文链接](https://yq.aliyun.com/articles/725982?utm_content=g_1000087745)
本文为云栖社区原创内容,未经允许不得转载。
相关推荐
本文将深入剖析 Flink 的网络流控和反压机制,并探讨其在大数据处理中的重要性。 一、Flink 的网络流控机制 在 Flink 中,网络流控机制是指在数据流处理过程中对网络带宽的控制。该机制的主要目的是为了防止数据流...
Apache Flink 进阶(七):网络流控及反压剖析 88 Apache Flink 进阶(八):详解 Metrics 原理与实战 112 Apache Flink 进阶(九):Flink Connector 开发 125 Apache Flink 进阶(十):Flink State 最佳实践 141 ...
Flink网络流控和反压是Flink性能优化的另一个关键,开发人员可以使用Flink提供的网络流控和反压机制来控制数据流的速度和方向。 四、结论 Flink监控和优化是Flink流处理引擎的关键组件,开发人员可以使用Flink ...
本文将深入探讨Flink中的反压现象,以及如何进行模拟和分析。 首先,我们要理解Flink的网络栈。在Flink中,TaskManager负责执行任务,而不同TaskManager上的Subtask之间通过Channel进行数据传输。这些Channel基于...
Flink作为一个开源的分布式、高性能的流处理框架,其处理数据的速度可以达到毫秒级别,...总的来说,Flink的核心架构和执行流程是通过其源码来实现的,通过深入理解其源码,我们可以更加深入的理解Flink的逻辑和机制。
网络流控是流处理系统中的一个重要部分,用于处理和避免系统中的拥塞。Flink实现了基于反压的流控策略,当下游消费者的处理速度无法匹配上游生产者时,系统会减少生产者的发送速度以避免数据丢失。 ### Metrics原理...
2.课程从理论原理、环境配置、服务安装、组件集成开发、业务代码开发、可视化等项目完整流程讲解,不会跳讲和断讲 3.课程中无论案例代码开发还是项目业务代码开发,每一行代码都会边实现边讲解。 4.课程中目前使用...
另一方面,Apache Flink 是基于 Java 语言实现的,提供了 Java 和 Scala 语言的编程接口。Flink 的核心实现基于操作符的连续流模型。 计算模型 Spark 采用了微批处理模型,对数据进行小批量处理。这种模型可以提供...
2. **Exactly-once语义**:Flink提供了一种严格的一次性语义(Exactly-once),这意味着即使在系统故障后,也能确保数据处理的精确性和一致性。这一特性依赖于Flink的checkpoint机制,通过周期性地保存执行状态到...
在实际应用中,Flink的部署和配置是一个非常重要的环节,本节将详细介绍Flink的部署和配置。 Flink的部署模式主要有三种:Standalone集群、Hadoop YARN集群和Kubernetes集群。 1. Standalone集群: Standalone...
- **网络流控和反压**:理解如何控制数据传输速率,防止下游处理能力不足导致的反压问题。 - **Metrics原理与实战**:学习监控Flink作业性能和健康状态的方法。 - **Flink Connector开发**:如何自定义连接器以...
3. **状态管理**: Flink 提供了一套强大的状态管理机制,包括检查点和保存点,确保容错性和精确一次的状态一致性。 4. **窗口操作**: Flink 提供了滑动窗口、会话窗口、翻转窗口等多种窗口类型,以适应不同类型的流...
Flink有一个非常重要的特性,提供了很好的故障恢复能力,而这一次Flink又大大提升了更多的性能。Flink1.12版本的全新发布,揭开了又一次技术更新的浪潮。Flink高级案例课程经过短暂的入门初探,快速的进入到实战部署...
Flink、Storm、Spark Streaming三种流框架的对比分析 Flink架构及特性分析 Flink是一个原生的流处理系统,提供高级的API。Flink也提供API来像Spark一样进行批处理,但两者处理的基础是完全不同的。Flink把批处理...
本系统利用Apache Flink这一强大的流处理框架,构建了一个能够处理亿级数据的用户画像分析系统。本文将深入探讨该系统的架构设计、关键技术及其优势。 一、系统架构 1. 数据采集:用户行为数据通过各种渠道如APP、...
该项目是基于于flink的电商用户行为数据分析的flink项目,模块划为为:从项目介绍与代码框架、实时热门商品统计、替换kafka源、实时流量统计、恶意登录检测、恶意登录监控CEP实现、订单支付监控CEP实现
f-1.项目架构设计,f-2.springboot构建上报...f-44.flink分析之用户网络分析代码编写1,f-55.flink分析之flinkbatch实现产品成交分析代码编写2,f-65.flink分析之flinktable订单分析效果演示以及代码调试优化讲解补充1