このトピックでは、Realtime Compute for Apache Flink でユーザー定義集計関数 (UDAF) を作成、登録、および使用する方法について説明します。
定義
UDAF は、複数の値を 1 つの値に集計します。UDAF の入力と出力の間には多対 1 のマッピングが確立されます。複数の入力値が集計されて 1 つの出力値が生成されます。詳細については、「ユーザー定義関数」をご参照ください。
ユーザー定義関数 および ASI_UDX_Demo は、サードパーティの Web サイトで提供されています。これらの Web サイトにアクセスすると、アクセスに失敗したり、アクセスが遅延したりする可能性があります。
UDAF の作成
Realtime Compute for Apache Flink は、ビジネス開発を促進するために、ユーザー定義関数 (UDF) の例を提供しています。これらの例には、UDSF、ユーザー定義集計関数 (UDAF)、およびユーザー定義テーブル値関数 (UDTF) の実装方法が含まれています。関連バージョンの開発環境は、各例で構成されています。
ASI_UDX_Demo をダウンロードして、オンプレミスのマシンに解凍します。
パッケージを解凍すると、ASI_UDX-main フォルダーが生成されます。このフォルダーには、次の項目が含まれています。
pom.xml: プロジェクトレベルの構成ファイル。プロジェクトの Maven 座標、依存関係、開発者が従うべきルール、欠陥管理システム、組織、ライセンス、およびその他すべてのプロジェクト関連要素について記述します。
\ASI_UDX-main\src\main\java\ASI_UDAF\ASI_UDAF.java: サンプル UDAF の Java コード。
IntelliJ IDEA を開き、ASI_UDX-main フォルダーを選択し、[OK] をクリックします。
を選択します。解凍した\ASI_UDX-main\ ディレクトリにある pom.xml ファイルをダブルクリックし、ビジネス要件に基づいてファイル内の情報を構成します。
この例では、Flink 1.12 で UDF を開発するために必要な最小限の依存関係情報が、pom.xml ファイルに構成されています。ビジネス要件に基づいて、次のいずれかの操作を実行します。
ビジネスに依存関係がない場合は、pom.xml ファイルの情報を構成する必要はなく、次の手順に進みます。
ビジネスに依存関係がある場合は、必要な依存関係情報を pom.xml ファイルに追加します。
次の例は、Flink 1.12 の最小限の依存関係を示しています。
<dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.12.7</version> <scope>provided</scope> </dependency> </dependencies>
説明デプロイメントの Ververica Runtime (VVR) バージョンに対応する Apache Flink のメジャーバージョンについては、最新のマイナーバージョンを入力することをお勧めします。 VVR と Apache Flink のバージョンマッピングの詳細については、「Java」をご参照ください。
\ASI_UDX-main\src\main\java\ASI_UDAF ディレクトリにある ASI_UDAF.java ファイルをダブルクリックし、ビジネス要件に基づいてファイルにコードを記述します。
この例では、ASI_UDAF.java ファイルの累積合計のコードを示します。
package ASI_UDAF; import org.apache.flink.table.functions.AggregateFunction; import java.util.Iterator; public class ASI_UDAF{ public static class AccSum{ public long sum; } public static class MySum extends AggregateFunction<Long, AccSum>{ @Override public Long getValue(AccSum acSum){ return acSum.sum; } @Override public AccSum createAccumulator(){ AccSum acCount= new AccSum(); acCount.sum=0; return acCount; } public void accumulate(AccSum acc,long num){ acc.sum += num; } // 上流オペレーターによって生成されたメッセージの retract をサポートします。 public void retract(AccSum acc,long num){ acc.sum -= num; } // ローカルグローバル 2 段階集計最適化をサポートします。 public void merge(AccSum acc,Iterable<AccSum> it){ Iterator<AccSum> iter=it.iterator(); while(iter.hasNext()){ AccSum accSum=iter.next(); if(null!=accSum){ acc.sum+=accSum.sum; } } } } }
この例では、UDAF を使用して単純な累積操作を実行します。たとえば、GROUP BY 句の 3 つの値が 1、2、3 の場合、返される結果は miniBatch が有効になっているかどうかによって異なります。
miniBatch が有効になっていない場合、戻り値は 1、3、6 です。デフォルトでは、miniBatch は有効になっていません。
miniBatch が有効になっている場合、戻り値は 6 で、中間戻り値の数は決定できません。これは、戻り値の数が miniBatch の設定と入力データの分布によって異なるためです。
説明miniBatch の詳細については、「Flink SQL の最適化」をご参照ください。
pom.xml ファイルが格納されているディレクトリに移動します。次に、次のコマンドを実行してファイルをパッケージ化します。
mvn package -Dcheckstyle.skip
\ASI_UDX-main\target\ ディレクトリに ASI_UDX-1.0-SNAPSHOT.jar パッケージが表示されたら、UDAF が作成されています。
UDAF の使用
SQL デプロイメントで UDAF を使用するには、次のいずれかの方法を使用できます。
方法 1: UDAF を登録し、登録した UDAF をデプロイメントで使用します。
この方法を使用すると、後続のデプロイメント開発でコードを再利用できます。 UDAF の登録方法の詳細については、「UDF の管理」をご参照ください。登録された UDAF の名前が ASI_UDAF$MySum の場合、デプロイメントで次のサンプルコードを使用します。
CREATE TEMPORARY TABLE ASI_UDAF_Source ( a BIGINT NOT NULL ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE ASI_UDAF_Sink ( sum BIGINT ) WITH ( 'connector' = 'print' ); INSERT INTO ASI_UDAF_Sink SELECT `ASI_UDAF$MySum`(a) FROM ASI_UDAF_Source;
方法 2: [追加の依存関係] フィールドの [詳細設定] タブの [ドラフトエディター] ページで UDAF の JAR パッケージをアップロードします。次に、一時関数の作成に使用するステートメントをデプロイメントの SQL ステートメントに追加し、その関数を使用します。
デプロイメントの [追加の依存関係] フィールドに UDAF の JAR パッケージをアップロードすると、このデプロイメントでのみ UDAF を使用できます。この JAR パッケージは他のデプロイメントでは使用できません。作成する一時関数の名前が mysum の場合、デプロイメントで関数を使用する方法は次のとおりです。
CREATE TEMPORARY TABLE ASI_UDAF_Source ( a BIGINT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE ASI_UDAF_Sink ( sum BIGINT ) WITH ( 'connector' = 'print' ); CREATE TEMPORARY FUNCTION `mysum` AS 'ASI_UDAF.ASI_UDAF$MySum'; -- mysum という名前の一時関数を作成します。 INSERT INTO ASI_UDAF_Sink SELECT `mysum`(a) FROM ASI_UDAF_Source;
SQL デプロイメントの開発が完了したら、
ページに移動し、SQL デプロイメントを見つけて、[アクション] 列の [開始] をクリックします。デプロイメントが開始されると、ASI_UDAF_Source テーブルの a フィールドのデータの合計が ASI_UDAF_Sink テーブルに挿入されます。