  • Difference between a pandas UDF and a Python UDF: the former is vectorized and processes the data of each partition.



1.1 A brief introduction to UDFs

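  • Running Python code inside a Java-based distributed system is costly. As the figure below shows, data is transferred between the JVM and Python, which adds serialization and invocation overhead. The Apache Arrow project was started to speed up big-data analytics workloads.
  • Apache Arrow is an in-memory columnar data format that Spark uses to transfer data efficiently between the JVM and Python processes. Before Arrow can be used, the Spark configuration option must be set to true: spark.conf.set("spark.sql.execution.arrow.enabled", "true"). In Spark 3.0 and later the option is named spark.sql.execution.arrow.pyspark.enabled.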
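The version-dependent option name can be captured in a small helper. This is only a sketch: the function name arrow_config_key is hypothetical and not part of PySpark, and it assumes the version string starts with the major version number.

```python
def arrow_config_key(spark_version: str) -> str:
    """Return the Arrow option name for a Spark version string such as "3.3.1"."""
    major = int(spark_version.split(".")[0])
    if major >= 3:
        # Name used from Spark 3.0 onwards.
        return "spark.sql.execution.arrow.pyspark.enabled"
    # Name used before Spark 3.0.
    return "spark.sql.execution.arrow.enabled"


old_key = arrow_config_key("2.4.8")
new_key = arrow_config_key("3.3.1")
print(old_key)
print(new_key)
```

With a live session this could be used as spark.conf.set(arrow_config_key(spark.version), "true").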


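  • A UDF (user-defined function) lets us add custom logic efficiently when doing business analysis with PySpark. There are generally two kinds:
    • Event level: computes over a single event or row.
    • Aggregation function: a custom aggregate over some aggregation key. For example, apply collect_list or collect_set to a PySpark DataFrame to gather the values to aggregate into a list, then process that list with an event-level UDF.
      • Example: compute the maximum of a user's login times, as in the code below.
      • Drawback of this example: if the key is a hot spot, i.e. the aggregation yields many elements, it can easily run out of memory (OOM). One option is to trim the aggregated list first, for instance sort by time and keep only the last top-k events.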
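def find_max(lis):
    return max(lis)

# The source snippet is cut off after groupBy. The rest is assumed: F is
# pyspark.sql.functions and "loginTime" is a hypothetical long column.
SparkDataFrame.groupBy("userId") \
    .agg(F.collect_list("loginTime").alias("times")) \
    .withColumn("max_time", F.udf(find_max, "long")("times"))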
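The top-k trimming suggested for hot keys can be sketched in plain Python. The (timestamp, payload) event layout and the name keep_latest are assumptions made for illustration; inside Spark the same logic would run in the UDF that receives the collected list.

```python
import heapq
from typing import List, Tuple

Event = Tuple[int, str]  # (timestamp, payload), a hypothetical layout


def keep_latest(events: List[Event], k: int) -> List[Event]:
    """Keep the k events with the largest timestamps, newest first."""
    return heapq.nlargest(k, events, key=lambda event: event[0])


events = [(3, "c"), (1, "a"), (5, "e"), (2, "b"), (4, "d")]
trimmed = keep_latest(events, k=3)

# The aggregate is then computed on the trimmed list only.
latest_login = max(timestamp for timestamp, _ in trimmed)
print(trimmed, latest_login)
```

heapq.nlargest avoids sorting the whole list when k is small relative to the number of events.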
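  • Why is the RDD filter() method so slow? Because a lambda function cannot be applied directly to a DataFrame that lives in JVM memory.
    • What actually happens is that Spark starts Python worker processes next to the Spark executors on the cluster nodes. At execution time, the Spark workers send the lambda function to these Python workers. Next, the Spark workers serialize their RDD partitions and pipe them over sockets to the Python workers, where the lambda function is evaluated on every row. For the resulting rows, the whole serialization/deserialization process happens again in the other direction so that the actual filter() can be applied to the result set.
    • Because so much data is copied back and forth, executing Python functions in a distributed Java system is very expensive in terms of execution time. The takeaway from this look under the hood: as long as Python UDFs are avoided, a PySpark program will be roughly as fast as a Scala-based Spark program. If a UDF cannot be avoided, at least try to make it as efficient as possible.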

1.2 How to write a UDF

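  • Write the function.
  • Write a unit test.
  • Make sure the test passes and that the result matches the actual business requirement.
  • Register the function with PySpark. Note that the return type of the function must be declared; see the table below for the correspondence between PySpark and Python data types.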
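As a rough aid for choosing the declared return type, the common correspondences can be written down as a lookup. This is a partial sketch, not the table from the original post: the names PYTHON_TO_SPARK and spark_type_name are hypothetical, the values are the class names in pyspark.sql.types that Spark infers by default, and narrower types such as IntegerType or FloatType can also be declared for int and float.

```python
import datetime
import decimal

# Python type -> name of the matching class in pyspark.sql.types.
PYTHON_TO_SPARK = {
    bool: "BooleanType",
    int: "LongType",
    float: "DoubleType",
    str: "StringType",
    bytes: "BinaryType",
    decimal.Decimal: "DecimalType",
    datetime.date: "DateType",
    datetime.datetime: "TimestampType",
    list: "ArrayType",
    dict: "MapType",
}


def spark_type_name(value) -> str:
    """Look up the Spark type name for a Python value by its exact type."""
    try:
        return PYTHON_TO_SPARK[type(value)]
    except KeyError:
        raise TypeError(f"no mapping listed for {type(value).__name__}") from None


print(spark_type_name(1), spark_type_name(0.5), spark_type_name([1, 2]))
```

The lookup uses the exact type on purpose, so that True is reported as BooleanType rather than being treated as an int.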


1.3 When to use a UDF


The figure above is from "Data Analysis with Python and PySpark".


from fractions import Fraction
from typing import Tuple, Optional

Frac = Tuple[int, int]

def py_reduce_fraction(frac: Frac) -> Optional[Frac]:
    """Reduce a fraction represented as a 2-tuple of integers."""
    num, denom = frac
    if denom:
        answer = Fraction(num, denom)
        return answer.numerator, answer.denominator
    return None

assert py_reduce_fraction((3, 6)) == (1, 2)
assert py_reduce_fraction((1, 0)) is None

def py_fraction_to_float(frac: Frac) -> Optional[float]:
    """Transforms a fraction represented as a 2-tuple of integers into a float."""
    num, denom = frac
    if denom:
        return num / denom
    return None

assert py_fraction_to_float((2, 8)) == 0.25
assert py_fraction_to_float((10, 0)) is None

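import pyspark.sql.functions as F
import pyspark.sql.types as T

# frac_df is assumed to hold an array<long> column named "fraction".
SparkFrac = T.ArrayType(T.LongType())
reduce_fraction = F.udf(py_reduce_fraction, SparkFrac)

frac_df = frac_df.withColumn(
    "reduced_fraction", reduce_fraction(F.col("fraction"))
)
print("=====================udf test2:\n")
frac_df.show(5, False)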
# +--------+----------------+
# |fraction|reduced_fraction|
# +--------+----------------+
# |[0, 1]  |[0, 1]          |
# |[0, 2]  |[0, 1]          |
# |[0, 3]  |[0, 1]          |
# |[0, 4]  |[0, 1]          |
# |[0, 5]  |[0, 1]          |
# +--------+----------------+
# only showing top 5 rows


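# Used as a decorator, F.udf turns the function into a UDF directly;
# the original Python function stays reachable through the .func attribute.
@F.udf(T.DoubleType())
def fraction_to_float(frac: Frac) -> Optional[float]:
    """Transforms a fraction represented as a 2-tuple of integers into a float."""
    num, denom = frac
    if denom:
        return num / denom
    return None

frac_df = frac_df.withColumn(
    "fraction_float", fraction_to_float(F.col("reduced_fraction"))
)
print("================udf test 3:\n")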
frac_df.select("reduced_fraction", "fraction_float").distinct().show(5, False)
# +----------------+-------------------+
# |reduced_fraction|fraction_float     |
# +----------------+-------------------+
# |[3, 50]         |0.06               |
# |[3, 67]         |0.04477611940298507|
# |[7, 76]         |0.09210526315789473|
# |[9, 23]         |0.391304347826087  |
# |[9, 25]         |0.36               |
# +----------------+-------------------+
# only showing top 5 rows
assert fraction_to_float.func((1, 2)) == 0.5




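  • A pandas UDF can be used together with functions such as select and withColumn. The Python function should take a pandas.Series as input and return a pandas.Series of the same length.
  • Internally, Spark executes a pandas UDF by splitting the columns into batches, calling the function on each batch as a subset of the data, and then concatenating the results.
  • Pandas_UDF is an API added in PySpark 2.3: Spark transfers the data through Arrow and processes it with pandas. A Pandas_UDF is defined with the keyword pandas_udf, used either as a decorator or to wrap a function. Pandas_UDF types include Scalar and Grouped Map, among others. Example: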
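import pandas as pd
from pyspark.sql.functions import col, pandas_udf, PandasUDFType
from pyspark.sql.types import ArrayType, FloatType

def predict_batch(paths):
    ...  # the inference body is not included in the source

# Call the function directly on the first 200 paths
predictions = predict_batch(pd.Series(files[:200]))

# Wrap the function as a pandas_udf and run model prediction through it
predict_udf = pandas_udf(ArrayType(FloatType()), PandasUDFType.SCALAR)(predict_batch)

predictions_df = files_df.select(col('path'), predict_udf(col('path')).alias("prediction"))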
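The batching behaviour described in the bullets above (split into batches, call the function per batch, concatenate) can be imitated in plain Python. This is only a simulation under stated assumptions: lists stand in for pandas.Series, and the names iter_batches and apply_in_batches are hypothetical, not Spark APIs.

```python
from typing import Callable, Iterator, List


def iter_batches(values: List[int], batch_size: int) -> Iterator[List[int]]:
    """Yield consecutive slices of at most batch_size elements."""
    for start in range(0, len(values), batch_size):
        yield values[start:start + batch_size]


def apply_in_batches(
    func: Callable[[List[int]], List[int]], values: List[int], batch_size: int
) -> List[int]:
    """Call func once per batch and concatenate the per-batch results."""
    out: List[int] = []
    for batch in iter_batches(values, batch_size):
        result = func(batch)
        # A scalar pandas UDF must return as many values as it received.
        if len(result) != len(batch):
            raise ValueError("function must preserve the batch length")
        out.extend(result)
    return out


def plus_one(batch: List[int]) -> List[int]:
    return [value + 1 for value in batch]


batches = list(iter_batches(list(range(7)), batch_size=3))
result = apply_in_batches(plus_one, list(range(7)), batch_size=3)
print(batches, result)
```

Because the function only ever sees one batch, it should not rely on values from other batches, which is why whole-column statistics do not belong in a scalar pandas UDF.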

2. Grouped Map

Grouped Map and the Grouped Aggregate type described later both fit PySpark's split-apply-combine computation pattern.

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

df3 = spark.createDataFrame(
    [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
    ("key", "value1", "value2")
)

# Schema of the DataFrame returned for each group
schema = StructType([
    StructField("key", StringType()),
    StructField("avg_value1", DoubleType()),
    StructField("avg_value2", DoubleType()),
    StructField("sum_avg", DoubleType()),
    StructField("sub_avg", DoubleType())
])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    gr = df['key'].iloc[0]  # the key is the same for every row of the group
    x = df.value1.mean()
    y = df.value2.mean()
    w = x + y
    z = x - y
    return pd.DataFrame([[gr, x, y, w, z]])

df3.groupby("key").apply(g).show()

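+---+----------+----------+-------+-------+
|key|avg_value1|avg_value2|sum_avg|sub_avg|
+---+----------+----------+-------+-------+
|  a|       0.0|      21.0|   21.0|  -21.0|
|  b|       6.5|      -1.5|    5.0|    8.0|
+---+----------+----------+-------+-------+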
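The split-apply-combine pattern behind this result can be checked without Spark. The sketch below is plain Python over the same four rows; the helper name summarize is hypothetical and the grouping is done with a dictionary instead of groupby.

```python
from collections import defaultdict

rows = [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)]

# Split: collect the rows of each key.
groups = defaultdict(list)
for key, value1, value2 in rows:
    groups[key].append((value1, value2))


# Apply: compute the per-group statistics, one output row per group.
def summarize(key, members):
    avg1 = sum(v1 for v1, _ in members) / len(members)
    avg2 = sum(v2 for _, v2 in members) / len(members)
    return (key, avg1, avg2, avg1 + avg2, avg1 - avg2)


# Combine: gather the per-group rows into one result.
result = [summarize(key, members) for key, members in sorted(groups.items())]
for row in result:
    print(row)
```

The rows produced here match the two rows of the table above, which is a cheap way to unit test the grouped logic before wrapping it in pandas_udf.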


from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pyspark.sql.functions as F
from pyspark.sql.types import *
import pandas as pd
import torch
from torch import nn

class Linear(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(Linear, self).__init__()
        self.linear = nn.Linear(input_dim, output_dim)

    def forward(self, x):  # forward pass
        out = self.linear(x)  # input x, output out
        return out

conf = SparkConf() \
  .setAppName("dataframe") \
  .set("spark.sql.execution.arrow.pyspark.enabled", "true")

spark = SparkSession.builder.config(conf=conf).getOrCreate()

@pandas_udf("uid long, aid long, score float", PandasUDFType.GROUPED_MAP)
def age_predict(df):
    linear = Linear(2, 1)
    df['score'] = linear(torch.from_numpy(df.values).type(torch.float32)).detach().numpy()
    return df.loc[:, ['uid', 'aid', 'score']]

df = spark.read.format("json").load("hdfs:///tmp/predict.json").repartition(2)

res = df.groupby(F.spark_partition_id()).apply(age_predict)

3. Grouped Aggregate



import json
from functools import wraps
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd

class pandas_udf_ct(object):
    """Decorator for UDAFs with Spark >= 2.3 and complex types

    Args:
        returnType: the return type of the user-defined function. The value can be either a
                    pyspark.sql.types.DataType object or a DDL-formatted type string.
        functionType: an enum value in pyspark.sql.functions.PandasUDFType. Default: SCALAR.

    Returns:
        Function with arguments `cols_in` and `cols_out` defining column names having complex
        types that need to be transformed during input and output for GROUPED_MAP. In case of
        SCALAR, we are dealing with a series and thus transformation is done if `cols_in` or
        `cols_out` evaluates to `True`.
        Calling this function with these arguments returns the actual UDF.
    """

    def __init__(self, returnType=None, functionType=None):
        self.return_type = returnType
        self.function_type = functionType

    def __call__(self, func):
        def converter(*, cols_in=None, cols_out=None):
            if cols_in is None:
                cols_in = list()
            if cols_out is None:
                cols_out = list()

            @pandas_udf(self.return_type, self.function_type)
            def udf_wrapper(values):
                if isinstance(values, pd.DataFrame):
                    values = cols_from_json(values, cols_in)
                elif isinstance(values, pd.Series) and cols_in:
                    values = values.apply(json.loads)
                res = func(values)
                if self.function_type == PandasUDFType.GROUPED_MAP:
                    if isinstance(res, pd.Series):
                        res = res.to_frame().T
                    res = cols_to_json(res, cols_out)
                elif cols_out and self.function_type == PandasUDFType.SCALAR:
                    res = res.apply(ct_val_to_json)
                elif (isinstance(res, (dict, list)) and
                      self.function_type == PandasUDFType.GROUPED_AGG):
                    res = ct_val_to_json(res)
                return res

            return udf_wrapper

        return converter

from pyspark.sql.types import MapType, StructType, ArrayType, StructField
from pyspark.sql.functions import to_json, from_json

def is_complex_dtype(dtype):
    """Check if dtype is a complex type

    Args:
        dtype: Spark Datatype
    Returns:
        Bool: if dtype is complex
    """
    return isinstance(dtype, (MapType, StructType, ArrayType))

def complex_dtypes_to_json(df):
    """Converts all columns with complex dtypes to JSON

    Args:
        df: Spark dataframe
    Returns:
        tuple: Spark dataframe and dictionary of converted columns and their data types
    """
    conv_cols = dict()
    selects = list()
    for field in df.schema:
        if is_complex_dtype(field.dataType):
            conv_cols[field.name] = field.dataType
            # Serialize the complex column to a JSON string under the same name
            selects.append(to_json(field.name).alias(field.name))
        else:
            selects.append(field.name)
    df = df.select(*selects)
    return df, conv_cols

def complex_dtypes_from_json(df, col_dtypes):
    """Converts JSON columns to complex types

    Args:
        df: Spark dataframe
        col_dtypes (dict): dictionary of columns names and their datatype
    Returns:
        Spark dataframe
    """
    selects = list()
    for column in df.columns:
        if column in col_dtypes.keys():
            schema = StructType([StructField('root', col_dtypes[column])])
            selects.append(from_json(column, schema).getItem('root').alias(column))
        else:
            # Simple columns are passed through unchanged
            selects.append(column)
    return df.select(*selects)

import json

def cols_from_json(df, columns):
    """Converts Pandas dataframe columns from json

    Args:
        df (dataframe): Pandas DataFrame
        columns (iter): list of or iterator over column names
    Returns:
        dataframe: new dataframe with converted columns
    """
    for column in columns:
        df[column] = df[column].apply(json.loads)
    return df

def ct_val_to_json(value):
    """Convert a scalar complex type value to JSON

    Args:
        value: map or list complex value
    Returns:
        str: JSON string
    """
    return json.dumps({'root': value})

def cols_to_json(df, columns):
    """Converts Pandas dataframe columns to json and adds root handle

    Args:
        df (dataframe): Pandas DataFrame
        columns ([str]): list of column names
    Returns:
        dataframe: new dataframe with converted columns
    """
    for column in columns:
        df[column] = df[column].apply(ct_val_to_json)
    return df

# 1. Build the dataset
from pyspark.sql.types import Row
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")  # option name for Spark 3.0+

df = spark.createDataFrame([(1., {'a': 1}, ["a", "a"], Row(a=1)),
                            (2., {'b': 1}, ["a", "b"], Row(a=42)),
                            (3., {'a': 1, 'b': 3}, ["d","e"], Row(a=1))],
                           schema=['vals', 'maps', 'lists', 'structs'])
df.show(), df.printSchema()

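# 2. Define the processing step and apply the decorator
df_json, ct_cols = complex_dtypes_to_json(df)

def change_vals(dct):
    dct['x'] = 42
    return dct

@pandas_udf_ct(df_json.schema, PandasUDFType.GROUPED_MAP)
def normalize(pdf):
    # Add the key x = 42 to every dictionary in the maps column
    pdf['maps'] = pdf['maps'].apply(change_vals)
    return pdf

# 3. Use the decorated UDF
df_json = df_json.groupby("vals").apply(normalize(cols_in=ct_cols, cols_out=ct_cols))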
df_final = complex_dtypes_from_json(df_json, ct_cols)
print("================ test 8:\n")
df_final.show(truncate=False), df_final.printSchema()
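  • The requirement in the example above: add a key x with the value 42 to the dictionaries in the maps column. Steps:
    • Use complex_dtypes_to_json to obtain the converted Spark DataFrame df_json and the converted columns ct_cols.
    • Then define the UDF normalize and decorate it with pandas_udf_ct, passing df_json.schema as the return type (only simple data types are needed at this point) and GROUPED_MAP as the function type.
    • The initial df and the resulting df are shown below.

For demonstration purposes only, we group by the vals column of df_json and apply the normalize UDF to each group. As mentioned above, it must first be called with the arguments cols_in and cols_out rather than passing normalize directly. The output ct_cols of complex_dtypes_to_json is passed as the input columns, and since the UDF does not change the shape of the DataFrame, the same value is used for cols_out. If a UDF drops columns or adds other columns with complex data types, cols_out must be changed accordingly. As the last step, complex_dtypes_from_json converts the JSON strings of the transformed Spark DataFrame back into complex data types.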
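The JSON round trip for a single cell can be traced in plain Python. This is a sketch under assumptions: the names wrap and unwrap are hypothetical stand-ins for ct_val_to_json and the from_json(...).getItem('root') step, and the incoming string is what a map value serialized to JSON is assumed to look like.

```python
import json


def wrap(value):
    """Serialize a complex value under a 'root' key, as ct_val_to_json does."""
    return json.dumps({"root": value})


def unwrap(text):
    """Parse the JSON string and take the value back out of 'root'."""
    return json.loads(text)["root"]


incoming = '{"a": 1}'          # one cell of the maps column, already JSON
maps = json.loads(incoming)    # what cols_from_json hands to the UDF
maps["x"] = 42                 # the change made by change_vals
outgoing = wrap(maps)          # what cols_to_json returns to Spark
restored = unwrap(outgoing)    # the value after converting back

print(outgoing)
print(restored)
```

The extra root key gives every converted column a struct-shaped JSON document, which matches the StructType([StructField('root', ...)]) schema used when parsing it back.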

# Initial df
+----+----------------+------+-------+
|vals|            maps| lists|structs|
+----+----------------+------+-------+
| 1.0|        {a -> 1}|[a, a]|    {1}|
| 2.0|        {b -> 1}|[a, b]|   {42}|
| 3.0|{a -> 1, b -> 3}|[d, e]|    {1}|
+----+----------------+------+-------+

root
 |-- vals: double (nullable = true)
 |-- maps: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = true)
 |-- lists: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- structs: struct (nullable = true)
 |    |-- a: long (nullable = true)

# Result df
+----+-------------------------+------+-------+
|vals|maps                     |lists |structs|
+----+-------------------------+------+-------+
|1.0 |{a -> 1, x -> 42}        |[a, a]|{1}    |
|2.0 |{b -> 1, x -> 42}        |[a, b]|{42}   |
|3.0 |{a -> 1, b -> 3, x -> 42}|[d, e]|{1}    |
+----+-------------------------+------+-------+

root
 |-- vals: double (nullable = true)
 |-- maps: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = true)
 |-- lists: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- structs: struct (nullable = true)
 |    |-- a: long (nullable = true)


