【Flink】浅谈Flink背压问题(1)
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
概述
在多线程的情况下有一个典型的模型生产者消费者模型该模型主要由生产者、消费者和一个大小固定的队列组成。生产者向队列发送数据消费者从队列中取出数据并处理。
针对上述模型如果队列属于有限长度当消费者能力<生产者能力的时候就会出现数据堆积这样生产者的生产就会停止。现在将这个模型引入Flink算子链中生产者和消费者的身份是相对的一个生产者是上游的消费者一个消费者同样也是下游的生产者。所以一个节点模型中消费者的堵塞将会向上移动直到源头这就是反压。
Flink数据通信模型
假如一个Flink任务Job中有 TaskATaskB并发度都是 4即 A1-A4B1-B4。TaskA 与 TaskB 使用 keyby 连接。将这个 Flink Job 部署到 2 个 TM 中每个 TM 分配 2 个 slots。那么Flink会将 A 1 , A 2 , B 1 , B 2 A1,A2,B1,B2 A1,A2,B1,B2 放到一个 TM 中 A 3 , A 4 , B 3 , B 4 A3,A4,B3,B4 A3,A4,B3,B4 放到一个 TM 中具体示例如下图所示
同一个 TM 中的SubTask采用 【local 】方式进行数据传输。位于不同 TM 的 SubTask采用【remote】方式传输。传输示意图如下图所示
我们从上图可以看出以SubTaskA1为例其数据传输步骤如下
- TaskA1 先通过 【RecordWriter】对象将数据序列化写到一个 【Output Queue Buffer】 中下游的并行子任务个数就是队列的个数。
- 由【 Netty Service 】进行拉取满足以下任意一个条件都会进行拉取
- 【Output Queue Buffer】 写满了默认 32KB
- 【Output Queue Buffer】 超时了默认 100ms
- 遇到特殊结构例如 BarrierWaterMark
- 经过网络传输之后数据会写到 TaskB3 中的 【Input Queue】 中然后由 【RecordReader】对象将数据反序列化后进行处理。
也就是说一个 下游TM 中的并行子任务出现消费延迟就会阻塞 TCP-channel 进而影响整个 TM 的消费最终向上传递导致反压。
反压的监控
Web UI
可以直接在 Flink Web 中进行观察Flink检测会针对任何一个 Task 做反压检测。该机制需要在 Flink Web 上手动触发触发后TM 使用 Thread.GetStackTrace 来抽样检测 Task Thread 是否在 NetworkBuffer 中即是否处于等待状态。根据抽样比例来判断反压状态。Ratio 是代表抽样 n 次默认100次中遇到等待次数的比例。
- OKratio≤0.1
- LOW0.01≤Ratio≤0.5;
- High0.5≤Ratio≤1;
从 Sink→Source 进行检查第一个反压状态处于 High 的 task 大概率是反压的根源。
该方法有一定的缺陷
- 由于他是抽样无法观察到历史数据
- 影响作业流程
- 高并发场景下需要等待很久才能检测成功
Flink Network Metric
在上文提到过TM之间的通信都会使用到 InputQueue 和 OutputQueue我们可以通过使用【InputQueueUsage】 和 【OutputQueueUsage】这两个指标来判断出现反压的位置。
Task Status | OutputQueueUsage < 1.0 | OutputQueueUsage == 1.0 |
---|---|---|
InputQueueUsage < 1.0 | 正常 | 处于反压其根本原因可能是该 Task 下游处理能力不足导致持续下去该 Task 将会向上游传递反压 |
InputQueueUsage == 1.0 | 处于反压持续下去该 Task 会向上游传递反压而且该 Task 可能是反压的源头 | 处于反压原因可能是被下游阻塞 |
现在看一个实际的例子
从指标监控界面可以看出 FlatMap→Reduce 出现了阻塞再看 reduce 任务的 inpoolusage 和 outpoolusage 指标得出结论reduce任务就是反压的源头。
往期回顾
我将在下一期详细介绍反压形成的原因以及处理办法敬请期待