Flink全托管支持在SQL作业中使用Python自定义函数,本文为您介绍Flink自定义函数的分类、定义和依赖,以及如何进行调试和调优。

自定义函数分类

Flink支持以下3类自定义函数。
自定义函数分类 描述
UDSF(User Defined Scalar Function) 用户自定义标量值函数,将0个、1个或多个标量值映射到一个新的标量值。其输入与输出是一对一的关系,即读入一行数据,写出一条输出值。详情请参见自定义标量函数(UDSF)
UDAF(User Defined Aggregation Function) 自定义聚合函数,将多条记录聚合成1条记录。其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。详情请参见自定义聚合函数(UDAF)
UDTF(User Defined Table-valued Function) 自定义表值函数,将0个、1个或多个标量值作为输入参数(可以是变长参数)。与自定义的标量函数类似,但与标量函数不同。表值函数可以返回任意数量的行作为输出,而不仅是1个值。返回的行可以由1个或多个列组成。调用一次函数输出多行或多列数据。详情请参见自定义表值函数(UDTF)

自定义函数注册方法

自定义函数分为全局自定义函数和作业级自定义函数,注册方法详情如下所示:
  • 全局自定义函数
    1. 登录实时计算控制台
    2. Flink全托管页签,单击目标工作空间左侧导航栏上的作业开发
    3. 在页面左上角,单击添加图标。
    4. 上传自定义函数Python文件。注册UDF
      您可以通过以下任何一种方式上传自定义函数Python文件:
      • 上传文件:单击选择文件项右侧的选择文件后,选择您的目标自定义函数Python文件。如果有依赖文件,单击依赖文件项右侧的选择文件,选择您的目标自定义函数Python文件所依赖的Python文件或者zip包。
      • 外部URL:输入外部URL地址。
      说明
      • 若自定义函数文件或者其依赖文件比较大,推荐通过外部URL的方式进行上传。需要注意的是,如果外部URL是OSS Bucket地址,其依赖文件必须位于sql-artifacts/namespaces/{namespace}目录下。
      • 您的自定义函数Python文件会被上传到您选择的OSS Bucket中的sql-artifacts目录下。此外,Flink开发控制台会解析自定义函数Python文件中是否使用了Flink UDF、UDAF和UDTF接口的类,并自动提取类名,填充到Function Name字段中。
      • 对于Python类型的UDF,其依赖可以打包到自定义函数Python文件中,也可以通过依赖文件项进行上传。
    5. 单击确认

      在SQL编辑器页面左侧的UDFs列表,您可以看到所有注册成功的UDF。

依赖

Flink全托管集群已预装了Pandas、NumPy和PyArrow等常用的Python包,您可以在作业开发页面,了解Flink全托管集群中已安装的第三方Python包列表。

说明 预装的Python包使用时需要在Python函数内部导入。示例如下。
@udf(result_type=DataTypes.FLOAT())
def percentile(values: List[float], percentile: float):
    import numpy as np
    return np.percentile(values, percentile)

此外,您也可以在Python自定义函数中使用其它类型的第三方Python包。需要注意的是,如果使用了非预装的第三方Python包,在注册Python UDF时,需要将其作为依赖文件上传,详情请参见管理自定义函数(UDF)使用Python依赖

调试

您可以在Python自定义函数的代码实现中,通过Logging的方式,输出日志信息,方便问题定位,示例如下。
@udf(result_type=DataTypes.BIGINT())
def add(i, j):    
  logging.info("hello world")    
  return i + j
日志输出后,您可以在TaskManager的日志文件中查看日志。

调优

  • 预先加载资源
    预先加载资源可以在UDF初始化时提前加载资源,无需在每一次执行计算(即eval)时重新加载资源。例如,您可能只想加载一次大型深度学习模型,然后对模型多次运行批量预测。代码示例如下。
    from pyflink.table import DataTypes
    from pyflink.table.udf import ScalarFunction, udf
    
    class Predict(ScalarFunction):
        def open(self, function_context):
            import pickle
    
            with open("resources.zip/resources/model.pkl", "rb") as f:
                self.model = pickle.load(f)
    
        def eval(self, x):
            return self.model.predict(x)
    
    predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
    说明 关于如何上传Python数据文件,可以参考文档使用数据文件
  • 使用Pandas库

    除了普通Python自定义函数之外,Flink全托管也支持您使用Pandas自定义函数。对于Pandas自定义函数,输入数据的类型是Pandas中定义的数据结构,例如pandas.Series和pandas.DataFrame等,您可以在Pandas自定义函数中使用Pandas和Numpy等高性能的Python库,开发出高性能的Python自定义函数,详情请参见Vectorized User-defined Functions

  • 配置以下参数
    Python自定义函数的性能在很大程度取决于Python自定义函数自身的实现,如果遇到性能问题,您需要尽可能优化Python自定义函数的实现。除此之外,Python自定义函数的性能也受以下参数取值的影响。
    参数 说明
    python.fn-execution.bundle.size Python UDF的计算是异步的,在执行过程中,Java算子将数据异步发送给Python进程进行处理。Java算子在将数据发送给Python进程之前,会先将数据缓存起来,到达一定阈值之后,再发送给Python进程。python.fn-execution.bundle.size参数可用来控制可缓存的数据最大条数。

    默认值为100000,单位是条数。

    python.fn-execution.bundle.time 用来控制数据的最大缓存时间。当缓存的数据条数到达python.fn-execution.bundle.size定义的阈值或缓存时间到达python.fn-execution.bundle.time定义的阈值时,会触发缓存数据的计算。

    默认值为1000,单位是毫秒。

    python.fn-execution.arrow.batch.size 使用Pandas UDF时,一个arrow batch可容纳的数据最大条数,默认值为10000。
    说明 python.fn-execution.arrow.batch.size参数值不能大于python.fn-execution.bundle.size参数值。
    说明 以上3个参数并不是配置的越大越好,当这些参数取值配置过大时,可能会导致Checkpoint时,需要处理过多的数据,从而导致Checkpoint时间过长,甚至会导致Checkpoint失败。以上参数的更多详情,请参见Configuration