すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Python

最終更新日:Apr 14, 2025

Realtime Compute for Apache Flink では、Flink SQL デプロイメントで Python ユーザー定義関数 (UDF) を使用できます。このトピックでは、Python UDF の分類とチューニング方法について説明します。また、Python 依存関係の使用方法についても説明します。

UDF の分類

UDF の分類

説明

ユーザー定義スカラー関数 (UDSF)

UDSF は、0 個、1 個、または複数のスカラー値を新しいスカラー値にマッピングします。UDSF の入力と出力の間には、1 対 1 のマッピングが確立されます。UDSF は、データの 1 行を読み取るたびに、出力値を書き込みます。詳細については、UDSF をご参照ください。

ユーザー定義集計関数 (UDAF)

UDAF は、複数の値を 1 つの値に集計します。UDAF の入力と出力の間には、多対 1 のマッピングが確立されます。複数の入力レコードが集計されて、1 つの出力値が生成されます。詳細については、UDAF をご参照ください。

ユーザー定義テーブル値関数 (UDTF)

UDTF は、0 個、1 個、または複数のスカラー値を入力パラメーターとして受け取ります。これらのパラメーターは可変長パラメーターにすることができます。UDTF は、単一の値ではなく任意の数の行を返すことができる点を除いて、UDSF と似ています。返される行は、1 つ以上の列で構成されます。UDTF が呼び出されるたびに、複数の行または列が返されます。詳細については、UDTF をご参照ください。

Python 依存関係の使用

pandas、NumPy、PyArrow など、一般的に使用される Python パッケージは、Realtime Compute for Apache Flink のワークスペースにプリインストールされています。 Realtime Compute for Apache Flink のワークスペースにプリインストールされているサードパーティ製 Python パッケージの詳細については、Python API ドラフトの開発 をご参照ください。プリインストールされている Python パッケージは、Python UDF にインポートする必要があります。サンプルコード:

@udf(result_type=DataTypes.FLOAT())
def percentile(values: List[float], percentile: float):
    # numpy をインポートします
    import numpy as np
    return np.percentile(values, percentile)

Python UDF では、他の種類のサードパーティ製 Python パッケージも使用できます。フルマネージド Flink にプリインストールされていないサードパーティ製 Python パッケージを使用する場合は、そのパッケージを依存関係ファイルとしてフルマネージド Flink にアップロードする必要があります。詳細については、UDF の管理 および Python 依存関係の使用 をご参照ください。

コードのデバッグ

Python UDF のコード実装では、ログ生成メソッドを使用してログを生成し、ログに基づいてエラーを特定できます。次のコードは例です。

@udf(result_type=DataTypes.BIGINT())
def add(i, j):    
  # ログを生成します
  logging.info("hello world")    
  return i + j

ログが生成された後、TaskManager のログファイルでログを表示できます。詳細については、デプロイメントの操作ログの表示 をご参照ください。

パフォーマンス チューニング

リソース プリロード機能の使用

リソース プリロード機能を使用すると、UDF の初期化中にリソースを事前にロードできます。このようにして、eval() メソッドを使用してデータを計算するたびにリソースをリロードする必要はありません。たとえば、大きな深層学習モデルを 1 回だけロードし、そのモデルで複数回予測を行う場合は、次のコードを使用できます。

from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

# 予測クラスを定義します
class Predict(ScalarFunction):
    def open(self, function_context):
        # pickle をインポートします
        import pickle

        with open("resources.zip/resources/model.pkl", "rb") as f:
            self.model = pickle.load(f)

    def eval(self, x):
        return self.model.predict(x)

predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
説明

Python ファイルのアップロード方法の詳細については、Python 依存関係の使用 をご参照ください。

pandas ライブラリの使用

Realtime Compute for Apache Flink では、一般的に使用される Python UDF に加えて、pandas UDF を使用できます。 pandas UDF の入力データ型は、pandas.Series や pandas.DataFrame など、pandas で定義されているデータ構造を使用します。 pandas UDF では、pandas や NumPy などの高性能 Python ライブラリを使用して、高性能 Python UDF を開発できます。詳細については、ベクトル化ユーザー定義関数 をご参照ください。

必須パラメーターの設定

Python UDF のパフォーマンスは、主にその実装によって異なります。 Python UDF に関連するパフォーマンスの問題が発生した場合は、実装を最適化する必要があります。 Python UDF のパフォーマンスは、次の表で説明するパラメーターの値の影響も受けます。

パラメーター

説明

python.fn-execution.bundle.size

Python UDF は非同期に計算されます。計算中に、Java 演算子はデータを Python プロセスに非同期で送信して処理します。 Java 演算子はデータをキャッシュし、キャッシュ内のデータレコード数が特定のしきい値に達すると、Python プロセスにデータを送信します。キャッシュできるデータレコードの最大数。

デフォルト値:100000。

python.fn-execution.bundle.time

データをキャッシュできる最大期間。キャッシュされたデータの数が python.fn-execution.bundle.size パラメーターで指定された値に達した場合、またはデータがキャッシュされている期間が python.fn-execution.bundle.time パラメーターで指定された値に達した場合、キャッシュされたデータの計算がトリガーされます。

デフォルト値:1000。単位:ミリ秒。

python.fn-execution.arrow.batch.size

pandas UDF を使用する場合に、arrow バッチで許可されるデータレコードの最大数。デフォルト値:10000。

説明

python.fn-execution.arrow.batch.size パラメーターの値は、python.fn-execution.bundle.size パラメーターの値より大きくすることはできません。

説明

これらのパラメーターの値が大きすぎると、チェックポイント中に過剰なデータを処理する必要があります。これにより、チェックポイントが長引いたり、チェックポイントが失敗したりすることさえあります。これらのパラメーターの詳細については、設定 をご参照ください。

参照

  • UDF の登録、更新、削除方法の詳細については、UDF の管理 をご参照ください。

  • Python UDF の開発と使用方法の詳細については、UDAFUDSF、および UDTF をご参照ください。

  • Flink Python デプロイメントでカスタム Python 仮想環境、サードパーティ製 Python パッケージ、JAR パッケージ、およびデータファイルを使用する方法の詳細については、Python 依存関係の使用 をご参照ください。

  • Java UDF の開発と使用方法の詳細については、UDAFUDSF、および UDTF をご参照ください。

  • Java UDF のデバッグとチューニング方法の詳細については、Java をご参照ください。

  • UDAF を使用してデータをソートおよび集計する方法の詳細については、UDAF を使用してデータをソートおよび集計する をご参照ください。