PyFlink 最新进展解读及典型应用场景介绍

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

摘要本文整理自阿里巴巴高级技术专家付典在 FFA 核心技术专场的分享。本篇内容主要分为四个部分

  1. PyFlink 发展现状介绍

  2. PyFlink 最新功能解读

  3. PyFlink 典型应用场景介绍

  4. PyFlink 下一步的发展规划

Tips点击「阅读原文」查看原文视频&演讲 ppt

01

PyFlink 发展现状介绍


394cade90b58e599ae53b50bba07c79f.jpeg

很多 PyFlink 的新用户都会问这样一些问题PyFlink 是否成熟功能是否齐全性能怎么样在这里我们针对用户的这样一些问题进行一个详细的解读。

首先在功能层面PyFlink 已经对齐了 Flink Java API 中的绝大多数功能。用户使用 Java API 可以实现的功能基本上都可以用 Python API 实现得出来。

同时PyFlink 还面向 Python 用户提供了很多特有的能力比如说 Python UDF、Pandas UDF 等允许用户在 PyFlink 作业中使用各种 Python 三方库。

在部署模式上PyFlink 支持各种常见的部署模式比如说 YARN、kubernetes、Standalone 等这意味着用户可以根据需要灵活地选择作业的部署模式。

除了功能层面之外性能也是很多用户非常关心的。在性能上PyFlink 也做了很多优化首先在执行计划层面PyFlink 做了一系列的优化尽可能优化作业的物理执行计划比如算子融合。

当作业的物理执行计划确定之后在 Python 运行时PyFlink 通过 Cython 实现了 Python 运行时中核心链路的代码尽量降低 Python 运行时中框架部分的开销。对于 Cython 有所了解的同学应该知道Cython 在执行时会被编译成 native 代码来执行性能非常高。

同时PyFlink 还在现有的进程模式的基础之上引入了线程执行模式以进一步提升 Python 运行时的性能。线程执行模式在 JVM 中执行用户的 Python 代码通过这种方式在一些典型应用场景中性能甚至可以追平 Java这一块后面我们还会详细介绍。

经过这一系列的优化之后目前 PyFlink 无论在功能上还是在性能上都已经基本完备达到了生产可用的状态。

9851de065f793693dd1e1e65ac15de83.jpeg

PyFlink 达到目前这样一个状态并不是一蹴而就的从 Flink 1.9 开始引入 PyFlink到目前为止PyFlink 已经累计发布了 8 个大版本20 多个小版本。

从 Flink 1.9 到 Flink 1.11 这几个版本中我们重点在完善 Python Table API基本上对齐了 Java Table API 中的绝大多数功能同时也支持了 Python UDF、Pandas UDF 等功能。

在 Flink 1.12 至 Flink 1.14 版本中社区主要是在完善 Python DataStream API目前已经基本上对齐了 Python DataStream API 上的绝大部分常用功能。

在 Flink 1.15 至 Flink 1.16 版本中PyFlink 的重点是在性能优化上在原有的进程执行模式的基础之上为 Python 运行时引入了线程执行模式以进一步地提升 Python 运行时的性能。

随着 PyFlink 功能的逐渐完善我们也看到 PyFlink 的用户数也在逐渐增长PyPI 的日均下载量在过去一年也有了显著的增长从最开始的日均 400 多次已经增长到日均 2000 多次。

02

PyFlink 最新功能解读

9c9bc50b8b7f806f12b5bff40dcbba8e.jpeg

接下来我们看一下 PyFlink 在 Flink 1.16 中的功能。PyFlink 在 Flink 1.16 中支持的功能主要是围绕使 PyFlink 在功能及性能上全面生产可用这样一个目的。为此我们重点补齐了 PyFlink 在功能以及性能上的最后几处短板。

020a03b25db182fcc0193c2c3208a16c.jpeg

如上图所示PyFlink 在 Flink 1.16 中支持了 side output 功能。用户可以把一条数据流切分成多条数据流。以机器学习为例用户可以通过该功能把一份数据集给切分成多份分别用于模型训练和模型验证。

除此之外用户也可以通过 side output 处理迟到数据或者脏数据。将迟到数据或者脏数据通过 side output 拆分出来单独进行处理。用户也可以通过 side output 把迟到数据或脏数据写入外部存储进行离线分析。

37d4154e69f9648d965f110773f9c3e0.jpeg

PyFlink 在 Flink 1.16 中还支持了 broadcast state。通过该功能用户可以将一条数据流中的数据广播发送到另一条数据流算子中的多个并发实例上并通过 broadcast state 保存广播流的状态以确保作业在 failover 时所有算子恢复的状态是一致的。

比如我们用 PyFlink 做近线预测当模型更新后可以将最新的模型文件地址广播发送到所有的预测算子来实现模型的热更新并通过 broadcast state 确保在作业 failover 时所有算子加载的模型文件是一致的。

103bd3feab7c5389623274b379e88ad2.jpeg

PyFlink 在 Flink 1.16 中对于 DataStream API 上 Window 的支持也做了很多完善原生支持了各种窗口比如滚动窗口、滑动窗口、会话窗口等等。Window 可以将无限流中的数据划分成不同的时间窗口进行计算在流计算中是非常重要的功能有着非常丰富的应用场景。

比如机器学习用户可以使用 Window 来计算实时特征。在短视频应用中可以通过 Window计算用户最近五分钟的有效视频观看列表也可以通过 Window来计算最近 30 分钟某个视频在各个人群中的点击分布等。

372bb5dd0ccdb451455c9bd73494f87a.jpeg

除此之外在 Flink 1.16 中PyFlink 还新增了对于很多 DataStream API 上 connector 的支持包括 Elasticsearch、Kinesis、Pulsar、Hybrid source 等。与此同时也支持 Orc、Parquet 等 format。

有了这些 connector 以及 format 的支持PyFlink 基本上已经对齐了所有 Table API 以及 DataStream API 上 Flink 官方所支持的 connector。

需要说明的是对于 PyFlink 中没有原生提供支持的 connector如果有对应的 Java 实现也是可以在 PyFlink 作业中使用的其中 Table API 以及 SQL 上的 connector可以直接在 PyFlink 作业中使用不需要任何开发。

对于 DataStream API connector用户只需要非常少量的开发即可在 PyFlink 作业中使用。如果用户有需求的话可以参考一下 PyFlink 中现有的 DataStream API connector 是如何支持的基本上只需要一两个小时即可完成一个 connector 的支持。

95d34fcdee6e6d12ef07f8ad978a345c.jpeg

除了前面介绍的这些功能层面的增强之外在性能层面Flink 1.16 也做了很多工作基本完成了 Python 运行时线程执行模式的支持。相比于进程执行模式线程执行模式的性能更好。

线程执行模式通过 JNI 调用的方式执行 Python 代码节省序列化/反序列化开销及通信开销。特别是当单条数据比较大时效果更加明显。

由于不涉及跨进程通信线程执行模式目前采用同步执行的方式不需要在算子中进行攒批操作没有攒批延迟适用于对延迟敏感的场景比如量化交易。

与此同时与其他 Java/Python 互调用方案相比PyFlink 所采用的方案兼容性更好。很多 Java/Python 互调用方案对于所能支持的 Python 库都有一定程度的限制。PyFlink 所采用的方案对于用户在作业中所使用的 Python 库没有任何限制。

4b79b6b6c765afdbfc069356868e5625.jpeg

如上图左侧所示展示了进程模式和线程模式架构的区别。进程模式需要启动一个独立的 Python 进程用于执行用户的 Python 代码。线程执行模式在 JVM 中通过 JNI 调用的方式执行 Python 代码。PEMJA 是 PyFlink 中 Java 代码和 Python 代码之间互调用的库。

如上图右侧所示在处理时延上相比于进程模式线程模式有显著的降低。因为进程模式不需要攒数据来一条处理一条。与此同时在处理性能上线程模式相比进程模式也有较大的提升在某些情况下性能甚至可以追平 Java。

这里需要说明的是Python UDF 的执行性能既取决于 PyFlink 执行框架的性能也跟 Python UDF 的实现是否高效息息相关。通过各种优化手段目前 PyFlink 执行框架的性能已经非常高效开销非常小。用户的 PyFlink 作业的执行性能很大程度取决于用户作业中的 Python UDF 实现得是否高效。

如果用户的 Python UDF 实现得足够高效比如说实现的过程中针对一些耗时操作有针对性地进行来一些优化或者利用一些高性能的 Python 三方库那么 PyFlink 作业的性能其实是可以实现的非常好的。

03

PyFlink 典型应用场景介绍

44e4c78938407faa9674437bd926c629.jpeg

接下来讲一讲 PyFlink 的应用场景。目前实时机器学习是 PyFlink 用户的重点应用场景。以推荐系统为例上图是实时推荐系统的一个典型架构。用户的行为日志通过 APP 埋点等手段实时采集到消息队列中经过实时数据清洗归一化处理之后在特征生成、样本拼接等模块使用。实时的用户行为日志可以用来计算实时特征。

fe667b8762b9aff0d079943b25ccc9da.jpeg

首先实时用户行为日志可以被用来计算实时特征。实时特征是 Flink 非常重要的应用场景。实时特征对于推荐效果的提升非常明显建设难度相对来说比较小是当前很多公司投入的重点。

比如在短视频应用中用户最近 N 分钟的有效视频观看列表就是短视频应用中非常重要的用户实时特征。这个特征可以通过一个 Flink 作业实时分析用户的行为日志得到。

一般来说用户的行为日志还会同步一份到离线存储中用于生成离线特征。这块主要是用于计算一些复杂特征或者是说长周期特征。不管是离线特征还是实时特征最终都会存储到特征库中供在线推荐系统使用。

238affa55d71c9507a05cd996416368b.jpeg

实时的用户行为日志不但可以用来构造实时特征而且可以用来构造实时样本用于模型训练。

通过分析用户的行为日志可以自动完成对样本打标签。比如在推荐系统中给用户推荐了 10 个 item如果用户点击了某个 item那么在行为日志中就会出现这个 item 的点击事件。有了这个点击事件我们就可以得到一条正样本。同理如果对于某个 item只有曝光事件没有点击事件我们就可以将其看成是一条负样本。

除了区分正负样本之外还需要拼接上用户的特征以及 item 的特征之后才能得到一条完整的样本。这里需要注意的是做样本拼接时所用的特征不是来自于实时特征库而是来自于历史特征库。

由于实时特征库中的特征是不断更新的比如在短视频应用中用户最近 N 分钟的视频点击列表特征随着时间的推移在不断发生变化。因此在样本拼接时我们希望拼接推荐发生时所用到的特征而不是当前时刻的特征。样本拼接可能发生在推荐事件过去一段时间之后此时在实时特征库中存储的特征可能已经发生了变化因此这里拼接的是历史特征库。因为历史特征库中的数据来自于推荐发生时所用到的特征。

c7254e33f4eb75c597fee05eb826949c.jpeg

样本经过训练之后最终生成模型。经过验证如果没有问题就可以把模型部署到线上供在线推理服务使用。

在推荐系统中在线推理服务包括召回、排序等多个环节。其中在召回环节使用比较广泛的一种手段是多路召回技术每一路召回使用不同的策略。比如说可以根据用户画像、当前的热点内容、运营策略等分别生成不同的召回结果。对于这些召回结果合并之后再经过排序等环节之后展示给用户。

由此可见多路召回的好处是显而易见的。通过多路召回可以增强推荐结果的多样性。这里需要指出的是由于推荐系统对于延时比较敏感对于召回策略或者模型的性能要求非常高。

因此召回中使用的模型或者策略一般都比较简单。目前一些公司也在探索在多路召回系统中引入近线召回。近线召回可以预先计算召回结果并将召回结果缓存作为多路召回中的一路供在线推理服务直接使用。因此近线召回没有时延约束用户可以在近线召回中使用一些比较复杂的模型或者策略。

9b4dfdba9d6d29040277a49070a5fcea.jpeg

接下来介绍一下在上述步骤中如何使用 PyFlink 完成各项功能的开发。在实时数据清洗部分机器学习应用中输入数据中往往包含很多列。

b5e7a997340c553cf33e9625e898a08c.jpeg

Flink 的其他功能也可以用于数据清洗比如 SQL。SQL 本身也是非常方便的那么和 SQL 相比PyFlink 可以提供哪些附加价值呢

首先在机器学习场景中普遍有一个共性的特点数据中的列非常多可能有几十列甚至上百列。在这种情况下 SQL 语句可能写起来非常长比如在这个例子中用户可能希望对第 9 列和第 10 列进行一个合并操作其他列保留。

但是在 SELECT 语句中需要把所有的其他的无关列都写出来如果数据中的列非常多写起来非常繁琐同时可读性、可维护性也会变得比较差PyFlink 对于这块提供了完善的支持。

另外机器学习用户通常对于 Pandas 比较熟悉习惯于使用 Pandas 进行数据处理很多机器学习相关的库的数据结构都是采用 Pandas 或者 Numpy 的数据结构PyFlink 在这块也提供了很好的支持支持用户在 Python UDF 的实现中使用 Pandas 库。

接下来我们通过几个具体的例子看一下如何在 PyFlink 中使用上述功能。

c2696f561c56bb039058a854b223786d.jpeg

首先PyFlink 提供了行操作和列操作的 API从而简化用户的代码逻辑。通过列操作用户可以非常方便的增加列、删除列或替换列。列操作适用于输入数据的列很多且只有个别列发生变化的场景。

比如在上述例子中我们通过 add_or_replace_columns 操作对数据中的 item_id 一列进行归一化后替换原有的 item_id 列数据中的其他列不需要再显式列出来。

除了列操作之外PyFlink 还支持行操作可以以行为单位对数据进行变换。在行操作的 UDF 中可以直接通过列名引用对应列使用起来非常方便适用于输入数据中的列很多且需要对多个列进行处理的场景。

在行操作中不需要在 UDF 的输入参数中把所有用到的列都显式列出来而是把一行数据都作为输入传进来供 UDF 使用。

在上述例子中我们通过 map 操作对数据进行变换map 的输入是一个 Python 函数Python 函数的输入输出类型都是 Row 类型Row 是 PyFlink 中定义的一个数据结构在 Python 函数的实现中可以通过列名引用输入数据中对应列的值使用起来非常方便。

除了 map 之外PyFlink 中还提供了多个行操作相关的 API如果有需要的话大家可以从 PyFlink 的官方文档中了解详细信息。

a083331bcf0997958626c1581fbbde24.jpeg

如果用户熟悉 Pandas 库也可以在 Python 函数中使用 Pandas 库。用户只需要将 Python 函数的类型标记成 Pandas 即可。

在这种情况下Python 函数的输入类型是 Pandas 的 DataFrame。PyFlink 运行时框架会在调用用户的 Python 函数之前将输入数据转换成 Pandas 的 DataFrame 结构方便用户使用。除此之外Python 函数的输出类型也需要是 Pandas DataFrame。

2d84d3dcab85901bbd19f79742f2127d.jpeg

接下来我们看一下实时特征计算部分。

6cae37a84655bc724053c21365f76922.jpeg

当前有很多公司开发一个实时特征任务的流程通常是这样的

首先算法团队的同学通过数据挖掘等手段发现某个特征会比较有用。然后找到数据开发团队的同学进行需求沟通。将特征的详细描述信息甚至 Python 代码参考实现提给数据开发团队的同学。

然后数据开发团队的同学进行需求排期并实现。在实现的过程中算法团队的同学和数据开发团队的同学可能还需要进行多轮沟通确保数据开发团队的同学的理解和实现没有问题。

另外算法团队同学提供的 Python 参考实现有可能不太容易翻译成 Java 代码。比如里面用到了一些 Python 三方库找不到合适的 Java 实现等等。

最后当特征任务开发好之后算法同学经过一系列的验证很有可能发现这个特征可能并没有预期中的效果那么好这样的一个特征任务很可能就废弃了。数据开发团队的同学也白忙活了。

从这个过程中我们看到特征的开发成本是非常高的涉及到跨团队的沟通、开发语言的转换特征上线的周期也非常长通常以周甚至月为单位。

而 PyFlink 可以显著降低实时特征任务的开发门槛、缩短实时特征的上线周期。有了 PyFlink算法团队的同学完全可以自己来开发实时特征任务。同时在特征任务开发的过程中可以使用各种 Python 库没有任何限制。

19766a268ebb5849b6ed1299407544dc.jpeg

在推荐场景中可能会用到这样一个特征计算用户最近 5 分钟的访问物品列表。

为此PyFlink 提供了多种实现手段。

da4cd8b9f003eaf5fdebfef7e8b914e1.jpeg

首先用户可以通过 SQL+Pandas UDAF 的方式来实现上述功能。

上述 SQL 语句定义了一个长度为 5 分钟、步长为 30 秒的滑动窗口针对窗口中的数据定义了一个 Pandas UDAF 来计算用户在这个窗口中的访问序列。Pandas UDAF 的主要逻辑是对窗口中用户访问的 item 进行排序并使用|作为分隔符生成访问序列字符串。

4b8a4000960e86dafb6a3af0611280ed.jpeg

除此之外用户可以通过 DataStream API 计算序列特征实现上述功能。通过使用 DataStream API定义了一个窗口大小为 5 分钟、步长为 30 秒的滑动窗口并定义了一个聚合函数来处理每一个窗口中的数据。聚合函数需要实现 create_accumulator、add、get_result、merge定义如何针对窗口中的数据进行聚合运算。

针对窗口中的每一条数据框架会依次调用聚合函数的 add 方法当窗口中所有的数据都处理完后框架会调用聚合函数的 get_result 方法来获得聚合值因此用户只需要根据业务逻辑的需要实现这几个方法即可。

在 add 方法中我们将数据缓存起来在 get_result 中对于所有数据进行排序并以|作为分隔符生成访问序列字符串。

6a7b98f0effa2ba931371ad72bf7d5ea.jpeg

接下来我们来看一下实时样本生成部分。在实时样本生成部分主要有正负样本判断和特征拼接。

eb2e2b7fd4164f0c0602e0b6318afe5f.jpeg

首先我们看一下正负样本的构造。在推荐场景中当给用户推荐了一批 item 之后如果用户点击了某个 item就会成为一个正样本而如果用户没有点击则成为一个负样本。

在离线场景中判别正负样本是非常容易的而在实时场景中就不那么容易了。给用户展现了某个 item 之后用户有可能不点击也有可能隔了很久之后才点击。

在 Flink 中用户可以通过定时器解决正负样本问题。针对每条曝光事件用户可以注册一个定时器。定时器的时间间隔可以根据业务的需要确定。

在这个例子中我们定义了一个 10 分钟的定时器。在 10 分钟内如果收到了这条曝光事件对应的点击事件则可以将其看成是一个正样本否则如果在 10 分钟内还没有收到对应的点击事件则可以将其看成一个负样本。

4cc678f08fa6980a4a8bba66395c6c74.jpeg

正负样本的问题解决了之后样本的标签也就确定了接下来还需要拼接上推荐发生时所用到的特征才能成为一条完整的样本。

这里我们可以使用 Flink 中的维表 Join 功能来进行特征的拼接。为了解决特征穿越问题也就是说在拼接用户的实时特征时用户的实时特征相比推荐发生时可能已经发生了变化前面我们提到在线推理服务在推荐时可以将所用到的实时特征保存到历史特征库中。为了便于区分特征的版本可以给特征加一个唯一的标识比如 trace_id然后在做特征拼接时通过 trace_id 来定位推荐发生时所使用的特征解决特征穿越的问题。

19b07739e2f6612985c3c324257dfc63.jpeg

接下来我们来看一下近线推理。近线推理是非常典型的应用场景。目前很多用户在用 PyFlink 做近线推理。

a52087b686a1577fd2260956d8d86579.jpeg

首先用户可以通过 Table API 做近线推理。在 Table API 里用户可以通过 Select 语句做近线推理。推理逻辑可以封装在用户的自定义函数中。在自定义函数里用户可以通过 open 方法加载机器学习模型。

open 方法只会在作业启动阶段调用一次因此可以确保机器学习模型只 load 一次。实际的预测逻辑可以定义在 eval 方法中。

4e106fbec36e299589c9efd0c8b91af6.jpeg

除了 Table API 之外用户也可以通过 DataStream API 做近线推理。跟 Table API 类似DataStream API 中的自定义函数中也提供了一个 open 方法。用户可以在 open 方法里加载机器学习模型。使用方式跟 Table API 比较像用户可以根据自己的需求选择使用 Table API还是 DataStream API。

4474b0ced526794ed65962155fd4561d.jpeg

除此之外用户可以通过 timer 提升推理的时效性。在某些场景中为了提高时效性可以通过定时器来做周期性推理。该方法适用于活跃用户的范围比较确定且用户访问比较频繁的场景。

在这些场景中可以针对活跃用户或者圈选一批重点用户周期性地进行近线推理以进一步提升推荐效果。在这里我们每 5 分钟对于活跃用户进行一次近线推理。

bc9b040ba06231e37160fe649e285c67.jpeg

某些公司可能是 Java 技术栈。算法团队训练出模型之后由开发团队再去负责部署使用。在这种情况下用户可能会倾向于使用 Flink 的 Java API 进行预测。

在 PyFlink 支持线程模式的过程中抽象出了一个 library PEMJA支持 Java 和 Python 之间的互调用跟其他的 Java/Python 互调用库相比PEMJA 的性能更好并且对于各种 Python 库的支持也比较好兼容所有的 Python 库。

这个例子展示了如何利用 PEMJA 提供的 API在 Flink Java 作业中加载机器学习模型、并进行预测。从这个例子可以看出通过 PEMJA用户可以在 JAVA 代码中调用并执行 Python 代码。

04

PyFlink 下一步发展规划

a1bbb4328dc2eb482ccebc6be5a78a20.jpeg

接下来PyFlink 的建设重点会逐步从功能以及性能转向易用性、稳定性以及文档帮助用户更好的使用 PyFlink。我们接下来会重点完善以下几个方面。

首先由于当前 PyFlink 的端到端示例相对来说还比较少不利于新用户快速上手。接下来我们会建设一个独立的 PyFlink 网站结合具体场景展示更多的端到端使用示例。

其次在易用性方面接下来会重点优化作业执行过程中的报错提示让报错信息更友好使用户在开发作业的过程中更容易定位问题。

与此同时我们也在重构当前 Python API 的文档。这块主要是参考一些其他成熟的 Python 项目的经验比如 Pandas。使得用户在 Python API 文档中更容易找到 PyFlink 中各个 API 的使用方式。

作业运行的稳定性也非常重要。我们也会持续改进并加强 PyFlink 作业运行的稳定性比如降低 PyFlink 作业在进程模式下 checkpoint 的耗时等。

最后也欢迎大家扫码加入【PyFlink 交流群】交流和反馈相关的问题和想法。

0aad41153c3fb770a3fd83782bdaef86.png

往期精选

02d03b784fca33f0d635978eb923812f.png

f6005ea1925b4ad9f9fb7e01e1e3416a.jpeg

89f47c5cc6c16f8dd29300a1a1912ee9.jpeg

503a301f91f4baafaa7072f1bfa4623c.jpeg

ec9ad6237959279b9b44bfa180e74b81.jpeg

▼ 关注「Apache Flink」获取更多技术干货 ▼

12cf086c274276f348411dc6ea49f619.png

 bde61cf95d6d3c69afd1f928c2b4605a.gif  点击「阅读原文」查看原文视频&演讲 PPT

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6