All Products
Search
Document Center

Realtime Compute for Apache Flink:Flink CDC UDFs

Last Updated:Mar 14, 2026

Topik ini menjelaskan cara membuat dan menggunakan user-defined functions (UDF).

User-Defined Function (UDF)

Jika fungsi bawaan di Flink CDC tidak memenuhi kebutuhan Anda, Anda dapat membuat UDF Java kustom untuk Pekerjaan Ingesti Data Anda. UDF tersebut dapat dipanggil dengan cara yang sama seperti fungsi bawaan.

Catatan

Anda harus mengunggah file JAR ke classpath sebagai dependensi tambahan pada tab Configurations di draf pekerjaan Anda.

Define a UDF

Untuk menulis kelas UDF Flink CDC, tambahkan dependensi dasar berikut ke file pom.xml Anda:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cdc-common</artifactId>
    <version>${Apache Flink CDC version}</version>
    <scope>provided</scope>
</dependency>

Pilih nomor versi komunitas dari tabel berikut:

VVR engine version

Corresponding community version number

11.3 dan versi yang lebih baru

3.5.0

Versi 11.0 hingga 11.2

3.4.0

Versi 8.0.11

3.3.0

Versi 8.0.10 dan sebelumnya

3.2.1

Kelas Java harus memenuhi persyaratan berikut agar dapat digunakan sebagai UDF dalam Pekerjaan Ingesti Data Flink CDC:

  • Kelas tersebut harus mengimplementasikan antarmuka org.apache.flink.cdc.common.udf.UserDefinedFunction.

  • Kelas tersebut harus memiliki konstruktor publik tanpa parameter.

  • Kelas tersebut harus berisi setidaknya satu metode publik bernama eval.

Anda dapat meng-override metode berikut dalam kelas UDF untuk kontrol yang lebih rinci:

  • Override metode getReturnType untuk secara eksplisit menentukan tipe nilai kembali.

  • Override metode open dan close untuk mengimplementasikan fungsi siklus hidup.

Sebagai contoh, kode berikut mendefinisikan UDF yang menambahkan parameter integer sebesar 1 dan mengembalikan hasilnya.

public class AddOneFunctionClass implements UserDefinedFunction {
    
    public Object eval(Integer num) {
        return num + 1;
    }
    
    @Override
    public DataType getReturnType() {
        // Tipe nilai kembali dari fungsi eval bersifat ambigu.
        // Anda harus secara eksplisit menentukan tipe tersebut melalui getReturnType.
        return DataTypes.INT();
    }
    
    @Override
    public void open() throws Exception {
        // ...
    }

    @Override
    public void close() throws Exception {
        // ...
    }
}

Type mappings

Tabel berikut mencantumkan pemetaan antara tipe parameter dan nilai kembali metode eval dalam kelas UDF dan nilai kembali dari metode getReturnType.

CDC Column Type

Java class

Notes

BOOLEAN

java.lang.Boolean

TINYINT

java.lang.Byte

SMALLINT

java.lang.Short

INTEGER

java.lang.Integer

BIGINT

java.lang.Long

FLOAT

java.lang.Float

DOUBLE

java.lang.Double

DECIMAL

java.math.BigDecimal

DATE

java.time.LocalDate

TIME

java.time.LocalTime

TIMESTAMP

java.time.LocalDateTime

TIMESTAMP_TZ

java.time.ZonedDateTime

TIMESTAMP_LTZ

java.time.Instant

CHAR

VARCHAR

STRING

java.lang.String

BINARY

VARBINARY

BYTES

byte[]

ARRAY

java.util.List

Tipe elemen dalam ARRAY dipetakan ke parameter generik kelas List.

MAP

java.util.Map

Tipe kunci dan nilai MAP dipetakan ke parameter generik kelas Map.

ROW

java.util.List

VARIANT

org.apache.flink.cdc.common.types.variant.Variant

Catatan

Perhatikan bahwa ini berbeda dari path kelas Variant di Flink SQL.

Register a Flink CDC UDF

Untuk mendaftarkan UDF, tambahkan konfigurasi berikut ke bagian pipeline Pekerjaan Ingesti Data Flink CDC Anda:

pipeline:
  user-defined-function:
    - name: inc
      classpath: org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass
    - name: format
      classpath: org.apache.flink.cdc.udf.examples.java.FormatFunctionClass
Catatan
  • Pada bagian More Configurations, Anda harus mengunggah paket JAR untuk classpath ini sebagai dependensi tambahan.

  • Anda dapat menentukan nama kustom untuk UDF Anda. Nama ini tidak perlu sesuai dengan nama kelas UDF.

Use a Flink CDC UDF

Setelah UDF terdaftar, Anda dapat memanggilnya dalam ekspresi projection dan filter dengan cara yang sama seperti fungsi bawaan:

transform:
  - source-table: db.\.*
    projection: "*, inc(inc(inc(id))) as inc_id, format(id, 'id -> %d') as formatted_id"
    filter: inc(id) < 100

Compatibility with Flink SQL UDFs

UDF Flink SQL yang memperluas ScalarFunction juga dapat didaftarkan dan digunakan langsung sebagai UDF Flink CDC, dengan batasan berikut:

  • Kelas ScalarFunction yang diparameterisasi tidak didukung.

  • Anotasi TypeInformation gaya Flink diabaikan.

  • Kait siklus hidup open dan close tidak dipanggil.