全部产品
Search
文档中心

Realtime Compute for Apache Flink:Fungsi user-defined Flink CDC

更新时间:Dec 04, 2025

Topik ini menjelaskan cara membuat dan menggunakan fungsi yang ditentukan pengguna (UDF).

Fungsi user-defined (UDF)

Jika fungsi bawaan tidak memenuhi kebutuhan Anda, pekerjaan data ingestion Flink CDC mendukung UDF kustom yang ditulis dalam Java. Anda dapat memanggil UDF tersebut dengan cara yang sama seperti fungsi bawaan.

Catatan

Anda harus mengunggah file JAR yang berisi UDF Anda sebagai dependensi tambahan dalam konfigurasi lanjutan.

Definisikan UDF Flink CDC

Untuk menulis kelas UDF Flink CDC, tambahkan dependensi dasar berikut ke file pom.xml Anda:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cdc-common</artifactId>
    <version>${CDC community version}</version>
    <scope>provided</scope>
</dependency>

Gunakan tabel berikut untuk memilih versi komunitas yang sesuai:

Versi mesin Realtime Compute VVR

Versi komunitas yang sesuai

11.0 dan yang lebih baru

3.4.0

8.0.11

3.3.0

8.0.10 dan yang lebih lama

3.2.1

Kelas Java dapat digunakan sebagai UDF dalam pekerjaan data ingestion Flink CDC jika memenuhi persyaratan berikut:

  • Mengimplementasikan antarmuka org.apache.flink.cdc.common.udf.UserDefinedFunction.

  • Memiliki konstruktor publik tanpa parameter.

  • Memuat setidaknya satu metode publik bernama eval.

Anda dapat meng-override metode berikut dalam kelas UDF untuk kontrol semantik yang lebih rinci:

  • Override metode getReturnType untuk secara manual menentukan tipe kembali dari metode tersebut.

  • Override metode open dan close untuk mengimplementasikan fungsi siklus hidup.

Sebagai contoh, definisi UDF berikut menambahkan parameter integer sebesar 1 dan mengembalikan hasilnya.

public class AddOneFunctionClass implements UserDefinedFunction {
    
    public Object eval(Integer num) {
        return num + 1;
    }
    
    @Override
    public DataType getReturnType() {
        // Karena tipe kembali dari fungsi eval tidak jelas,
        // Anda harus menggunakan getReturnType untuk secara eksplisit menentukan tipe tersebut.
        return DataTypes.INT();
    }
    
    @Override
    public void open() throws Exception {
        // ...
    }

    @Override
    public void close() throws Exception {
        // ...
    }
}

Pemetaan tipe

Tabel berikut mencantumkan pemetaan antara tipe Java untuk parameter dan nilai kembali metode eval dengan tipe data yang dikembalikan oleh metode getReturnType.

Tipe data CDC

Kelas Java yang sesuai

Catatan

BOOLEAN

java.lang.Boolean

TINYINT

java.lang.Byte

SMALLINT

java.lang.Short

INTEGER

java.lang.Integer

BIGINT

java.lang.Long

FLOAT

java.lang.Float

DOUBLE

java.lang.Double

DECIMAL

java.math.BigDecimal

DATE

java.time.LocalDate

TIME

java.time.LocalTime

TIMESTAMP

java.time.LocalDateTime

TIMESTAMP_TZ

java.time.ZonedDateTime

TIMESTAMP_LTZ

java.time.Instant

CHAR

VARCHAR

STRING

java.lang.String

BINARY

VARBINARY

BYTES

byte[]

ARRAY

java.util.List

Tipe elemen ARRAY dipetakan ke parameter generik kelas List.

MAP

java.util.Map

Tipe kunci dan nilai MAP dipetakan ke parameter generik kelas Map.

ROW

java.util.List

Daftarkan UDF Flink CDC

Untuk mendaftarkan UDF, tambahkan definisinya ke blok pipeline pada pekerjaan data ingestion Flink CDC Anda:

pipeline:
  user-defined-function:
    - name: inc
      classpath: org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass
    - name: format
      classpath: org.apache.flink.cdc.udf.examples.java.FormatFunctionClass
Catatan
  • Anda harus mengunggah file JAR yang berisi UDF Anda sebagai dependensi tambahan dalam konfigurasi lanjutan.

  • Anda dapat menentukan nama kustom untuk UDF tersebut. Nama tersebut tidak perlu sama dengan nama kelas UDF.

Gunakan Flink CDC UDF

Setelah mendaftarkan UDF, Anda dapat memanggilnya di blok projection dan filter dengan cara yang sama seperti fungsi bawaan. Kode berikut memberikan contohnya.

transform:
  - source-table: db.\.*
    projection: "*, inc(inc(inc(id))) as inc_id, format(id, 'id -> %d') as formatted_id"
    filter: inc(id) < 100

Kompatibilitas dengan UDF Flink SQL

UDF Flink yang mewarisi dari ScalarFunction juga dapat didaftarkan dan digunakan langsung sebagai UDF CDC YAML, dengan batasan berikut:

  • Kelas ScalarFunction yang diparameterisasi tidak didukung.

  • Anotasi TypeInformation gaya Flink diabaikan.

  • Kait siklus hidup open dan close tidak dipanggil.