Flink全托管支持在SQL作业中使用Python自定义函数,本文为您介绍Flink自定义函数的分类、定义和依赖,以及如何进行调试和调优。
自定义函数分类
Flink支持以下3类自定义函数。
自定义函数分类 | 描述 |
---|---|
UDSF(User Defined Scalar Function) | 用户自定义标量值函数,将0个、1个或多个标量值映射到一个新的标量值。其输入与输出是一对一的关系,即读入一行数据,写出一条输出值。详情请参见自定义标量函数(UDSF)。 |
UDAF(User Defined Aggregation Function) | 自定义聚合函数,将多条记录聚合成1条记录。其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。详情请参见自定义聚合函数(UDAF)。 |
UDTF(User Defined Table-valued Function) | 自定义表值函数,将0个、1个或多个标量值作为输入参数(可以是变长参数)。与自定义的标量函数类似,但与标量函数不同。表值函数可以返回任意数量的行作为输出,而不仅是1个值。返回的行可以由1个或多个列组成。调用一次函数输出多行或多列数据。详情请参见自定义表值函数(UDTF)。 |
自定义函数注册方法
自定义函数分为全局自定义函数和作业级自定义函数,注册方法详情如下所示:- 全局自定义函数
- 登录实时计算控制台。
- 在Flink全托管页签,单击目标工作空间左侧导航栏上的作业开发
- 在页面左上角,单击
图标。
- 上传自定义函数Python文件。您可以通过以下任何一种方式上传自定义函数Python文件:
- 上传文件:单击选择文件项右侧的选择文件后,选择您的目标自定义函数Python文件。如果有依赖文件,单击依赖文件项右侧的选择文件,选择您的目标自定义函数Python文件所依赖的Python文件或者zip包。
- 外部URL:输入外部URL地址。
说明- 若自定义函数文件或者其依赖文件比较大,推荐通过外部URL的方式进行上传。需要注意的是,如果外部URL是OSS Bucket地址,其依赖文件必须位于sql-artifacts/namespaces/{namespace}目录下。
- 您的自定义函数Python文件会被上传到您选择的OSS Bucket中的sql-artifacts目录下。此外,Flink开发控制台会解析自定义函数Python文件中是否使用了Flink UDF、UDAF和UDTF接口的类,并自动提取类名,填充到Function Name字段中。
- 对于Python类型的UDF,其依赖可以打包到自定义函数Python文件中,也可以通过依赖文件项进行上传。
- 单击确认。
在SQL编辑器页面左侧的UDFs列表,您可以看到所有注册成功的UDF。
依赖
Flink全托管集群已预装了Pandas、NumPy和PyArrow等常用的Python包,您可以在作业开发页面,了解Flink全托管集群中已安装的第三方Python包列表。
说明 预装的Python包使用时需要在Python函数内部导入。示例如下。
@udf(result_type=DataTypes.FLOAT())
def percentile(values: List[float], percentile: float):
import numpy as np
return np.percentile(values, percentile)
此外,您也可以在Python自定义函数中使用其它类型的第三方Python包。需要注意的是,如果使用了非预装的第三方Python包,在注册Python UDF时,需要将其作为依赖文件上传,详情请参见管理自定义函数(UDF)和使用Python依赖。
调试
您可以在Python自定义函数的代码实现中,通过Logging的方式,输出日志信息,方便问题定位,示例如下。
@udf(result_type=DataTypes.BIGINT())
def add(i, j):
logging.info("hello world")
return i + j
日志输出后,您可以在TaskManager的日志文件中查看日志。调优
- 预先加载资源
预先加载资源可以在UDF初始化时提前加载资源,无需在每一次执行计算(即eval)时重新加载资源。例如,您可能只想加载一次大型深度学习模型,然后对模型多次运行批量预测。代码示例如下。
from pyflink.table import DataTypes from pyflink.table.udf import ScalarFunction, udf class Predict(ScalarFunction): def open(self, function_context): import pickle with open("resources.zip/resources/model.pkl", "rb") as f: self.model = pickle.load(f) def eval(self, x): return self.model.predict(x) predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
说明 关于如何上传Python数据文件,可以参考文档使用数据文件。 - 使用Pandas库
除了普通Python自定义函数之外,Flink全托管也支持您使用Pandas自定义函数。对于Pandas自定义函数,输入数据的类型是Pandas中定义的数据结构,例如pandas.Series和pandas.DataFrame等,您可以在Pandas自定义函数中使用Pandas和Numpy等高性能的Python库,开发出高性能的Python自定义函数,详情请参见Vectorized User-defined Functions。
- 配置以下参数
Python自定义函数的性能在很大程度取决于Python自定义函数自身的实现,如果遇到性能问题,您需要尽可能优化Python自定义函数的实现。除此之外,Python自定义函数的性能也受以下参数取值的影响。
参数 说明 python.fn-execution.bundle.size Python UDF的计算是异步的,在执行过程中,Java算子将数据异步发送给Python进程进行处理。Java算子在将数据发送给Python进程之前,会先将数据缓存起来,到达一定阈值之后,再发送给Python进程。python.fn-execution.bundle.size参数可用来控制可缓存的数据最大条数。 默认值为100000,单位是条数。
python.fn-execution.bundle.time 用来控制数据的最大缓存时间。当缓存的数据条数到达python.fn-execution.bundle.size定义的阈值或缓存时间到达python.fn-execution.bundle.time定义的阈值时,会触发缓存数据的计算。 默认值为1000,单位是毫秒。
python.fn-execution.arrow.batch.size 使用Pandas UDF时,一个arrow batch可容纳的数据最大条数,默认值为10000。 说明 python.fn-execution.arrow.batch.size参数值不能大于python.fn-execution.bundle.size参数值。说明 以上3个参数并不是配置的越大越好,当这些参数取值配置过大时,可能会导致Checkpoint时,需要处理过多的数据,从而导致Checkpoint时间过长,甚至会导致Checkpoint失败。以上参数的更多详情,请参见Configuration。