本文为您介绍如何搭建实时计算Flink版自定义函数的环境并使用自定义函数。
注意
- 仅独享模式支持自定义函数。
- Blink在开源Flink SQL的基础上对性能进行了增强,Blink是阿里云实时计算版本的Flink。UDX函数仅适用于Blink,对开源Flink暂不适用。
UDX分类
实时计算Flink版支持以下3类自定义函数。
UDX分类 | 描述 |
---|---|
UDF(User Defined Scalar Function) | 用户自定义标量值函数,其输入与输出是一对一的关系,即读入一行数据,写出一条输出值。 |
UDAF(User Defined Aggregation Function) | 自定义聚合函数,其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。可以与SQL中的GROUP BY语句一起使用。具体语法请参见聚合函数。 |
UDTF(User Defined Table-valued Function) | 自定义表值函数,调用一次函数输出多行或多列数据。 |
UDX示例
实时计算Flink版为您提供UDX示例,便于您快速开发业务。实时计算Flink版UDX示例中包含UDF、UDAF和UDTF的实现,示例如下。
说明
- 示例中已为您配置对应版本的开发环境,您无需进行环境搭建。
- 示例为Maven项目,您可以使用IntelliJ IDEA进行开发。开发方法请参见开发。
- 实时计算Flink版3.0版本
- 实时计算Flink版2.0版本
- 实时计算Flink版1.0版本
环境搭建
以上示例主要使用的依赖JAR包如下,如果您需要单独使用,可以自行下载。
- 基于实时计算Flink版3.2.1以下版本
- 基于实时计算Flink版3.2.1及以上版本
注册使用
- 登录实时计算控制台。
- 在顶部菜单中,单击开发。
- 在左侧的导航栏中,单击资源引用。
- 在资源引用页签的右上角,单击新建资源。
- 在上传资源页面,输入资源配置信息。
参数名称 说明 上传方式 实时计算控制台上仅支持本地上传。 说明 本地上传JAR包的大小上限为300 MB。如果JAR包大小超过300 MB,请在集群绑定的OSS控制台上,或通过OpenAPI的方式上传JAR包。资源选择 单击选择资源,选择需要引用的资源。 资源名称 输入资源名称。 资源备注 输入资源备注信息。 资源类型 选择引用资源类型,JAR、DICTIONARY或PYTHON。 - 在资源引用页签中,将鼠标悬停在对应作业的右侧的更多上。
- 在下拉列表中,选择引用。
- 在作业的编辑窗口的顶部,输入自定义函数声明,示例如下。
CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';
参数与返回值类型
实时计算Flink版支持定义Java UDX时,使用Java类型作为参数和返回值。下面为实时计算Flink版类型和Java类型的映射关系。
实时计算Flink版数据类型 | Java类型 |
---|---|
TINYINT | java.lang.Byte |
SMALLINT | java.lang.Short |
INT | java.lang.Integer |
BIGINT | java.lang.Long |
FLOAT | java.lang.Float |
DOUBLE | java.lang.Double |
DECIMAL | java.math.BigDecimal |
BOOLEAN | java.lang.Boolean |
DATE | java.sql.Date |
TIME | java.sql.Time |
TIMESTAMP | java.sql.Timestamp |
CHAR | java.lang.Character |
STRING | java.lang.String |
VARBINARY | java.lang.byte[] |
ARRAY | 暂不支持 |
MAP | 暂不支持 |
获取自定义函数参数
自定义函数中提供了可选的open(FunctionContext context)
方法,FunctionContext
具备参数传递功能,自定义配置项可以通过此对象来传递。
假设,您需要在作业中添加如下两个参数。
testKey1=lincoln
test.key2=todd
以UDTF为例,在Open方法中通过
context.getJobParameter
即可获取,示例如下。 public void open(FunctionContext context) throws Exception {
String key1 = context.getJobParameter("testKey1", "empty");
String key2 = context.getJobParameter("test.key2", "empty");
System.err.println(String.format("end open: key1:%s, key2:%s", key1, key2));
}
说明 具体的作业参数请参见作业参数。