このトピックでは、ユーザー定義関数 (UDF) の作成方法と使用方法について説明します。
ユーザー定義関数 (UDF)
Flink CDC のビルトイン関数が要件を満たさない場合は、データインジェスト ジョブ用にカスタム Java UDF を作成できます。これらの UDF は、ビルトイン関数と同じ方法で呼び出すことができます。
指定されたクラスパスの JAR ファイルは、ジョブドラフトの [構成] タブで追加の依存関係としてアップロードする必要があります。
UDF の定義
Flink CDC UDF クラスを作成するには、pom.xml ファイルにベース依存関係を追加します。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-common</artifactId>
<version>${Apache Flink CDC version}</version>
<scope>provided</scope>
</dependency>次の表からコミュニティ バージョン番号を選択します。
VVR エンジン バージョン | 対応するコミュニティ バージョン番号 |
11.3 以降のバージョン | 3.5.0 |
バージョン 11.0 から 11.2 | 3.4.0 |
バージョン 8.0.11 | 3.3.0 |
バージョン 8.0.10 以前 | 3.2.1 |
Flink CDC データインジェスト ジョブで UDF として使用するには、Java クラスが次の要件を満たしている必要があります。
org.apache.flink.cdc.common.udf.UserDefinedFunctionインターフェイスを実装する必要があります。パブリックな引数なしのコンストラクターが必要です。
evalという名前の public メソッドを少なくとも1つ含む必要があります。
より詳細な制御のために、UDF クラスで次のメソッドをオーバーライドできます。
getReturnTypeメソッドをオーバーライドして、戻り値の型を明示的に指定します。openとcloseメソッドをオーバーライドして、ライフサイクル関数を実装します。
たとえば、次のコードは、整数パラメーターを 1 ずつインクリメントし、結果を返す UDF を定義します。
public class AddOneFunctionClass implements UserDefinedFunction {
public Object eval(Integer num) {
return num + 1;
}
@Override
public DataType getReturnType() {
// The return type of the eval function is ambiguous.
// You must explicitly specify the type with getReturnType.
return DataTypes.INT();
}
@Override
public void open() throws Exception {
// ...
}
@Override
public void close() throws Exception {
// ...
}
}型マッピング
以下の表では、UDF クラスの eval メソッドのパラメーターおよび戻り値の型と、getReturnType メソッドの戻り値との間のマッピングを示します。
CDC カラム型 | Java クラス | 備考 |
BOOLEAN | java.lang.Boolean | |
TINYINT | java.lang.Byte | |
SMALLINT | java.lang.Short | |
INTEGER | java.lang.Integer | |
BIGINT | java.lang.Long | |
FLOAT | java.lang.Float | |
DOUBLE | java.lang.Double | |
DECIMAL | java.math.BigDecimal | |
DATE | java.time.LocalDate | |
TIME | java.time.LocalTime | |
TIMESTAMP | java.time.LocalDateTime | |
TIMESTAMP_TZ | java.time.ZonedDateTime | |
TIMESTAMP_LTZ | java.time.Instant | |
CHAR VARCHAR STRING | java.lang.String | |
BINARY VARBINARY BYTES | byte[] | |
ARRAY | java.util.List | ARRAY 内の要素の型は、List クラスの総称パラメーターにマッピングされます。 |
MAP | java.util.Map | MAP のキーと値の型は、Map クラスの総称パラメーターにマッピングされます。 |
ROW | java.util.List | |
VARIANT | org.apache.flink.cdc.common.types.variant.Variant | 説明 これは Flink SQL の Variant クラスパスとは異なることに注意してください。 |
Flink CDC UDF の登録
UDF を登録するには、お使いの Flink CDC データインジェストジョブの pipeline セクションに以下の構成を追加します。
pipeline:
user-defined-function:
- name: inc
classpath: org.apache.flink.cdc.udf.examples.java.AddOneFunctionClass
- name: format
classpath: org.apache.flink.cdc.udf.examples.java.FormatFunctionClass[その他の構成] セクションで、このクラスパスの JAR パッケージを追加の依存関係としてアップロードする必要があります。
UDF にカスタム名を指定できます。この名前は UDF クラス名と一致する必要はありません。
Flink CDC UDF の使用
UDF を登録すると、ビルトイン関数と同じ方法で、プロジェクションおよびフィルター式で UDF を呼び出すことができます。
transform:
- source-table: db.\.*
projection: "*, inc(inc(inc(id))) as inc_id, format(id, 'id -> %d') as formatted_id"
filter: inc(id) < 100Flink SQL UDF との互換性
「ScalarFunction」を拡張する Flink SQL UDF は、以下の制限事項があるものの、Flink CDC UDF として直接登録および使用可能です。
パラメータ化された
ScalarFunctionクラスはサポートされていません。Flink スタイルの `TypeInformation` アノテーションは無視されます。
openおよびcloseのライフサイクルフックは呼び出されません。