すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Python

最終更新日:Mar 10, 2026

Realtime Compute for Apache Flink では、Flink SQL ジョブで Python ユーザー定義関数 (UDF) を使用できます。このトピックでは、Python UDF の種類、Python の依存関係の使用方法、およびパフォーマンスチューニングについて説明します。

ユーザー定義関数の種類

種類

説明

ユーザー定義スカラー関数 (UDSF)

UDSF は、ゼロ個、1 個、または複数のスカラー値を新しいスカラー値にマッピングします。入力と出力の間には 1 対 1 の関係があります。1 つのデータ行を読み取り、1 つの出力値を返します。詳細については、「ユーザー定義スカラー関数 (UDSF)」をご参照ください。

ユーザー定義集計関数 (UDAF)

UDAF は、複数のレコードを 1 つのレコードに集計します。入力と出力の間には多対 1 の関係があります。複数の入力レコードを 1 つの出力値に集計します。詳細については、「ユーザー定義集計関数 (UDAF)」をご参照ください。

ユーザー定義テーブル値関数 (UDTF)

UDTF は、ゼロ個、1 個、または複数のスカラー値を変数長の入力パラメーターとして受け取ります。UDSF に似ていますが、単一の値だけでなく、任意の数の行を出力として返すことができます。返される行は 1 つ以上の列で構成できます。1 回の関数呼び出しで複数の行または列を出力できます。詳細については、「ユーザー定義テーブル値関数 (UDTF)」をご参照ください。

Python の依存関係の使用

Realtime Compute for Apache Flink クラスターには、Pandas、NumPy、PyArrow などの一般的な Python パッケージがプリインストールされています。Realtime Compute for Apache Flink にインストールされているサードパーティの Python パッケージのリストについては、「Python ジョブの開発」をご参照ください。プリインストールされている Python パッケージを使用するには、Python 関数にインポートできます。次の例は、その方法を示しています。

@udf(result_type=DataTypes.FLOAT())
def percentile(values: List[float], percentile: float):
    import numpy as np
    return np.percentile(values, percentile)

Python UDF で他のサードパーティの Python パッケージを使用することもできます。プリインストールされていないサードパーティの Python パッケージを使用する場合は、Python UDF を登録する際に、依存関係ファイルとしてアップロードする必要があります。詳細については、「ユーザー定義関数 (UDF) の管理」および「Python の依存関係の使用」をご参照ください。

コードのデバッグ

Python UDF にロギングを実装して情報を出力できます。これにより、問題の特定に役立ちます。次の例は、その方法を示しています。

@udf(result_type=DataTypes.BIGINT())
def add(i, j):    
  logging.info("hello world")    
  return i + j

生成されたログは TaskManager ログファイルで確認できます。詳細については、「運用ログの表示」をご参照ください。

パフォーマンスチューニング

リソースの事前ロード

リソースを事前ロードすると、UDF の初期化中にリソースをロードできます。これにより、`eval` メソッドが実行されるたびにリソースを再読み込みする必要がなくなります。たとえば、大規模なディープラーニングモデルを一度ロードし、その後複数回バッチ予測を実行できます。次のコードは例を示しています。

from pyflink.table import DataTypes
from pyflink.table.udf import ScalarFunction, udf

class Predict(ScalarFunction):
    def open(self, function_context):
        import pickle

        with open("resources.zip/resources/model.pkl", "rb") as f:
            self.model = pickle.load(f)

    def eval(self, x):
        return self.model.predict(x)

predict = udf(Predict(), result_type=DataTypes.DOUBLE(), func_type="pandas")
説明

Python データファイルのアップロード方法については、「Python の依存関係の使用」をご参照ください。

Pandas ライブラリの使用

通常の Python UDF に加えて、Realtime Compute for Apache Flink は Pandas UDF もサポートしています。Pandas UDF の入力データ型は、`pandas.Series` や `pandas.DataFrame` など、Pandas で定義されたデータ構造です。Pandas UDF で Pandas や NumPy などの高性能な Python ライブラリを使用して、高性能な Python UDF を作成できます。詳細については、「ベクトル化されたユーザー定義関数」をご参照ください。

パラメーターの設定

Python UDF のパフォーマンスは、その実装に大きく依存します。パフォーマンスの問題が発生した場合は、UDF の実装を最適化してください。Python UDF のパフォーマンスは、次のパラメーターの値にも影響されます。

パラメーター

説明

python.fn-execution.bundle.size

Python UDF は非同期で計算されます。実行中、Java オペレーターはデータを Python プロセスに送信して非同期処理を行います。データを送信する前に、Java オペレーターはデータをキャッシュします。キャッシュが特定のしきい値に達すると、データは Python プロセスに送信されます。`python.fn-execution.bundle.size` パラメーターは、キャッシュできるデータレコードの最大数を制御します。

デフォルト値は 100000 レコードです。

python.fn-execution.bundle.time

このパラメーターは、データの最大キャッシュ時間を制御します。キャッシュデータの計算は、キャッシュされたレコードの数が `python.fn-execution.bundle.size` で定義されたしきい値に達するか、キャッシュ時間が `python.fn-execution.bundle.time` で定義されたしきい値に達するとトリガーされます。

デフォルト値は 1000 ミリ秒です。

python.fn-execution.arrow.batch.size

Pandas UDF を使用する場合、このパラメーターは Arrow バッチに含めることができるデータレコードの最大数を指定します。デフォルト値は 10000 です。

説明

`python.fn-execution.arrow.batch.size` パラメーターの値は、`python.fn-execution.bundle.size` パラメーターの値よりも大きくすることはできません。

説明

これら 3 つのパラメーターを可能な限り最大値に設定することが、常に最善のアプローチとは限りません。これらのパラメーターが過度に大きな値に設定されている場合、チェックポイント中に処理する必要があるデータが多すぎる可能性があります。これにより、チェックポイントの持続時間が増加したり、チェックポイントの失敗を引き起こしたりする可能性があります。これらのパラメーターの詳細については、「設定」をご参照ください。

参照