本文將向您介紹編寫和使用使用者自訂函數(UDF)的方法。
自訂函數(UDF)
如果內建的函數不能滿足您的需求,Flink CDC資料攝入作業也支援使用Java語言編寫自訂UDF函數,並像內建函數一樣調用。
此處類路徑對應的JAR包需要在“更多配置”中作為附加依賴檔案上傳。
Flink CDC UDF定義
如需編寫Flink CDC UDF類,您需要在pom.xml中引入基礎依賴包:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-common</artifactId>
<version>${CDC 社區版本號碼}</version>
<scope>provided</scope>
</dependency>請參考下表選定使用的社區版本號碼:
Realtime Compute VVR 引擎版本 | 對應的社區版本號碼 |
11.0 及更高版本 | 3.4.0 |
8.0.11 版本 | 3.3.0 |
8.0.10 及更早版本 | 3.2.1 |
滿足以下要求的Java類可以作為Flink CDC資料攝入作業UDF函數使用:
實現了
org.apache.flink.cdc.common.udf.UserDefinedFunction介面。擁有一個公用無參構造器。
至少含有一個名為
eval的公用方法。
UDF函數類可以通過@Override以下介面來實現更精細的語義控制:
重寫
getReturnType方法來手動指定方法的傳回型別。重寫
open和close方法來插入生命週期函數。
例如,將傳入的整型參數增加1後返回的UDF函數定義如下。
public class AddOneFunctionClass implements UserDefinedFunction {
public Object eval(Integer num) {
return num + 1;
}
@Override
public DataType getReturnType() {
// 由於eval函數的傳回型別不明確,需要
// 使用getReturnType寫明確指定類型
return DataTypes.INT();
}
@Override
public void open() throws Exception {
// ...
}
@Override
public void close() throws Exception {
// ...
}
}類型映射關係
下表列出了UDF函數類中eval方法參數及傳回值類型與getReturnType方法傳回值之間的映射關係。
CDC列類型 | 對應的Java類 | 備忘 |
BOOLEAN | java.lang.Boolean | |
TINYINT | java.lang.Byte | |
SMALLINT | java.lang.Short | |
INTEGER | java.lang.Integer | |
BIGINT | java.lang.Long | |
FLOAT | java.lang.Float | |
DOUBLE | java.lang.Double | |
DECIMAL | java.math.BigDecimal | |
DATE | java.time.LocalDate | |
TIME | java.time.LocalTime | |
TIMESTAMP | java.time.LocalDateTime | |
TIMESTAMP_TZ | java.time.ZonedDateTime | |
TIMESTAMP_LTZ | java.time.Instant | |
CHAR VARCHAR STRING | java.lang.String | |
BINARY VARBINARY BYTES | byte[] | |
ARRAY | java.util.List | ARRAY內部元素類型映射到List類的泛型參數。 |
MAP | java.util.Map | MAP鍵、實值型別映射到Map類的泛型參數。 |
ROW | java.util.List |
Flink CDC UDF註冊
通過在Flink CDC資料攝入作業的pipeline塊中加入如下所示的定義即可註冊UDF函數:
pipeline:
user-defined-function:
- name: inc
classpath: org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass
- name: format
classpath: org.apache.flink.cdc.udf.examples.java.FormatFunctionClass此處類路徑對應的JAR包,需要在更多配置中作為附加依賴檔案上傳。
UDF函數名稱可以在此處任意調整,無需與UDF類名一致。
Flink CDC UDF使用
在完成UDF函數註冊後,即可在projection和filter語句塊中,像內建函數一樣直接調用UDF函數。程式碼範例如下。
transform:
- source-table: db.\.*
projection: "*, inc(inc(inc(id))) as inc_id, format(id, 'id -> %d') as formatted_id"
filter: inc(id) < 100Flink SQL UDF相容性
繼承自ScalarFunction的Flink UDF函數也可以直接作為CDC YAML UDF函數註冊並使用,但存在以下限制:
不支援帶參數的
ScalarFunction。Flink風格的TypeInformation類型標註會被忽略。
open和close生命週期鉤子函數不會被調用。