本文为您介绍如何搭建实时计算Flink版自定义函数的环境并使用自定义函数。

注意
  • 仅独享模式支持自定义函数。
  • Blink在开源Flink SQL的基础上对性能进行了增强,Blink是阿里云实时计算版本的Flink。UDX函数仅适用于Blink,对开源Flink暂不适用。

UDX分类

实时计算Flink版支持以下3类自定义函数。
UDX分类 描述
UDF(User Defined Scalar Function) 用户自定义标量值函数,其输入与输出是一对一的关系,即读入一行数据,写出一条输出值。
UDAF(User Defined Aggregation Function) 自定义聚合函数,其输入与输出是多对一的关系,即将多条输入记录聚合成一条输出值。可以与SQL中的GROUP BY语句一起使用。具体语法请参见聚合函数
UDTF(User Defined Table-valued Function) 自定义表值函数,调用一次函数输出多行或多列数据。

UDX示例

实时计算Flink版为您提供UDX示例,便于您快速开发业务。实时计算Flink版UDX示例中包含UDF、UDAF和UDTF的实现,示例如下。
说明
  • 示例中已为您配置对应版本的开发环境,您无需进行环境搭建。
  • 示例为Maven项目,您可以使用IntelliJ IDEA进行开发。开发方法请参见开发

环境搭建

以上示例主要使用的依赖JAR包如下,如果您需要单独使用,可以自行下载。

注册使用

  1. 登录实时计算控制台
  2. 在顶部菜单中,单击开发
  3. 在左侧的导航栏中,单击资源引用
  4. 资源引用页签的右上角,单击新建资源
  5. 上传资源页面,输入资源配置信息。
    参数名称 说明
    上传方式 实时计算控制台上仅支持本地上传。
    说明 本地上传JAR包的大小上限为300 MB。如果JAR包大小超过300 MB,请在集群绑定的OSS控制台上,或通过OpenAPI的方式上传JAR包。
    资源选择 单击选择资源,选择需要引用的资源。
    资源名称 输入资源名称。
    资源备注 输入资源备注信息。
    资源类型 选择引用资源类型,JAR、DICTIONARY或PYTHON。
  6. 资源引用页签中,将鼠标悬停在对应作业的右侧的更多上。
  7. 在下拉列表中,选择引用
  8. 在作业的编辑窗口的顶部,输入自定义函数声明,示例如下。
    CREATE FUNCTION stringLengthUdf AS 'com.hjc.test.blink.sql.udx.StringLengthUdf';

参数与返回值类型

实时计算Flink版支持定义Java UDX时,使用Java类型作为参数和返回值。下面为实时计算Flink版类型和Java类型的映射关系。
实时计算Flink版数据类型 Java类型
TINYINT java.lang.Byte
SMALLINT java.lang.Short
INT java.lang.Integer
BIGINT java.lang.Long
FLOAT java.lang.Float
DOUBLE java.lang.Double
DECIMAL java.math.BigDecimal
BOOLEAN java.lang.Boolean
DATE java.sql.Date
TIME java.sql.Time
TIMESTAMP java.sql.Timestamp
CHAR java.lang.Character
STRING java.lang.String
VARBINARY java.lang.byte[]
ARRAY 暂不支持
MAP 暂不支持

获取自定义函数参数

自定义函数中提供了可选的open(FunctionContext context)方法,FunctionContext具备参数传递功能,自定义配置项可以通过此对象来传递。

假设,您需要在作业中添加如下两个参数。
testKey1=lincoln
test.key2=todd
以UDTF为例,在Open方法中通过context.getJobParameter即可获取,示例如下。
public void open(FunctionContext context) throws Exception {
      String key1 = context.getJobParameter("testKey1", "empty");
      String key2 = context.getJobParameter("test.key2", "empty");
      System.err.println(String.format("end open: key1:%s, key2:%s", key1, key2));
}
说明 具体的作业参数请参见作业参数