Topik ini menjelaskan cara membuat dan menggunakan user-defined functions (UDF).
User-Defined Function (UDF)
Jika fungsi bawaan di Flink CDC tidak memenuhi kebutuhan Anda, Anda dapat membuat UDF Java kustom untuk Pekerjaan Ingesti Data Anda. UDF tersebut dapat dipanggil dengan cara yang sama seperti fungsi bawaan.
Anda harus mengunggah file JAR ke classpath sebagai dependensi tambahan pada tab Configurations di draf pekerjaan Anda.
Define a UDF
Untuk menulis kelas UDF Flink CDC, tambahkan dependensi dasar berikut ke file pom.xml Anda:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-common</artifactId>
<version>${Apache Flink CDC version}</version>
<scope>provided</scope>
</dependency>Pilih nomor versi komunitas dari tabel berikut:
VVR engine version | Corresponding community version number |
11.3 dan versi yang lebih baru | 3.5.0 |
Versi 11.0 hingga 11.2 | 3.4.0 |
Versi 8.0.11 | 3.3.0 |
Versi 8.0.10 dan sebelumnya | 3.2.1 |
Kelas Java harus memenuhi persyaratan berikut agar dapat digunakan sebagai UDF dalam Pekerjaan Ingesti Data Flink CDC:
Kelas tersebut harus mengimplementasikan antarmuka
org.apache.flink.cdc.common.udf.UserDefinedFunction.Kelas tersebut harus memiliki konstruktor publik tanpa parameter.
Kelas tersebut harus berisi setidaknya satu metode publik bernama
eval.
Anda dapat meng-override metode berikut dalam kelas UDF untuk kontrol yang lebih rinci:
Override metode
getReturnTypeuntuk secara eksplisit menentukan tipe nilai kembali.Override metode
opendancloseuntuk mengimplementasikan fungsi siklus hidup.
Sebagai contoh, kode berikut mendefinisikan UDF yang menambahkan parameter integer sebesar 1 dan mengembalikan hasilnya.
public class AddOneFunctionClass implements UserDefinedFunction {
public Object eval(Integer num) {
return num + 1;
}
@Override
public DataType getReturnType() {
// Tipe nilai kembali dari fungsi eval bersifat ambigu.
// Anda harus secara eksplisit menentukan tipe tersebut melalui getReturnType.
return DataTypes.INT();
}
@Override
public void open() throws Exception {
// ...
}
@Override
public void close() throws Exception {
// ...
}
}Type mappings
Tabel berikut mencantumkan pemetaan antara tipe parameter dan nilai kembali metode eval dalam kelas UDF dan nilai kembali dari metode getReturnType.
CDC Column Type | Java class | Notes |
BOOLEAN | java.lang.Boolean | |
TINYINT | java.lang.Byte | |
SMALLINT | java.lang.Short | |
INTEGER | java.lang.Integer | |
BIGINT | java.lang.Long | |
FLOAT | java.lang.Float | |
DOUBLE | java.lang.Double | |
DECIMAL | java.math.BigDecimal | |
DATE | java.time.LocalDate | |
TIME | java.time.LocalTime | |
TIMESTAMP | java.time.LocalDateTime | |
TIMESTAMP_TZ | java.time.ZonedDateTime | |
TIMESTAMP_LTZ | java.time.Instant | |
CHAR VARCHAR STRING | java.lang.String | |
BINARY VARBINARY BYTES | byte[] | |
ARRAY | java.util.List | Tipe elemen dalam ARRAY dipetakan ke parameter generik kelas List. |
MAP | java.util.Map | Tipe kunci dan nilai MAP dipetakan ke parameter generik kelas Map. |
ROW | java.util.List | |
VARIANT | org.apache.flink.cdc.common.types.variant.Variant | Catatan Perhatikan bahwa ini berbeda dari path kelas Variant di Flink SQL. |
Register a Flink CDC UDF
Untuk mendaftarkan UDF, tambahkan konfigurasi berikut ke bagian pipeline Pekerjaan Ingesti Data Flink CDC Anda:
pipeline:
user-defined-function:
- name: inc
classpath: org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass
- name: format
classpath: org.apache.flink.cdc.udf.examples.java.FormatFunctionClassPada bagian More Configurations, Anda harus mengunggah paket JAR untuk classpath ini sebagai dependensi tambahan.
Anda dapat menentukan nama kustom untuk UDF Anda. Nama ini tidak perlu sesuai dengan nama kelas UDF.
Use a Flink CDC UDF
Setelah UDF terdaftar, Anda dapat memanggilnya dalam ekspresi projection dan filter dengan cara yang sama seperti fungsi bawaan:
transform:
- source-table: db.\.*
projection: "*, inc(inc(inc(id))) as inc_id, format(id, 'id -> %d') as formatted_id"
filter: inc(id) < 100Compatibility with Flink SQL UDFs
UDF Flink SQL yang memperluas ScalarFunction juga dapat didaftarkan dan digunakan langsung sebagai UDF Flink CDC, dengan batasan berikut:
Kelas
ScalarFunctionyang diparameterisasi tidak didukung.Anotasi TypeInformation gaya Flink diabaikan.
Kait siklus hidup
opendanclosetidak dipanggil.