このトピックでは、Flink のユーザー定義スカラー関数 (UDSF) を開発、登録、使用する方法について説明します。
定義
ユーザー定義スカラー関数 (UDSF) は、0 個、1 個、または複数のスカラー値を新しいスカラー値にマッピングします。これにより、関数が 1 行のデータを処理して単一の出力値を返すという 1 対 1 の関係が作成されます。詳細については、「ユーザー定義関数」をご参照ください。
UDSF の開発
Flink は、サービスを迅速に開発できるよう、ユーザー定義関数 (UDF) の例を提供しています。この例には、UDSF、ユーザー定義集約関数 (UDAF)、およびユーザー定義のテーブル値関数 (UDTF) の実装が含まれています。開発環境は例の中で事前設定されているため、セットアップは不要です。
ローカルマシンに ASI_UDX_Demo の例をダウンロードして解凍します。
説明ASI_UDX_Demo はサードパーティの Web サイトでホストされています。アクセスに失敗したり、遅延が発生したりする場合があります。
解凍後、ASI_UDX-main フォルダが作成されます。このフォルダには、次のものが含まれています。
pom.xml:プロジェクトレベルの設定ファイルです。プロジェクトの Maven 座標、依存関係、開発者向けのルール、バグ管理システム、組織、ライセンスなど、プロジェクトに関連するすべての要素を記述します。
\ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDF.java:UDSF の Java サンプルコードです。
IntelliJ IDEA で、 をクリックし、解凍した ASI_UDX-main フォルダを選択します。
\ASI_UDX-main\src\main\java\ASI_UDF フォルダをダブルクリックします。必要に応じて ASI_UDF.java ファイルを設定します。
この例では、ASI_UDF.java ファイルは、各入力文字列の `begin` 位置から `end` 位置までの文字を抽出するコードで設定されています。
package ASI_UDF; import org.apache.flink.table.functions.ScalarFunction; public class ASI_UDF extends ScalarFunction { public String eval(String s, Integer begin, Integer end) { return s.substring(begin, end); } }\ASI_UDX-main\ フォルダをダブルクリックします。pom.xml ファイルを設定します。
この例では、pom.xml ファイルは Flink バージョン 1.11 のメイン JAR パッケージの依存関係で設定されています。ご利用のサービスの要件に基づいて、次のように操作します。
ご利用のサービスが他の JAR パッケージに依存していない場合は、pom.xml ファイルを設定する必要はありません。次のステップに進んでください。
ご利用のサービスが他の JAR パッケージに依存している場合は、必要な依存関係情報を pom.xml ファイルに追加します。
Flink バージョン 1.11 のメイン JAR パッケージの依存関係は次のとおりです。
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.11.0</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table</artifactId> <version>1.11.0</version> <type>pom</type> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.11.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.11.0</version> </dependency> </dependencies>pom.xml ファイルが含まれているディレクトリで、次のコマンドを実行してプロジェクトをパッケージ化します。
mvn package -Dcheckstyle.skipASI_UDX-1.0-SNAPSHOT.jar パッケージが \ASI_UDX-main\target\ ディレクトリに作成されます。これで UDSF の開発は完了です。
UDSF の登録
UDSF の登録方法の詳細については、「ユーザー定義関数 (UDF) の管理」をご参照ください。
UDSF の使用
UDSF を登録すると、それを使用できるようになります。手順は次のとおりです。
Flink SQL ジョブを開発します。詳細については、「ジョブ開発マップ」をご参照ください。
次のサンプルコードは、`ASI_UDSF_Source` テーブルの `a` フィールドの文字列から、2 番目から 4 番目までの文字を抽出します。
CREATE TEMPORARY TABLE ASI_UDSF_Source ( a VARCHAR, b INT, c INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE ASI_UDSF_Sink ( a VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO ASI_UDSF_Sink SELECT ASI_UDSF(a,2,4) FROM ASI_UDSF_Source;ページで、対象のジョブを見つけ、[操作] 列の [開始] をクリックします。
ジョブが開始されると、`ASI_UDSF_Source` テーブルの各行の `a` フィールドの文字列から 2 番目から 4 番目までの文字が `ASI_UDSF_Sink` テーブルに挿入されます。