Topik ini menjelaskan cara membuat dan menggunakan fungsi yang ditentukan pengguna (UDF).
Fungsi user-defined (UDF)
Jika fungsi bawaan tidak memenuhi kebutuhan Anda, pekerjaan data ingestion Flink CDC mendukung UDF kustom yang ditulis dalam Java. Anda dapat memanggil UDF tersebut dengan cara yang sama seperti fungsi bawaan.
Anda harus mengunggah file JAR yang berisi UDF Anda sebagai dependensi tambahan dalam konfigurasi lanjutan.
Definisikan UDF Flink CDC
Untuk menulis kelas UDF Flink CDC, tambahkan dependensi dasar berikut ke file pom.xml Anda:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-common</artifactId>
<version>${CDC community version}</version>
<scope>provided</scope>
</dependency>Gunakan tabel berikut untuk memilih versi komunitas yang sesuai:
Versi mesin Realtime Compute VVR | Versi komunitas yang sesuai |
11.0 dan yang lebih baru | 3.4.0 |
8.0.11 | 3.3.0 |
8.0.10 dan yang lebih lama | 3.2.1 |
Kelas Java dapat digunakan sebagai UDF dalam pekerjaan data ingestion Flink CDC jika memenuhi persyaratan berikut:
Mengimplementasikan antarmuka
org.apache.flink.cdc.common.udf.UserDefinedFunction.Memiliki konstruktor publik tanpa parameter.
Memuat setidaknya satu metode publik bernama
eval.
Anda dapat meng-override metode berikut dalam kelas UDF untuk kontrol semantik yang lebih rinci:
Override metode
getReturnTypeuntuk secara manual menentukan tipe kembali dari metode tersebut.Override metode
opendancloseuntuk mengimplementasikan fungsi siklus hidup.
Sebagai contoh, definisi UDF berikut menambahkan parameter integer sebesar 1 dan mengembalikan hasilnya.
public class AddOneFunctionClass implements UserDefinedFunction {
public Object eval(Integer num) {
return num + 1;
}
@Override
public DataType getReturnType() {
// Karena tipe kembali dari fungsi eval tidak jelas,
// Anda harus menggunakan getReturnType untuk secara eksplisit menentukan tipe tersebut.
return DataTypes.INT();
}
@Override
public void open() throws Exception {
// ...
}
@Override
public void close() throws Exception {
// ...
}
}Pemetaan tipe
Tabel berikut mencantumkan pemetaan antara tipe Java untuk parameter dan nilai kembali metode eval dengan tipe data yang dikembalikan oleh metode getReturnType.
Tipe data CDC | Kelas Java yang sesuai | Catatan |
BOOLEAN | java.lang.Boolean | |
TINYINT | java.lang.Byte | |
SMALLINT | java.lang.Short | |
INTEGER | java.lang.Integer | |
BIGINT | java.lang.Long | |
FLOAT | java.lang.Float | |
DOUBLE | java.lang.Double | |
DECIMAL | java.math.BigDecimal | |
DATE | java.time.LocalDate | |
TIME | java.time.LocalTime | |
TIMESTAMP | java.time.LocalDateTime | |
TIMESTAMP_TZ | java.time.ZonedDateTime | |
TIMESTAMP_LTZ | java.time.Instant | |
CHAR VARCHAR STRING | java.lang.String | |
BINARY VARBINARY BYTES | byte[] | |
ARRAY | java.util.List | Tipe elemen ARRAY dipetakan ke parameter generik kelas List. |
MAP | java.util.Map | Tipe kunci dan nilai MAP dipetakan ke parameter generik kelas Map. |
ROW | java.util.List |
Daftarkan UDF Flink CDC
Untuk mendaftarkan UDF, tambahkan definisinya ke blok pipeline pada pekerjaan data ingestion Flink CDC Anda:
pipeline:
user-defined-function:
- name: inc
classpath: org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass
- name: format
classpath: org.apache.flink.cdc.udf.examples.java.FormatFunctionClassAnda harus mengunggah file JAR yang berisi UDF Anda sebagai dependensi tambahan dalam konfigurasi lanjutan.
Anda dapat menentukan nama kustom untuk UDF tersebut. Nama tersebut tidak perlu sama dengan nama kelas UDF.
Gunakan Flink CDC UDF
Setelah mendaftarkan UDF, Anda dapat memanggilnya di blok projection dan filter dengan cara yang sama seperti fungsi bawaan. Kode berikut memberikan contohnya.
transform:
- source-table: db.\.*
projection: "*, inc(inc(inc(id))) as inc_id, format(id, 'id -> %d') as formatted_id"
filter: inc(id) < 100Kompatibilitas dengan UDF Flink SQL
UDF Flink yang mewarisi dari ScalarFunction juga dapat didaftarkan dan digunakan langsung sebagai UDF CDC YAML, dengan batasan berikut:
Kelas
ScalarFunctionyang diparameterisasi tidak didukung.Anotasi TypeInformation gaya Flink diabaikan.
Kait siklus hidup
opendanclosetidak dipanggil.