このトピックでは、Realtime Compute for Apache Flink でユーザー定義集約関数 (UDAF) を作成、登録、使用する方法について説明します。
定義
UDAF は、複数の値を単一の値に集約します。UDAF の入力と出力の間には、多対一のマッピングが確立されます。複数の入力値が集約され、1 つの出力値が生成されます。詳細については、「ユーザー定義関数」をご参照ください。
ユーザー定義関数とASI_UDX_Demo はサードパーティの Web サイトで提供されています。これらの Web サイトにアクセスする際に、アクセスに失敗したり、遅延が発生したりする場合があります。
UDAF の作成
Flink は、サービスを迅速に開発するためのユーザー定義関数 (UDF) の例を提供しています。この例には、UDSF、ユーザー定義集約関数 (UDAF)、およびユーザー定義のテーブル値関数 (UDTF) の実装が含まれています。開発環境は事前に構成されているため、セットアップは不要です。
ASI_UDX_Demo をダウンロードし、オンプレミスマシンに解凍します。
パッケージを解凍すると、ASI_UDX-main フォルダが生成されます。このフォルダには、次の項目が含まれています。
pom.xml:プロジェクトの Maven 座標、依存関係、開発者が従うべきルール、不具合管理システム、組織、ライセンスなど、プロジェクトに関連するすべての要素を記述するプロジェクトレベルの構成ファイルです。
\ASI_UDX-main\src\main\java\ASI_UDAF\ASI_UDAF.java:サンプル UDAF の Java コードです。
IntelliJ IDEA で、 を選択し、解凍した ASI_UDX-main を選択します。
\ASI_UDX-main\ ディレクトリにある pom.xml ファイルをダブルクリックし、必要に応じてファイル内の情報を構成します。
この例では、pom.xml ファイルには、Flink 1.12 のユーザー定義関数を開発するために必要な最小限の依存関係が含まれています。ビジネス要件に応じて:
他の依存関係がない場合は、pom.xml ファイルを構成せずに次のステップに進むことができます。
他の依存関係がある場合は、必要な依存関係を pom.xml ファイルに追加します。
次の例は、Flink 1.12 の最小限の依存関係を示しています。
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.12.7</version> <scope>provided</scope> </dependency> </dependencies>説明デプロイメントの Ververica Runtime (VVR) バージョンに対応する Apache Flink のメジャーバージョンに対して、最新のマイナーバージョンを入力することを推奨します。VVR と Apache Flink のバージョンマッピングの詳細については、「概要」をご参照ください。
\ASI_UDX-main\src\main\java\ASI_UDAF をダブルクリックして開き、必要に応じて ASI_UDAF.java を構成します。
この例は、ASI_UDAF.java ファイル内の累積合計のコードを示しています。
package ASI_UDAF; import org.apache.flink.table.functions.AggregateFunction; import java.util.Iterator; public class ASI_UDAF{ public static class AccSum{ public long sum; } public static class MySum extends AggregateFunction<Long, AccSum>{ @Override public Long getValue(AccSum acSum){ return acSum.sum; } @Override public AccSum createAccumulator(){ AccSum acCount= new AccSum(); acCount.sum=0; return acCount; } public void accumulate(AccSum acc,long num){ acc.sum += num; } /** * アップストリーム演算子によって生成されたメッセージの取り消しをサポートします。 */ public void retract(AccSum acc,long num){ acc.sum -= num; } /** * ローカル-グローバル 2 段階集約最適化をサポートします。 */ public void merge(AccSum acc,Iterable<AccSum> it){ Iterator<AccSum> iter=it.iterator(); while(iter.hasNext()){ AccSum accSum=iter.next(); if(null!=accSum){ acc.sum+=accSum.sum; } } } } }この UDAF は、単純な累積合計操作を実行します。たとえば、同じグループキー (GROUP BY フィールド) の入力値が 1、2、3 の場合、出力は次の 2 つのケースのいずれかになります。
miniBatch が有効になっていない場合、戻り値は 1、3、6 になります。デフォルトでは、miniBatch は有効になっていません。
miniBatch が有効になっている場合、戻り値は 6 になり、中間的な戻り値の数は特定できません。これは、戻り値の数が miniBatch の設定と入力データのディストリビューションによって異なるためです。
説明miniBatch の詳細については、「Flink SQL の最適化」をご参照ください。
pom.xml ファイルが保存されているディレクトリに移動します。次に、次のコマンドを実行してファイルをパッケージ化します。
mvn package -Dcheckstyle.skip\ASI_UDX-main\target\ ディレクトリに ASI_UDX-1.0-SNAPSHOT.jar パッケージが表示された場合、UDAF は正常に作成されています。
UDAF の使用
SQL デプロイメントで UDAF を使用するには、次のいずれかの方法を使用できます。
方法 1: UDAF を登録し、デプロイメントで登録済みの UDAF を使用する。
この方法を使用すると、後続のデプロイメント開発でコードを再利用できます。UDAF の登録方法の詳細については、「UDF の管理」をご参照ください。登録された UDAF の名前が ASI_UDAF$MySum の場合、デプロイメントで次のサンプルコードを使用します。
CREATE TEMPORARY TABLE ASI_UDAF_Source ( a BIGINT NOT NULL ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE ASI_UDAF_Sink ( sum BIGINT ) WITH ( 'connector' = 'print' ); INSERT INTO ASI_UDAF_Sink SELECT `ASI_UDAF$MySum`(a) FROM ASI_UDAF_Source;方法 2: Flink の ページで、[その他の設定] の [追加の依存ファイル] オプションを使用してカスタム関数の JAR パッケージをアップロードします。次に、ジョブの SQL 文に一時関数を作成する文を追加し、その関数を使用します。
デプロイメントの [追加の依存関係] フィールドに UDAF の JAR パッケージをアップロードすると、この UDAF はこのデプロイメントでのみ使用できます。この JAR パッケージを他のデプロイメントで使用することはできません。作成する一時関数の名前が mysum の場合、次のコードはデプロイメントで関数を使用する方法を示しています。
CREATE TEMPORARY TABLE ASI_UDAF_Source ( a BIGINT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE ASI_UDAF_Sink ( sum BIGINT ) WITH ( 'connector' = 'print' ); CREATE TEMPORARY FUNCTION `mysum` AS 'ASI_UDAF.ASI_UDAF$MySum'; -- 一時関数 mysum を作成します。 INSERT INTO ASI_UDAF_Sink SELECT `mysum`(a) FROM ASI_UDAF_Source;
SQL ジョブを開発した後、 ページに移動します。対象のジョブを見つけ、[操作] 列の [開始] をクリックします。ジョブが開始されると、ASI_UDAF_Source テーブルの `a` フィールドのデータの累積合計が ASI_UDAF_Sink テーブルに挿入されます。