すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:UDTF

最終更新日:Jan 23, 2025

このトピックでは、Realtime Compute for Apache Flink で Python ユーザー定義のテーブル値関数 (UDTF) を作成、登録、および使用する方法について説明します。

説明

UDTF は、入力パラメーターとして 0 個、1 個、または複数のスカラー値を受け取ります。 これらのパラメーターは可変長パラメーターにすることができます。 UDTF は、単一の値ではなく、任意の数の行を返すことができます。 返される行には、1 つ以上の列を含めることができます。 UDTF が呼び出されるたびに、複数の行または列が返されます。 UDTF はユーザー定義スカラー関数 (UDF) に似ていますが、返す結果が異なります。

制限

Realtime Compute for Apache Flink によって提供されるサービスは、デプロイ環境とネットワーク環境の影響を受けます。 そのため、Realtime Compute for Apache Flink で Python UDF を開発する際は、以下の制限事項に注意してください。

  • Apache Flink 1.12 以降のみがサポートされています。

  • Python は Realtime Compute for Apache Flink ワークスペースにプリインストールされています。 そのため、必要なバージョンの Python でコードを開発する必要があります。

    説明

    バージョン 8.0.11 より前の Ververica Runtime (VVR) を使用する Realtime Compute for Apache Flink には、Python 3.7.9 がプリインストールされています。 VVR 8.0.11 以降を使用する Realtime Compute for Apache Flink には、Python 3.9.21 がプリインストールされています。 VVR バージョンを 8.0.11 以降にアップグレードした後、以前の VVR バージョンの PyFlink ドラフトを再度テスト、デプロイ、および実行する必要があります。

  • Realtime Compute for Apache Flink のランタイム環境では、JDK 8 と JDK 11 がサポートされています。 Python デプロイメントがサードパーティの JAR パッケージに依存している場合は、JAR パッケージが JDK 8 または JDK 11 と互換性があることを確認してください。

  • オープンソースの Scala 2.11 のみがサポートされています。 Python デプロイメントがサードパーティの JAR パッケージに依存している場合は、JAR パッケージが Scala 2.11 と互換性があることを確認してください。

UDTF の作成

説明

Flink は、UDX を開発するための Python ユーザー定義拡張 (UDX) のサンプルコードを提供しています。 サンプルコードには、Python UDF、Python ユーザー定義集計関数 (UDAF)、および Python UDTF の実装が含まれています。 このセクションでは、Windows オペレーティングシステムで UDTF を作成する方法について説明します。

  1. python_demo-master パッケージをダウンロードして、オンプレミスマシンに解凍します。

  2. PyCharm のメインメニューバーで、[ファイル] > [開く] を選択して、解凍した python_demo-master パッケージを開きます。

  3. \python_demo-master\udx ディレクトリにある udtfs.py ファイルをダブルクリックします。 次に、ビジネス要件に基づいてファイルの内容を変更します。

    この例では、split は、文字列の行を縦棒 (|) で区切られた文字列の複数の列に分割できるコードを定義しています。

    from pyflink.table import DataTypes
    from pyflink.table.udf import udtf
    
    @udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])
    def split(s: str):
        splits = s.split("|")
        yield splits[0], splits[1]
  4. udx フォルダーが属する \python_demo ディレクトリに移動し、次のコマンドを実行してディレクトリ内のファイルをパッケージ化します。

    zip -r python_demo.zip udx

    \python_demo\ ディレクトリに python_demo.zip パッケージが表示されたら、UDTF が開発されています。

UDTF の登録

UDTF を登録する方法の詳細については、「UDF の管理」をご参照ください。

UDTF の使用

UDTF を登録した後、次の手順を実行して UDTF を使用できます。

  1. Flink SQL を使用してドラフトを作成します。 詳細については、「SQL ドラフトの開発」をご参照ください。

    ASI_UDTF_Source テーブルの各行の文字列にある aa 文字列と message フィールドを縦棒 (|) で連結した後、連結された文字列を縦棒 (|) で文字列の複数の列に分割します。 次のコードは例を示しています。

    CREATE TEMPORARY TABLE ASI_UDTF_Source (
      `message`  VARCHAR
    ) WITH (
      'connector'='datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDTF_Sink (
      name  VARCHAR,
      place  VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO ASI_UDTF_Sink
    SELECT name,place
    FROM ASI_UDTF_Source,lateral table(split(concat_ws('|', `message`, 'aa'))) as T(name,place);
  2. Realtime Compute for Apache Flink の開発コンソールの左側のナビゲーションウィンドウで、[O&M] > [デプロイメント] を選択します。 [デプロイメント] ページで、目的のデプロイメントを見つけて、[アクション] 列の [開始] をクリックします。

    デプロイメントが開始されると、ASI_UDTF_Sink テーブルに 2 つの列のデータが挿入されます。 2 つの列のデータには、縦棒 (|) で区切られた連結文字列が含まれています。