このトピックでは、Realtime Compute for Apache Flink でユーザー定義集計関数(UDAF)を作成、登録、および使用する方法について説明します。
定義
UDAF は、複数の値を 1 つの値に集計します。UDAF の入力と出力の間には多対 1 のマッピングが確立されます。複数の入力値が集計されて 1 つの出力値が生成されます。
制限
Realtime Compute for Apache Flink によって提供されるサービスは、デプロイ環境とネットワーク環境の影響を受けます。そのため、Realtime Compute for Apache Flink で Python UDF を開発する場合は、次の制限事項に注意してください。
Apache Flink 1.12 以降のみがサポートされています。
Python 3.7.9 は Realtime Compute for Apache Flink ワークスペースにプリインストールされています。そのため、Python 3.7.9 でコードを開発する必要があります。
Realtime Compute for Apache Flink のランタイム環境では、JDK 8 と JDK 11 がサポートされています。Python デプロイメントがサードパーティの JAR パッケージに依存している場合は、JAR パッケージが JDK 8 または JDK 11 と互換性があることを確認してください。
オープンソースの Scala 2.11 のみがサポートされています。Python デプロイメントがサードパーティの JAR パッケージに依存している場合は、JAR パッケージが Scala 2.11 と互換性があることを確認してください。
UDAF の作成
Flink は、UDX を開発するための Python ユーザー定義拡張機能(UDX)のサンプルコードを提供しています。サンプルコードには、Python UDF、Python ユーザー定義集計関数(UDAF)、および Python UDTF の実装が含まれています。このトピックでは、Windows オペレーティングシステムで UDAF を作成する方法について説明します。
python_demo-master パッケージをダウンロードして、オンプレミスマシンに解凍します。
PyCharm のメインメニューバーで、 を選択して、解凍した python_demo-master パッケージを開きます。
\python_demo-master\udx ディレクトリにある udfs.py ファイルをダブルクリックします。次に、ビジネス要件に基づいてファイルの内容を変更します。
この例では、weighted_avg は現在のデータと履歴データの加重平均のコードを定義しています。
from pyflink.common import Row from pyflink.table import AggregateFunction, DataTypes from pyflink.table.udf import udaf class WeightedAvg(AggregateFunction): def create_accumulator(self): # Row(合計, 件数) return Row(0, 0) def get_value(self, accumulator: Row) -> float: if accumulator[1] == 0: return 0 else: return accumulator[0] / accumulator[1] def accumulate(self, accumulator: Row, value, weight): accumulator[0] += value * weight accumulator[1] += weight def retract(self, accumulator: Row, value, weight): accumulator[0] -= value * weight accumulator[1] -= weight weighted_avg = udaf(f=WeightedAvg(), result_type=DataTypes.DOUBLE(), accumulator_type=DataTypes.ROW([ DataTypes.FIELD("f0", DataTypes.BIGINT()), DataTypes.FIELD("f1", DataTypes.BIGINT())]))udx フォルダーが属する \python_demo-master ディレクトリに移動し、次のコマンドを実行してディレクトリ内のファイルをパッケージ化します。
zip -r python_demo.zip udx\python_demo-master\ ディレクトリに python_demo.zip パッケージが表示されたら、UDAF が作成されます。
UDAF の登録
UDAF を登録する方法の詳細については、「UDF の管理」をご参照ください。
UDAF の使用
UDAF を登録した後、次の手順を実行して UDAF を使用できます。
Flink SQL を使用してドラフトを作成します。詳細については、「SQL ドラフトの開発」をご参照ください。
b フィールドを重みとして使用して、ASI_UDAF_Source テーブルの a フィールドの値を取得します。サンプルコード:
CREATE TEMPORARY TABLE ASI_UDAF_Source ( a BIGINT, b BIGINT ) WITH ( 'connector' = 'datagen' ); CREATE TEMPORARY TABLE ASI_UDAF_Sink ( avg_value DOUBLE ) WITH ( 'connector' = 'blackhole' ); INSERT INTO ASI_UDAF_Sink SELECT weighted_avg(a, b) FROM ASI_UDAF_Source;Realtime Compute for Apache Flink の開発コンソールの左側のナビゲーションペインで、 を選択します。[デプロイメント] ページで、目的のデプロイメントを見つけて、[アクション] 列の [開始] をクリックします。
デプロイメントが開始されると、フィールド b を重みとして使用して、ASI_UDAF_Source テーブルの a フィールドの現在データと履歴データの平均が ASI_UDAF_Sink テーブルの各行に挿入されます。