すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:UDSF

最終更新日:Jan 17, 2025

このトピックでは、Python ユーザー定義スカラ関数(UDSF)を作成、登録、および使用する方法について説明します。

定義

UDSF は、ゼロ、1 つ、または複数のスカラ値を新しいスカラ値にマッピングします。 UDSF の入力データと出力データは、1 対 1 の関係でマッピングされます。 UDSF がデータの行を読み取るたびに、出力値が書き込まれます。

制限

Realtime Compute for Apache Flink によって提供されるサービスは、デプロイ環境とネットワーク環境の影響を受けます。そのため、Realtime Compute for Apache Flink で Python UDF を開発する場合は、次の制限事項に注意してください。

  • Apache Flink 1.12 以降のみがサポートされています。

  • Python は Realtime Compute for Apache Flink ワークスペースにプリインストールされています。そのため、必要なバージョンの Python でコードを開発する必要があります。

    説明

    バージョン 8.0.11 より前の Ververica Runtime(VVR)を使用する Realtime Compute for Apache Flink には、Python 3.7.9 がプリインストールされています。 VVR 8.0.11 以降を使用する Realtime Compute for Apache Flink には、Python 3.9.21 がプリインストールされています。 VVR バージョンを 8.0.11 以降にアップグレードした後、以前の VVR バージョンの PyFlink ドラフトを再度テスト、デプロイ、および実行する必要があります。

  • Realtime Compute for Apache Flink のランタイム環境では、JDK 8 および JDK 11 がサポートされています。 Python デプロイメントがサードパーティの JAR パッケージに依存している場合は、JAR パッケージが JDK 8 または JDK 11 と互換性があることを確認してください。

  • オープンソースの Scala 2.11 のみがサポートされています。 Python デプロイメントがサードパーティの JAR パッケージに依存している場合は、JAR パッケージが Scala 2.11 と互換性があることを確認してください。

UDSF を作成する

説明

Flink は、UDF を開発するための Python ユーザー定義関数(UDF)のサンプルコードを提供しています。サンプルコードには、Python UDSF、Python ユーザー定義集計関数(UDAF)、および Python ユーザー定義テーブル値関数(UDTF)の実装が含まれています。このセクションでは、Windows オペレーティングシステムで UDSF を作成する方法について説明します。

  1. python_demo-master パッケージをダウンロードして、オンプレミスマシンに解凍します。

    説明

    python_demo-master は、サードパーティの Web サイトで提供されています。 Web サイトにアクセスすると、Web サイトへのアクセスに失敗したり、Web サイトへのアクセスが遅延したりする可能性があります。

  2. PyCharm のメインメニューバーで、[ファイル] > [開く] を選択して、解凍した python_demo-master パッケージを開きます。

  3. \python_demo-master\udx パスにある udfs.py ファイルをダブルクリックします。次に、ビジネス要件に基づいてファイルの内容を変更します。

    この例では、sub_string は、各データレコードの開始位置から終了位置までの文字を取得するコードを定義しています。

    from pyflink.table import DataTypes
    from pyflink.table.udf import udf
    
    
    @udf(result_type=DataTypes.STRING())
    def sub_string(s: str, begin: int, end: int):
        return s[begin:end]
  4. udx フォルダが属する \python_demo-master パスに移動し、次のコマンドを実行してディレクトリ内のファイルをパッケージ化します。

    zip -r python_demo.zip udx

    \python_demo-master\ パスに python_demo.zip パッケージが表示されたら、Python UDSF が作成されます。

UDSF を登録する

UDSF を登録する方法の詳細については、「UDF を管理する」をご参照ください。

UDSF を使用する

UDSF を登録した後、UDSF を使用できます。 UDSF を使用するには、次の手順を実行します。

  1. Flink SQL を使用してドラフトを作成します。詳細については、「SQL ドラフトを開発する」をご参照ください。

    次のコードは、ASI_UDSF_Source テーブルの a フィールドの各行の文字列の 2 番目の文字から 4 番目の文字を取得する方法の例を示しています。

    CREATE TEMPORARY TABLE ASI_UDSF_Source (
      a VARCHAR,
      b INT,
      c INT
    ) WITH (
      'connector' = 'datagen'
    );
    
    CREATE TEMPORARY TABLE ASI_UDSF_Sink (
      a VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO ASI_UDSF_Sink
    SELECT ASI_UDSF(a,2,4)
    FROM ASI_UDSF_Source;
  2. Realtime Compute for Apache Flink の開発コンソールの左側のナビゲーションウィンドウで、[O&M] > [デプロイメント] を選択します。 [デプロイメント] ページで、目的のデプロイメントを見つけて、[アクション] 列の [開始] をクリックします。

    デプロイメントが開始されると、ASI_UDSF_Source テーブルの a フィールドの各行の文字列の 2 番目の文字から 4 番目の文字が、ASI_UDSF_Sink テーブルの各行に挿入されます。