すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:UDAF

最終更新日:Jan 07, 2025

このトピックでは、Realtime Compute for Apache Flink でユーザー定義集計関数(UDAF)を作成、登録、および使用する方法について説明します。

定義

UDAF は、複数の値を 1 つの値に集計します。UDAF の入力と出力の間には多対 1 のマッピングが確立されます。複数の入力値が集計されて 1 つの出力値が生成されます。

制限

Realtime Compute for Apache Flink によって提供されるサービスは、デプロイ環境とネットワーク環境の影響を受けます。そのため、Realtime Compute for Apache Flink で Python UDF を開発する場合は、次の制限事項に注意してください。

  • Apache Flink 1.12 以降のみがサポートされています。

  • Python 3.7.9 は Realtime Compute for Apache Flink ワークスペースにプリインストールされています。そのため、Python 3.7.9 でコードを開発する必要があります。

  • Realtime Compute for Apache Flink のランタイム環境では、JDK 8 と JDK 11 がサポートされています。Python デプロイメントがサードパーティの JAR パッケージに依存している場合は、JAR パッケージが JDK 8 または JDK 11 と互換性があることを確認してください。

  • オープンソースの Scala 2.11 のみがサポートされています。Python デプロイメントがサードパーティの JAR パッケージに依存している場合は、JAR パッケージが Scala 2.11 と互換性があることを確認してください。

UDAF の作成

説明

Flink は、UDX を開発するための Python ユーザー定義拡張機能(UDX)のサンプルコードを提供しています。サンプルコードには、Python UDF、Python ユーザー定義集計関数(UDAF)、および Python UDTF の実装が含まれています。このトピックでは、Windows オペレーティングシステムで UDAF を作成する方法について説明します。

  1. python_demo-master パッケージをダウンロードして、オンプレミスマシンに解凍します。

  2. PyCharm のメインメニューバーで、[ファイル] > [開く] を選択して、解凍した python_demo-master パッケージを開きます。

  3. \python_demo-master\udx ディレクトリにある udfs.py ファイルをダブルクリックします。次に、ビジネス要件に基づいてファイルの内容を変更します。

    この例では、weighted_avg は現在のデータと履歴データの加重平均のコードを定義しています。

    from pyflink.common import Row
    from pyflink.table import AggregateFunction, DataTypes
    from pyflink.table.udf import udaf
    
    
    class WeightedAvg(AggregateFunction):
    
        def create_accumulator(self):
            # Row(合計, 件数)
            return Row(0, 0)
    
        def get_value(self, accumulator: Row) -> float:
            if accumulator[1] == 0:
                return 0
            else:
                return accumulator[0] / accumulator[1]
    
        def accumulate(self, accumulator: Row, value, weight):
            accumulator[0] += value * weight
            accumulator[1] += weight
    
        def retract(self, accumulator: Row, value, weight):
            accumulator[0] -= value * weight
            accumulator[1] -= weight
    
    
    weighted_avg = udaf(f=WeightedAvg(),
                        result_type=DataTypes.DOUBLE(),
                        accumulator_type=DataTypes.ROW([
                            DataTypes.FIELD("f0", DataTypes.BIGINT()),
                            DataTypes.FIELD("f1", DataTypes.BIGINT())]))
  4. udx フォルダーが属する \python_demo-master ディレクトリに移動し、次のコマンドを実行してディレクトリ内のファイルをパッケージ化します。

    zip -r python_demo.zip udx

    \python_demo-master\ ディレクトリに python_demo.zip パッケージが表示されたら、UDAF が作成されます。

UDAF の登録

UDAF を登録する方法の詳細については、「UDF の管理」をご参照ください。

UDAF の使用

UDAF を登録した後、次の手順を実行して UDAF を使用できます。

  1. Flink SQL を使用してドラフトを作成します。詳細については、「SQL ドラフトの開発」をご参照ください。

    b フィールドを重みとして使用して、ASI_UDAF_Source テーブルの a フィールドの値を取得します。サンプルコード:

    CREATE TEMPORARY TABLE ASI_UDAF_Source (
      a   BIGINT,
      b   BIGINT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDAF_Sink (
      avg_value  DOUBLE
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO ASI_UDAF_Sink
    SELECT weighted_avg(a, b)
    FROM ASI_UDAF_Source;
  2. Realtime Compute for Apache Flink の開発コンソールの左側のナビゲーションペインで、[O&M] > [デプロイメント] を選択します。[デプロイメント] ページで、目的のデプロイメントを見つけて、[アクション] 列の [開始] をクリックします。

    デプロイメントが開始されると、フィールド b を重みとして使用して、ASI_UDAF_Source テーブルの a フィールドの現在データと履歴データの平均が ASI_UDAF_Sink テーブルの各行に挿入されます。