Realtime Compute for Apache Flink では、Flink SQL ジョブで Python ユーザー定義関数 (UDF) を使用できます。このトピックでは、Python UDF の種類、Python の依存関係の使用方法、およびパフォーマンスチューニングについて説明します。
ユーザー定義関数の種類
種類 | 説明 |
ユーザー定義スカラー関数 (UDSF) | UDSF は、ゼロ個、1 個、または複数のスカラー値を新しいスカラー値にマッピングします。入力と出力の間には 1 対 1 の関係があります。1 つのデータ行を読み取り、1 つの出力値を返します。詳細については、「ユーザー定義スカラー関数 (UDSF)」をご参照ください。 |
ユーザー定義集計関数 (UDAF) | UDAF は、複数のレコードを 1 つのレコードに集計します。入力と出力の間には多対 1 の関係があります。複数の入力レコードを 1 つの出力値に集計します。詳細については、「ユーザー定義集計関数 (UDAF)」をご参照ください。 |
ユーザー定義テーブル値関数 (UDTF) | UDTF は、ゼロ個、1 個、または複数のスカラー値を変数長の入力パラメーターとして受け取ります。UDSF に似ていますが、単一の値だけでなく、任意の数の行を出力として返すことができます。返される行は 1 つ以上の列で構成できます。1 回の関数呼び出しで複数の行または列を出力できます。詳細については、「ユーザー定義テーブル値関数 (UDTF)」をご参照ください。 |
Python の依存関係の使用
Realtime Compute for Apache Flink クラスターには、Pandas、NumPy、PyArrow などの一般的な Python パッケージがプリインストールされています。Realtime Compute for Apache Flink にインストールされているサードパーティの Python パッケージのリストについては、「Python ジョブの開発」をご参照ください。プリインストールされている Python パッケージを使用するには、Python 関数にインポートできます。次の例は、その方法を示しています。
@udf(result_type=DataTypes.FLOAT())
def percentile(values: List[float], percentile: float):
import numpy as np
return np.percentile(values, percentile)Python UDF で他のサードパーティの Python パッケージを使用することもできます。プリインストールされていないサードパーティの Python パッケージを使用する場合は、Python UDF を登録する際に、依存関係ファイルとしてアップロードする必要があります。詳細については、「ユーザー定義関数 (UDF) の管理」および「Python の依存関係の使用」をご参照ください。
コードのデバッグ
Python UDF にロギングを実装して情報を出力できます。これにより、問題の特定に役立ちます。次の例は、その方法を示しています。
@udf(result_type=DataTypes.BIGINT())
def add(i, j):
logging.info("hello world")
return i + j生成されたログは TaskManager ログファイルで確認できます。詳細については、「運用ログの表示」をご参照ください。
パフォーマンスチューニング
リソースの事前ロード
リソースを事前ロードすると、UDF の初期化中にリソースをロードできます。これにより、`eval` メソッドが実行されるたびにリソースを再読み込みする必要がなくなります。たとえば、大規模なディープラーニングモデルを一度ロードし、その後複数回バッチ予測を実行できます。次のコードは例を示しています。
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf
class Predict(ScalarFunction):
def open(self, function_context):
import pickle
with open("resources.zip/resources/model.pkl", "rb") as f:
self.model = pickle.load(f)
def eval(self, x):
return self.model.predict(x)
predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")Python データファイルのアップロード方法については、「Python の依存関係の使用」をご参照ください。
Pandas ライブラリの使用
通常の Python UDF に加えて、Realtime Compute for Apache Flink は Pandas UDF もサポートしています。Pandas UDF の入力データ型は、`pandas.Series` や `pandas.DataFrame` など、Pandas で定義されたデータ構造です。Pandas UDF で Pandas や NumPy などの高性能な Python ライブラリを使用して、高性能な Python UDF を作成できます。詳細については、「ベクトル化されたユーザー定義関数」をご参照ください。
パラメーターの設定
Python UDF のパフォーマンスは、その実装に大きく依存します。パフォーマンスの問題が発生した場合は、UDF の実装を最適化してください。Python UDF のパフォーマンスは、次のパラメーターの値にも影響されます。
パラメーター | 説明 |
python.fn-execution.bundle.size | Python UDF は非同期で計算されます。実行中、Java オペレーターはデータを Python プロセスに送信して非同期処理を行います。データを送信する前に、Java オペレーターはデータをキャッシュします。キャッシュが特定のしきい値に達すると、データは Python プロセスに送信されます。`python.fn-execution.bundle.size` パラメーターは、キャッシュできるデータレコードの最大数を制御します。 デフォルト値は 100000 レコードです。 |
python.fn-execution.bundle.time | このパラメーターは、データの最大キャッシュ時間を制御します。キャッシュデータの計算は、キャッシュされたレコードの数が `python.fn-execution.bundle.size` で定義されたしきい値に達するか、キャッシュ時間が `python.fn-execution.bundle.time` で定義されたしきい値に達するとトリガーされます。 デフォルト値は 1000 ミリ秒です。 |
python.fn-execution.arrow.batch.size | Pandas UDF を使用する場合、このパラメーターは Arrow バッチに含めることができるデータレコードの最大数を指定します。デフォルト値は 10000 です。 説明 `python.fn-execution.arrow.batch.size` パラメーターの値は、`python.fn-execution.bundle.size` パラメーターの値よりも大きくすることはできません。 |
これら 3 つのパラメーターを可能な限り最大値に設定することが、常に最善のアプローチとは限りません。これらのパラメーターが過度に大きな値に設定されている場合、チェックポイント中に処理する必要があるデータが多すぎる可能性があります。これにより、チェックポイントの持続時間が増加したり、チェックポイントの失敗を引き起こしたりする可能性があります。これらのパラメーターの詳細については、「設定」をご参照ください。
参照
UDF の登録、更新、および削除方法については、「ユーザー定義関数 (UDF) の管理」をご参照ください。
Python UDF の開発と使用例については、「ユーザー定義集計関数 (UDAF)」、「ユーザー定義スカラー関数 (UDSF)」、および「ユーザー定義テーブル値関数 (UDTF)」をご参照ください。
Flink Python ジョブでのカスタム Python 仮想環境、サードパーティの Python パッケージ、JAR パッケージ、およびデータファイルの使用方法については、「Python の依存関係の使用」をご参照ください。
Java UDF の開発と使用例については、「ユーザー定義集計関数 (UDAF)」、「ユーザー定義スカラー関数 (UDSF)」、および「ユーザー定義テーブル値関数 (UDTF)」をご参照ください。
Java UDF のデバッグとチューニング方法については、「概要」をご参照ください。