このトピックでは、Realtime Compute for Apache Flink でユーザー定義テーブル値関数(UDTF)を作成、登録、および使用する方法について説明します。
定義
UDTF は、入力パラメーターとしてゼロ、1 つ、または複数のスカラー値を受け取ります。 これらのパラメーターは可変長パラメーターにすることができます。 UDTF は、単一の値ではなく任意の数の行を返すことができる点を除いて、ユーザー定義スカラー関数(UDSF)に似ています。 返される行は、1 つ以上の列で構成されます。 UDTF が呼び出されるたびに、複数の行または列が返されます。 詳細については、「ユーザー定義関数」をご参照ください。
UDTF の作成
Realtime Compute for Apache Flink は、ビジネス開発を促進するために、ユーザー定義関数(UDF)の例を提供しています。 この例には、UDSF、ユーザー定義集計関数(UDAF)、およびユーザー定義テーブル値関数(UDTF)の実装方法が含まれています。 関連バージョンの開発環境は、各例で構成されています。
ASI_UDX_Demo をダウンロードして、オンプレミスのマシンに解凍します。
説明ASI_UDX_Demo はサードパーティの Web サイトで提供されています。 Web サイトにアクセスすると、Web サイトへのアクセスに失敗したり、Web サイトへのアクセスが遅延したりする可能性があります。
パッケージを解凍すると、ASI_UDX-main フォルダーが生成されます。 パスのパラメーター:
pom.xml: プロジェクトレベルの構成ファイル。プロジェクトの Maven 座標、依存関係、開発者が従うべきルール、欠陥管理システム、組織、ライセンス、およびその他すべてのプロジェクト関連要素について記述します。
\ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDTF.java: サンプル UDTF の Java コード。
IntelliJ IDEA を開き、 を選択します。 抽出した ASI_UDX-main フォルダーを選択し、[OK] をクリックします。
\ASI_UDX-main\src\main\java\ASI_UDTF ディレクトリにある ASI_UDTF.java ファイルをダブルクリックし、ビジネス要件に基づいてファイル内で構成を行います。
この例では、ASI_UDTF.java は、行内の文字列を縦棒(|)で区切られた複数の文字列の列に分割するコードで構成されています。
package ASI_UDTF; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.table.functions.TableFunction; public class ASI_UDTF extends TableFunction<Tuple2<String,String>> { // 文字列を分割して、名前と場所を抽出する public void eval(String str){ String[] split = str.split("\\|"); String name = split[0]; String place = split[1]; Tuple2<String,String> tuple2 = Tuple2.of(name,place); collect(tuple2); } }UDTF でサポートされているデータ型と型推論メカニズムの詳細については、「データ型」および「型推論」をご参照ください。
説明上記の 2 つのドキュメントは、Apache Flink 1.15 のドキュメントから参照されています。 UDTF でサポートされているデータ型と型推論メカニズムは、Apache Flink のメジャーバージョンによって異なる場合があります。 特定のバージョンの Apache Flink で UDTF によってサポートされているデータ型と型推論メカニズムの詳細については、Ververica Runtime(VVR)と Apache Flink バージョンのマッピングに基づいて、Apache Flink バージョンのドキュメントを参照してください。 Apache Flink のエンジンバージョンを表示する方法の詳細については、「コンソール操作」をご参照ください。
次の UDTF のサンプルは、一般的な複合型 Tuple と Row の値を返します。
Tuple 型
TableFunction<Tuple2<String,Integer>Row 型
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>")) // 文字列を単語に分割し、各単語とその長さを出力する public static class SplitFunction extends TableFunction<Row> { public void eval(String str) { for (String s : str.split(" ")) { // collect(...) を使用して行を出力する collect(Row.of(s, s.length())); } } }
\ASI_UDX-main\ ディレクトリにある pom.xml ファイルをダブルクリックし、ファイル内で構成を行います。
この例では、pom.xml は Apache Flink 1.11 のメイン JAR 依存関係の情報で構成されています。 ビジネス要件に基づいて、次のいずれかの操作を実行します。
ビジネスが他の JAR パッケージに依存していない場合は、pom.xml ファイルを構成せずに次の手順に進みます。
ビジネスが他の JAR パッケージに依存している場合は、必要な JAR パッケージの情報を pom.xml ファイルに追加します。
Apache Flink 1.11 は、次の JAR パッケージに依存しています。
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.11.0</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table</artifactId> <version>1.11.0</version> <type>pom</type> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.11.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.11.0</version> </dependency> </dependencies>pom.xml ファイルが格納されているディレクトリに移動します。 次に、次のコマンドを実行してファイルをパッケージ化します。
mvn package -Dcheckstyle.skip\ASI_UDX-main\target\ ディレクトリに ASI_UDX-1.0-SNAPSHOT.jar パッケージが表示されたら、UDTF が作成されています。
UDTF の登録
UDTF を登録する方法の詳細については、「UDF の管理」をご参照ください。
UDTF の使用
UDTF を登録した後、次の手順を実行して UDTF を使用できます。
Flink SQL を使用してデプロイメントを作成します。 詳細については、「SQL ドラフトの開発」をご参照ください。
ASI_UDTF_Source テーブルでは、message フィールドの各行の文字列は縦棒(|)で複数の列に分割されます。 次のコードは例を示しています。
CREATE TEMPORARY TABLE ASI_UDTF_Source ( `message` VARCHAR ) WITH ( 'connector'='datagen' ); CREATE TEMPORARY TABLE ASI_UDTF_Sink ( name VARCHAR, place VARCHAR ) WITH ( 'connector' = 'blackhole' ); -- ASI_UDTF を使用して message フィールドを分割し、結果を ASI_UDTF_Sink テーブルに挿入する INSERT INTO ASI_UDTF_Sink SELECT name,place FROM ASI_UDTF_Source,lateral table(ASI_UDTF(`message`)) as T(name,place);フルマネージド Flink のコンソールの ページで、開始するデプロイメントを見つけ、[アクション] 列の [開始] をクリックします。
デプロイメントが開始されると、ASI_UDTF_Source テーブルの message フィールドの複数の文字列の列が ASI_UDTF_Sink テーブルに挿入されます。 これらの文字列の列は縦棒(|)で区切られます。