Realtime Compute for Apache Flink では、Flink SQL デプロイメントで Python ユーザー定義関数 (UDF) を使用できます。このトピックでは、Python UDF の分類とチューニング方法について説明します。また、Python 依存関係の使用方法についても説明します。
UDF の分類
UDF の分類 | 説明 |
ユーザー定義スカラー関数 (UDSF) | UDSF は、0 個、1 個、または複数のスカラー値を新しいスカラー値にマッピングします。UDSF の入力と出力の間には、1 対 1 のマッピングが確立されます。UDSF は、データの 1 行を読み取るたびに、出力値を書き込みます。詳細については、UDSF をご参照ください。 |
ユーザー定義集計関数 (UDAF) | UDAF は、複数の値を 1 つの値に集計します。UDAF の入力と出力の間には、多対 1 のマッピングが確立されます。複数の入力レコードが集計されて、1 つの出力値が生成されます。詳細については、UDAF をご参照ください。 |
ユーザー定義テーブル値関数 (UDTF) | UDTF は、0 個、1 個、または複数のスカラー値を入力パラメーターとして受け取ります。これらのパラメーターは可変長パラメーターにすることができます。UDTF は、単一の値ではなく任意の数の行を返すことができる点を除いて、UDSF と似ています。返される行は、1 つ以上の列で構成されます。UDTF が呼び出されるたびに、複数の行または列が返されます。詳細については、UDTF をご参照ください。 |
Python 依存関係の使用
pandas、NumPy、PyArrow など、一般的に使用される Python パッケージは、Realtime Compute for Apache Flink のワークスペースにプリインストールされています。 Realtime Compute for Apache Flink のワークスペースにプリインストールされているサードパーティ製 Python パッケージの詳細については、Python API ドラフトの開発 をご参照ください。プリインストールされている Python パッケージは、Python UDF にインポートする必要があります。サンプルコード:
@udf(result_type=DataTypes.FLOAT())
def percentile(values: List[float], percentile: float):
# numpy をインポートします
import numpy as np
return np.percentile(values, percentile)Python UDF では、他の種類のサードパーティ製 Python パッケージも使用できます。フルマネージド Flink にプリインストールされていないサードパーティ製 Python パッケージを使用する場合は、そのパッケージを依存関係ファイルとしてフルマネージド Flink にアップロードする必要があります。詳細については、UDF の管理 および Python 依存関係の使用 をご参照ください。
コードのデバッグ
Python UDF のコード実装では、ログ生成メソッドを使用してログを生成し、ログに基づいてエラーを特定できます。次のコードは例です。
@udf(result_type=DataTypes.BIGINT())
def add(i, j):
# ログを生成します
logging.info("hello world")
return i + jログが生成された後、TaskManager のログファイルでログを表示できます。詳細については、デプロイメントの操作ログの表示 をご参照ください。
パフォーマンス チューニング
リソース プリロード機能の使用
リソース プリロード機能を使用すると、UDF の初期化中にリソースを事前にロードできます。このようにして、eval() メソッドを使用してデータを計算するたびにリソースをリロードする必要はありません。たとえば、大きな深層学習モデルを 1 回だけロードし、そのモデルで複数回予測を行う場合は、次のコードを使用できます。
from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf
# 予測クラスを定義します
class Predict(ScalarFunction):
def open(self, function_context):
# pickle をインポートします
import pickle
with open("resources.zip/resources/model.pkl", "rb") as f:
self.model = pickle.load(f)
def eval(self, x):
return self.model.predict(x)
predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")Python ファイルのアップロード方法の詳細については、Python 依存関係の使用 をご参照ください。
pandas ライブラリの使用
Realtime Compute for Apache Flink では、一般的に使用される Python UDF に加えて、pandas UDF を使用できます。 pandas UDF の入力データ型は、pandas.Series や pandas.DataFrame など、pandas で定義されているデータ構造を使用します。 pandas UDF では、pandas や NumPy などの高性能 Python ライブラリを使用して、高性能 Python UDF を開発できます。詳細については、ベクトル化ユーザー定義関数 をご参照ください。
必須パラメーターの設定
Python UDF のパフォーマンスは、主にその実装によって異なります。 Python UDF に関連するパフォーマンスの問題が発生した場合は、実装を最適化する必要があります。 Python UDF のパフォーマンスは、次の表で説明するパラメーターの値の影響も受けます。
パラメーター | 説明 |
python.fn-execution.bundle.size | Python UDF は非同期に計算されます。計算中に、Java 演算子はデータを Python プロセスに非同期で送信して処理します。 Java 演算子はデータをキャッシュし、キャッシュ内のデータレコード数が特定のしきい値に達すると、Python プロセスにデータを送信します。キャッシュできるデータレコードの最大数。 デフォルト値:100000。 |
python.fn-execution.bundle.time | データをキャッシュできる最大期間。キャッシュされたデータの数が python.fn-execution.bundle.size パラメーターで指定された値に達した場合、またはデータがキャッシュされている期間が python.fn-execution.bundle.time パラメーターで指定された値に達した場合、キャッシュされたデータの計算がトリガーされます。 デフォルト値:1000。単位:ミリ秒。 |
python.fn-execution.arrow.batch.size | pandas UDF を使用する場合に、arrow バッチで許可されるデータレコードの最大数。デフォルト値:10000。 説明 python.fn-execution.arrow.batch.size パラメーターの値は、python.fn-execution.bundle.size パラメーターの値より大きくすることはできません。 |
これらのパラメーターの値が大きすぎると、チェックポイント中に過剰なデータを処理する必要があります。これにより、チェックポイントが長引いたり、チェックポイントが失敗したりすることさえあります。これらのパラメーターの詳細については、設定 をご参照ください。
参照
UDF の登録、更新、削除方法の詳細については、UDF の管理 をご参照ください。
Flink Python デプロイメントでカスタム Python 仮想環境、サードパーティ製 Python パッケージ、JAR パッケージ、およびデータファイルを使用する方法の詳細については、Python 依存関係の使用 をご参照ください。
Java UDF のデバッグとチューニング方法の詳細については、Java をご参照ください。
UDAF を使用してデータをソートおよび集計する方法の詳細については、UDAF を使用してデータをソートおよび集計する をご参照ください。