このトピックでは、Flink 用のユーザー定義のテーブル値関数 (UDTF) を開発、登録、および使用する方法について説明します。
定義
ユーザー定義のテーブル値関数 (UDTF) は、可変長の 0 個以上のスカラー値を入力パラメーターとして受け入れます。単一の値を返すユーザー定義スカラー関数とは異なり、UDTF は複数の行を出力として返すことができます。返される行は、1 つ以上の列で構成できます。詳細については、「ユーザー定義関数」をご参照ください。
UDTF の開発
Flink は、サービスを迅速に開発するのに役立つユーザー定義関数 (UDF) の例を提供しています。これらの例には、UDSF、ユーザー定義集約関数 (UDAF)、およびユーザー定義のテーブル値関数 (UDTF) の実装が含まれています。開発環境は例で事前設定済みであるため、セットアップは不要です。
ASI_UDX_Demo サンプルプロジェクトをダウンロードして、ローカルマシンに解凍します。
説明サードパーティのウェブサイト ASI_UDX_Demo へのアクセスは、失敗するか、遅延する可能性があります。
展開が完了すると、ASI_UDX-main フォルダが作成されます。このフォルダには、次のファイルが含まれています。
pom.xml: プロジェクトレベルの構成ファイルです。このファイルには、プロジェクトの Maven 座標、依存関係、開発者ルール、バグ管理システム、組織、ライセンス、およびその他のプロジェクト関連情報が記述されています。
\ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDTF.java: UDTF のサンプル Java コードです。
IntelliJ IDEA で、 をクリックし、解凍した ASI_UDX-main フォルダを開きます。
\ASI_UDX-main\src\main\java\ASI_UDTF フォルダをダブルクリックし、必要に応じて ASI_UDTF.java ファイルを変更します。
この例では、ASI_UDTF.java ファイル内のコードは、縦棒 (|) デリミタに基づいて文字列を複数の文字列列に分割します。
package ASI_UDTF; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.functions.TableFunction; public class ASI_UDTF extends TableFunction<Tuple2<String,String>> { public void eval(String str){ String[] split = str.split("\\|"); String name = split[0]; String place = split[1]; Tuple2<String,String> tuple2 = Tuple2.of(name,place); collect(tuple2); } }TableFunction がサポートするデータの型と型推論メカニズムの詳細については、「Flink のデータの型」および「型推論」をご参照ください。
説明上記のリンクは、Flink 1.15 ドキュメントのものです。サポートされるデータの型と型推論メカニズムは、主要な Flink バージョン間で異なる場合があります。VVR-to-Flink バージョンマッピングに基づいて、ご利用の Ververica Runtime (VVR) バージョンに対応する Flink ドキュメントを確認してください。Flink バージョンを表示するには、「ワークスペースと名前空間の FAQ」をご参照ください。
次の例は、一般的な複合型であるタプルと Row の使用方法を示しています。
タプル型
TableFunction<Tuple2<String,Integer>Row 型
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>")) public static class SplitFunction extends TableFunction<Row> { public void eval(String str) { for (String s : str.split(" ")) { // collect(...) を使用して行を出力します collect(Row.of(s, s.length())); } } }
\ASI_UDX-main\ フォルダをダブルクリックします。pom.xml ファイルを構成します。
この例では、pom.xml ファイルは Flink バージョン 1.11 の主要な JAR パッケージの依存関係で構成されています。サービス要件に基づいて:
ご利用のサービスが他の JAR パッケージに依存しない場合、pom.xml ファイルを構成する必要はありません。次のステップに進んでください。
ご利用のサービスが他の JAR パッケージに依存する場合、必要な依存関係情報を pom.xml ファイルに追加してください。
Flink バージョン 1.11 の主要な JAR パッケージの依存関係は次のとおりです。
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.11.0</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table</artifactId> <version>1.11.0</version> <type>pom</type> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.11.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.11.0</version> </dependency> </dependencies>pom.xml ファイルを含むディレクトリで、次のコマンドを実行してプロジェクトをパッケージ化します。
mvn package -Dcheckstyle.skip\ASI_UDX-main\target\ ディレクトリに ASI_UDX-1.0-SNAPSHOT.jar パッケージが生成されると、UDTF の開発が完了します。
UDTF の登録
UDTF の登録方法の詳細については、「ユーザー定義関数 (UDF) の管理」をご参照ください。
UDTF の使用
UDTF を登録した後、次の手順に従ってジョブで使用できます。
Flink SQL ジョブを開発します。詳細については、「ジョブ開発マップ」をご参照ください。
次のサンプルコードは、ASI_UDTF_Source テーブルのメッセージフィールドの各行にある文字列を複数の列に分割します。縦棒 (|) がデリミタとして使用されます。
CREATE TEMPORARY TABLE ASI_UDTF_Source ( `message` VARCHAR ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE ASI_UDTF_Sink ( name VARCHAR, place VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO ASI_UDTF_Sink SELECT name,place FROM ASI_UDTF_Source,lateral table(ASI_UDTF(`message`)) as T(name,place);「」ページで、対象のジョブを見つけ、[開始] を [操作] 列でクリックします。
ジョブが開始されると、`ASI_UDTF_Source` テーブルの `message` フィールドの文字列は縦棒デリミタで分割されます。結果の文字の列は、`ASI_UDTF_Sink` テーブルに挿入されます。