このトピックでは、Realtime Compute for Apache Flink でユーザー定義スカラー関数(UDSF)を作成、登録、および使用する方法について説明します。
定義
UDSF は、0 個、1 個、または複数のスカラー値を新しいスカラー値にマッピングします。 UDSF の入力データと出力データは、1 対 1 の関係でマッピングされます。 UDSF はデータの行を読み取るたびに、出力値を書き込みます。 詳細については、「ユーザー定義関数」をご参照ください。
UDSF の作成
Realtime Compute for Apache Flink は、ビジネス開発を促進するために、ユーザー定義関数(UDF)の例を提供しています。 この例には、UDSF、ユーザー定義集計関数(UDAF)、およびユーザー定義テーブル値関数(UDTF)の実装方法が含まれています。 関連バージョンの開発環境は、各例で構成されています。
ASI_UDX_Demo をダウンロードして、オンプレミスのマシンに解凍します。
説明ASI_UDX_Demo はサードパーティの Web サイトで提供されています。 Web サイトにアクセスすると、Web サイトへのアクセスに失敗したり、Web サイトへのアクセスが遅延したりする可能性があります。
パッケージを解凍すると、ASI_UDX-main フォルダーが生成されます。 パスのパラメーター:
pom.xml: プロジェクトレベルの構成ファイル。プロジェクトの Maven 座標、依存関係、開発者が従うべきルール、欠陥管理システム、組織、ライセンス、およびその他すべてのプロジェクト関連要素について記述します。
\ASI_UDX-main\src\main\java\ASI_UDF\ASI_UDF.java: サンプル UDSF の Java コード。
IntelliJ IDEA を開き、 を選択します。 抽出した ASI_UDX-main フォルダーを選択し、[OK] をクリックします。
\ASI_UDX-main\src\main\java\ASI_UDF ディレクトリにある ASI_UDF.java ファイルをダブルクリックし、ビジネス要件に基づいてファイル内で構成を行います。
この例では、ASI_UDF.java は、各データレコードの開始位置から終了位置までの文字を取得するコードで構成されています。
package ASI_UDF; import org.apache.flink.table.functions.ScalarFunction; public class ASI_UDF extends ScalarFunction { // 各データレコードの開始位置から終了位置までの文字を取得します。 public String eval(String s, Integer begin, Integer end) { return s.substring(begin, end); } }\ASI_UDX-main\ ディレクトリにある pom.xml ファイルをダブルクリックし、ファイル内で構成を行います。
この例では、pom.xml は Apache Flink 1.11 のメイン JAR 依存関係の情報で構成されています。 ビジネス要件に基づいて、次のいずれかの操作を実行します。
ビジネスが他の JAR パッケージに依存していない場合は、pom.xml ファイルを構成する必要はなく、次の手順に進みます。
ビジネスが他の JAR パッケージに依存している場合は、必要な JAR パッケージの情報を pom.xml ファイルに追加します。
Apache Flink 1.11 は、次の JAR パッケージに依存しています。
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.11.0</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table</artifactId> <version>1.11.0</version> <type>pom</type> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.11.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.11.0</version> </dependency> </dependencies>pom.xml ファイルが保存されているディレクトリに移動します。 次に、次のコマンドを実行してファイルをパッケージ化します。
mvn package -Dcheckstyle.skip\ASI_UDX-main\target\ ディレクトリに ASI_UDX-1.0-SNAPSHOT.jar パッケージが表示されたら、UDSF が作成されています。
UDSF の登録
UDSF を登録する方法の詳細については、「UDF の管理」をご参照ください。
UDSF の使用
UDSF を登録した後、UDSF を使用できます。 UDSF を使用するには、次の手順を実行します。
Flink SQL を使用してデプロイメントを作成します。 詳細については、「SQL ドラフトの開発」をご参照ください。
次のコードは、ASI_UDSF_Source テーブルの a フィールドの各行の文字列の 2 番目の文字から 4 番目の文字を取得する方法の例を示しています。
CREATE TEMPORARY TABLE ASI_UDSF_Source ( a VARCHAR, b INT, c INT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE ASI_UDSF_Sink ( a VARCHAR ) WITH ( 'connector' = 'blackhole' ); -- ASI_UDSF_Source テーブルの a フィールドの各行の文字列の 2 番目の文字から 4 番目の文字を取得します。 INSERT INTO ASI_UDSF_Sink SELECT ASI_UDSF(a,2,4) FROM ASI_UDSF_Source;フルマネージド Flink のコンソールの ページで、開始するデプロイメントを見つけ、[アクション] 列の [開始] をクリックします。
デプロイメントが開始されると、ASI_UDSF_Source テーブルの a フィールドの各行の文字列の 2 番目の文字から 4 番目の文字が、ASI_UDSF_Sink テーブルの各行に挿入されます。