全部產品
Search
文件中心

Realtime Compute for Apache Flink:Flink CDC自訂函數

更新時間:Dec 04, 2025

本文將向您介紹編寫和使用使用者自訂函數(UDF)的方法。

自訂函數(UDF)

如果內建的函數不能滿足您的需求,Flink CDC資料攝入作業也支援使用Java語言編寫自訂UDF函數,並像內建函數一樣調用。

說明

此處類路徑對應的JAR包需要在“更多配置”中作為附加依賴檔案上傳。

Flink CDC UDF定義

如需編寫Flink CDC UDF類,您需要在pom.xml中引入基礎依賴包:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cdc-common</artifactId>
    <version>${CDC 社區版本號碼}</version>
    <scope>provided</scope>
</dependency>

請參考下表選定使用的社區版本號碼:

Realtime Compute VVR 引擎版本

對應的社區版本號碼

11.0 及更高版本

3.4.0

8.0.11 版本

3.3.0

8.0.10 及更早版本

3.2.1

滿足以下要求的Java類可以作為Flink CDC資料攝入作業UDF函數使用:

  • 實現了org.apache.flink.cdc.common.udf.UserDefinedFunction介面。

  • 擁有一個公用無參構造器。

  • 至少含有一個名為eval的公用方法。

UDF函數類可以通過@Override以下介面來實現更精細的語義控制:

  • 重寫getReturnType方法來手動指定方法的傳回型別。

  • 重寫openclose方法來插入生命週期函數。

例如,將傳入的整型參數增加1後返回的UDF函數定義如下。

public class AddOneFunctionClass implements UserDefinedFunction {
    
    public Object eval(Integer num) {
        return num + 1;
    }
    
    @Override
    public DataType getReturnType() {
        // 由於eval函數的傳回型別不明確,需要
        // 使用getReturnType寫明確指定類型
        return DataTypes.INT();
    }
    
    @Override
    public void open() throws Exception {
        // ...
    }

    @Override
    public void close() throws Exception {
        // ...
    }
}

類型映射關係

下表列出了UDF函數類中eval方法參數及傳回值類型與getReturnType方法傳回值之間的映射關係。

CDC列類型

對應的Java類

備忘

BOOLEAN

java.lang.Boolean

TINYINT

java.lang.Byte

SMALLINT

java.lang.Short

INTEGER

java.lang.Integer

BIGINT

java.lang.Long

FLOAT

java.lang.Float

DOUBLE

java.lang.Double

DECIMAL

java.math.BigDecimal

DATE

java.time.LocalDate

TIME

java.time.LocalTime

TIMESTAMP

java.time.LocalDateTime

TIMESTAMP_TZ

java.time.ZonedDateTime

TIMESTAMP_LTZ

java.time.Instant

CHAR

VARCHAR

STRING

java.lang.String

BINARY

VARBINARY

BYTES

byte[]

ARRAY

java.util.List

ARRAY內部元素類型映射到List類的泛型參數。

MAP

java.util.Map

MAP鍵、實值型別映射到Map類的泛型參數。

ROW

java.util.List

Flink CDC UDF註冊

通過在Flink CDC資料攝入作業的pipeline塊中加入如下所示的定義即可註冊UDF函數:

pipeline:
  user-defined-function:
    - name: inc
      classpath: org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass
    - name: format
      classpath: org.apache.flink.cdc.udf.examples.java.FormatFunctionClass
說明
  • 此處類路徑對應的JAR包,需要在更多配置中作為附加依賴檔案上傳。

  • UDF函數名稱可以在此處任意調整,無需與UDF類名一致。

Flink CDC UDF使用

在完成UDF函數註冊後,即可在projection和filter語句塊中,像內建函數一樣直接調用UDF函數。程式碼範例如下。

transform:
  - source-table: db.\.*
    projection: "*, inc(inc(inc(id))) as inc_id, format(id, 'id -> %d') as formatted_id"
    filter: inc(id) < 100

Flink SQL UDF相容性

繼承自ScalarFunction的Flink UDF函數也可以直接作為CDC YAML UDF函數註冊並使用,但存在以下限制:

  • 不支援帶參數的ScalarFunction

  • Flink風格的TypeInformation類型標註會被忽略。

  • openclose生命週期鉤子函數不會被調用。