すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:ユーザー定義集計関数 (UDAF)

最終更新日:Apr 14, 2025

このトピックでは、Realtime Compute for Apache Flink でユーザー定義集計関数 (UDAF) を作成、登録、および使用する方法について説明します。

定義

UDAF は、複数の値を 1 つの値に集計します。UDAF の入力と出力の間には多対 1 のマッピングが確立されます。複数の入力値が集計されて 1 つの出力値が生成されます。詳細については、「ユーザー定義関数」をご参照ください。

説明

ユーザー定義関数 および ASI_UDX_Demo は、サードパーティの Web サイトで提供されています。これらの Web サイトにアクセスすると、アクセスに失敗したり、アクセスが遅延したりする可能性があります。

UDAF の作成

説明

Realtime Compute for Apache Flink は、ビジネス開発を促進するために、ユーザー定義関数 (UDF) の例を提供しています。これらの例には、UDSF、ユーザー定義集計関数 (UDAF)、およびユーザー定義テーブル値関数 (UDTF) の実装方法が含まれています。関連バージョンの開発環境は、各例で構成されています。

  1. ASI_UDX_Demo をダウンロードして、オンプレミスのマシンに解凍します。

    パッケージを解凍すると、ASI_UDX-main フォルダーが生成されます。このフォルダーには、次の項目が含まれています。

    • pom.xml: プロジェクトレベルの構成ファイル。プロジェクトの Maven 座標、依存関係、開発者が従うべきルール、欠陥管理システム、組織、ライセンス、およびその他すべてのプロジェクト関連要素について記述します。

    • \ASI_UDX-main\src\main\java\ASI_UDAF\ASI_UDAF.java: サンプル UDAF の Java コード。

  2. IntelliJ IDEA を開き、[ファイル] > [開く] を選択します。解凍した ASI_UDX-main フォルダーを選択し、[OK] をクリックします。

  3. \ASI_UDX-main\ ディレクトリにある pom.xml ファイルをダブルクリックし、ビジネス要件に基づいてファイル内の情報を構成します。

    この例では、Flink 1.12 で UDF を開発するために必要な最小限の依存関係情報が、pom.xml ファイルに構成されています。ビジネス要件に基づいて、次のいずれかの操作を実行します。

    • ビジネスに依存関係がない場合は、pom.xml ファイルの情報を構成する必要はなく、次の手順に進みます。

    • ビジネスに依存関係がある場合は、必要な依存関係情報を pom.xml ファイルに追加します。

    次の例は、Flink 1.12 の最小限の依存関係を示しています。

     <dependencies>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-common</artifactId>
             <version>1.12.7</version>
             <scope>provided</scope>
         </dependency>
    </dependencies>
    説明

    デプロイメントの Ververica Runtime (VVR) バージョンに対応する Apache Flink のメジャーバージョンについては、最新のマイナーバージョンを入力することをお勧めします。 VVR と Apache Flink のバージョンマッピングの詳細については、「Java」をご参照ください。

  4. \ASI_UDX-main\src\main\java\ASI_UDAF ディレクトリにある ASI_UDAF.java ファイルをダブルクリックし、ビジネス要件に基づいてファイルにコードを記述します。

    この例では、ASI_UDAF.java ファイルの累積合計のコードを示します。

    package ASI_UDAF;
    
    import org.apache.flink.table.functions.AggregateFunction;
    
    import java.util.Iterator;
    
    public class ASI_UDAF{
        public static class AccSum{
            public long sum;
        }
    
        public static class MySum extends AggregateFunction<Long, AccSum>{
    
            @Override
            public Long getValue(AccSum acSum){
                return acSum.sum;
            }
    
            @Override
            public AccSum createAccumulator(){
                AccSum acCount= new AccSum();
                acCount.sum=0;
                return acCount;
            }
    
            public void accumulate(AccSum acc,long num){
                acc.sum += num;
            }
    
            // 上流オペレーターによって生成されたメッセージの retract をサポートします。
            public void retract(AccSum acc,long num){
                acc.sum -= num;
            }
    
            // ローカルグローバル 2 段階集計最適化をサポートします。
            public void merge(AccSum acc,Iterable<AccSum> it){
                Iterator<AccSum> iter=it.iterator();
                while(iter.hasNext()){
                    AccSum accSum=iter.next();
                    if(null!=accSum){
                        acc.sum+=accSum.sum;
                    }
                }
            }
        }
    }

    この例では、UDAF を使用して単純な累積操作を実行します。たとえば、GROUP BY 句の 3 つの値が 1、2、3 の場合、返される結果は miniBatch が有効になっているかどうかによって異なります。

    • miniBatch が有効になっていない場合、戻り値は 1、3、6 です。デフォルトでは、miniBatch は有効になっていません。

    • miniBatch が有効になっている場合、戻り値は 6 で、中間戻り値の数は決定できません。これは、戻り値の数が miniBatch の設定と入力データの分布によって異なるためです。

    説明

    miniBatch の詳細については、「Flink SQL の最適化」をご参照ください。

  5. pom.xml ファイルが格納されているディレクトリに移動します。次に、次のコマンドを実行してファイルをパッケージ化します。

    mvn package -Dcheckstyle.skip

    \ASI_UDX-main\target\ ディレクトリに ASI_UDX-1.0-SNAPSHOT.jar パッケージが表示されたら、UDAF が作成されています。

UDAF の使用

SQL デプロイメントで UDAF を使用するには、次のいずれかの方法を使用できます。

  • 方法 1: UDAF を登録し、登録した UDAF をデプロイメントで使用します。

    この方法を使用すると、後続のデプロイメント開発でコードを再利用できます。 UDAF の登録方法の詳細については、「UDF の管理」をご参照ください。登録された UDAF の名前が ASI_UDAF$MySum の場合、デプロイメントで次のサンプルコードを使用します。

    CREATE TEMPORARY TABLE ASI_UDAF_Source (
      a BIGINT NOT NULL
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDAF_Sink (
      sum  BIGINT
    ) WITH (
      'connector' = 'print'
    );
    
    INSERT INTO ASI_UDAF_Sink
    SELECT `ASI_UDAF$MySum`(a)
    FROM ASI_UDAF_Source;
  • 方法 2: [追加の依存関係] フィールドの [詳細設定] タブの [ドラフトエディター] ページで UDAF の JAR パッケージをアップロードします。次に、一時関数の作成に使用するステートメントをデプロイメントの SQL ステートメントに追加し、その関数を使用します。

    デプロイメントの [追加の依存関係] フィールドに UDAF の JAR パッケージをアップロードすると、このデプロイメントでのみ UDAF を使用できます。この JAR パッケージは他のデプロイメントでは使用できません。作成する一時関数の名前が mysum の場合、デプロイメントで関数を使用する方法は次のとおりです。

    CREATE TEMPORARY TABLE ASI_UDAF_Source (
      a   BIGINT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDAF_Sink (
      sum  BIGINT
    ) WITH (
      'connector' = 'print'
    );
    
    CREATE TEMPORARY FUNCTION `mysum` AS 'ASI_UDAF.ASI_UDAF$MySum'; -- mysum という名前の一時関数を作成します。
    
    INSERT INTO ASI_UDAF_Sink
    SELECT `mysum`(a)
    FROM ASI_UDAF_Source;
説明

SQL デプロイメントの開発が完了したら、[O&M] > [デプロイメント] ページに移動し、SQL デプロイメントを見つけて、[アクション] 列の [開始] をクリックします。デプロイメントが開始されると、ASI_UDAF_Source テーブルの a フィールドのデータの合計が ASI_UDAF_Sink テーブルに挿入されます。