この Topic では、ユーザー定義関数 (UDF) の作成方法と使用方法について説明します。
ユーザー定義関数 (UDF)
ビルトイン関数が要件を満たさない場合、Flink CDC データインジェストタスクは Java で記述されたカスタム UDF をサポートします。これらの UDF は、ビルトイン関数と同じ方法で呼び出すことができます。
UDF を含む JAR ファイルを、詳細設定で追加の依存関係としてアップロードする必要があります。
Flink CDC UDF の定義
Flink CDC UDF クラスを記述するには、基本依存関係を pom.xml ファイルに追加する必要があります:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-common</artifactId>
<version>${CDC community version}</version>
<scope>provided</scope>
</dependency>次の表を使用して、正しいコミュニティバージョンを選択してください:
リアルタイムコンピューティング VVR エンジンバージョン | 対応するコミュニティバージョン |
11.0 以降 | 3.4.0 |
8.0.11 | 3.3.0 |
8.0.10 以前 | 3.2.1 |
Java クラスは、次の要件を満たす場合に Flink CDC データインジェストタスクで UDF として使用できます:
org.apache.flink.cdc.common.udf.UserDefinedFunctionインターフェイスを実装している。public でパラメーターなしのコンストラクターを持つ。
evalという名前の public メソッドを少なくとも 1 つ含む。
UDF クラスで次のメソッドをオーバーライドして、より詳細なセマンティックコントロールを行うことができます:
getReturnTypeメソッドをオーバーライドして、メソッドの戻り値の型を手動で指定する。openメソッドとcloseメソッドをオーバーライドして、ライフサイクル関数を実装する。
たとえば、次の UDF 定義は、整数パラメーターを 1 ずつインクリメントし、結果を返します。
public class AddOneFunctionClass implements UserDefinedFunction {
public Object eval(Integer num) {
return num + 1;
}
@Override
public DataType getReturnType() {
// eval 関数の戻り値の型が明確でないため、
// getReturnType を使用して型を明示的に指定する必要があります。
return DataTypes.INT();
}
@Override
public void open() throws Exception {
// ...
}
@Override
public void close() throws Exception {
// ...
}
}型のマッピング
次の表に、eval メソッドのパラメーターと戻り値の Java 型と、getReturnType メソッドによって返されるデータ型とのマッピングを示します。
CDC データ型 | 対応する Java クラス | 備考 |
BOOLEAN | java.lang.Boolean | |
TINYINT | java.lang.Byte | |
SMALLINT | java.lang.Short | |
INTEGER | java.lang.Integer | |
BIGINT | java.lang.Long | |
FLOAT | java.lang.Float | |
DOUBLE | java.lang.Double | |
DECIMAL | java.math.BigDecimal | |
DATE | java.time.LocalDate | |
TIME | java.time.LocalTime | |
TIMESTAMP | java.time.LocalDateTime | |
TIMESTAMP_TZ | java.time.ZonedDateTime | |
TIMESTAMP_LTZ | java.time.Instant | |
CHAR VARCHAR STRING | java.lang.String | |
BINARY VARBINARY BYTES | byte[] | |
ARRAY | java.util.List | ARRAY の要素の型は、List クラスのジェネリックパラメーターにマッピングされます。 |
MAP | java.util.Map | MAP のキーと値の型は、Map クラスのジェネリックパラメーターにマッピングされます。 |
ROW | java.util.List |
Flink CDC UDF の登録
UDF を登録するには、Flink CDC データインジェストタスクの pipeline ブロックに定義を追加します:
pipeline:
user-defined-function:
- name: inc
classpath: org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass
- name: format
classpath: org.apache.flink.cdc.udf.examples.java.FormatFunctionClassUDF を含む JAR ファイルを、詳細設定で追加の依存関係としてアップロードする必要があります。
UDF にはカスタム名を指定できます。名前は UDF クラス名と一致する必要はありません。
Flink CDC UDF の使用
UDF を登録した後、ビルトイン関数と同じようにプロジェクションブロックとフィルターブロックで呼び出すことができます。次のコードに例を示します。
transform:
- source-table: db.\.*
projection: "*, inc(inc(inc(id))) as inc_id, format(id, 'id -> %d') as formatted_id"
filter: inc(id) < 100Flink SQL UDF との互換性
ScalarFunction から継承した Flink UDF も、CDC YAML UDF として直接登録して使用できますが、次の制限があります:
パラメーター化された
ScalarFunctionクラスはサポートされていません。Flink スタイルの TypeInformation アノテーションは無視されます。
openおよびcloseライフサイクルフックは呼び出されません。