すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Python 依存関係の使用

最終更新日:Nov 26, 2025

Flink Python ジョブでは、カスタム Python 仮想環境、サードパーティの Python パッケージ、JAR パッケージ、およびデータファイルを使用できます。このトピックでは、これらの依存関係の使用方法について説明します。

背景情報

このトピックでは、次のシナリオでの Python 依存関係の使用方法について説明します。

カスタム Python 仮想環境の使用

説明

Ververica Runtime (VVR) 4.x は Python 3.7 仮想環境のみをサポートします。VVR 6.x 以降のバージョンにはこの制限はなく、より新しい Python バージョンをサポートします。

Python では、仮想環境を作成できます。各 Python 仮想環境は、依存関係パッケージをインストールできる完全な Python ランタイム環境を提供します。 次のセクションでは、Python 仮想環境を準備する方法について説明します。

  1. Python 仮想環境を準備します。

    1. ローカルマシンで、setup-pyflink-virtual-env.sh という名前のスクリプトを次の内容で作成します。

    2. set -e
      # Python 3.10 の miniconda.sh スクリプトをダウンロードします。
      wget "https://repo.continuum.io/miniconda/Miniconda3-py310_24.7.1-0-Linux-x86_64.sh" -O "miniconda.sh"
      
      # Python 3.10 の miniconda.sh スクリプトに実行権限を追加します。
      chmod +x miniconda.sh
      
      # Python 仮想環境を作成します。
      ./miniconda.sh -b -p venv
      
      # Conda Python 仮想環境をアクティベートします。
      source venv/bin/activate ""
      
      # PyFlink 依存関係をインストールします。
      # 必要に応じて PyFlink のバージョンを更新します
      pip install "apache-flink==1.17.0"
      
      # Conda Python 仮想環境を非アクティブ化します。
      conda deactivate
      
      # キャッシュされたパッケージを削除します。
      rm -rf venv/pkgs
      
      # 準備した Conda Python 仮想環境をパッケージ化します。
      zip -r venv.zip venv
      説明

      このトピックでは、VVR 8.x と Python 3.10 を使用するジョブを例として使用します。異なる VVR バージョンを使用する場合、または異なる Python バージョン用の Python 仮想環境を作成する場合は、次の 2 つのパラメーターを変更してください:

      • `miniconda.sh` スクリプトの URL:対象バージョンの URL に変更します。

      • `apache-flink`:ジョブの VVR バージョンに対応する Flink バージョンに変更します。Flink バージョンの表示方法の詳細については、「ワークスペースの管理と操作」をご参照ください。

    3. ローカルマシンで、build.sh という名前のスクリプトを次の内容で作成します。

      #!/bin/bash
      set -e -x
      
      sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-*
      sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-*
      
      yum install -y zip wget
      
      cd /root/
      bash /build/setup-pyflink-virtual-env.sh
      mv venv.zip /build/
    4. コマンドラインで次のコマンドを実行して、Python 仮想環境パッケージをビルドします。

      docker run -it --rm -v $PWD:/build  -w /build quay.io/pypa/manylinux2014_x86_64 ./build.sh

      コマンドを実行すると、venv.zip という名前のファイルが生成されます。このファイルには、Python 3.10 仮想環境が含まれています。

      また、スクリプトを変更して、必要なサードパーティの Python パッケージを仮想環境にインストールすることもできます。

  2. Python ジョブで Python 仮想環境を使用します。

    1. リアルタイムコンピューティングコンソールにログインします。

    2. 対象のワークスペースの [操作] 列で、[コンソール] をクリックします。

    3. 左側のナビゲーションウィンドウで [ファイル管理] をクリックし、venv.zip ファイルをアップロードします。

    4. [オペレーションセンター] > [ジョブ O&M] ページで、対象のジョブ名をクリックします。

    5. [デプロイメント詳細] タブの [基本構成] セクションで、[Python アーカイブ] に venv.zip を選択します。

      SQL ジョブの仮想環境で Python ユーザー定義関数 (UDF) を使用するには、[パラメーター設定] セクションの [その他の構成] フィールドに次の構成を追加します。

      python.archives: oss://.../venv.zip
    6. [パラメーター設定] セクションで、Python 仮想環境のインストールパスの構成を [その他の構成] フィールドに追加します。構成は、ジョブの VVR バージョンによって異なります。

      • VVR 6.x 以降のバージョン

        python.executable: venv.zip/venv/bin/python
        python.client.executable: venv.zip/venv/bin/python
      • VVR 6.x より前のバージョン

        python.executable: venv.zip/venv/bin/python

サードパーティ製 Python パッケージの使用

説明

次の Zip SafePyPI、および manylinux へのリンクはサードパーティの Web サイトです。これらの Web サイトはアクセスできないか、読み込みが遅い場合があります。

以降では、2 つのシナリオでサードパーティの Python パッケージを使用する方法について説明します。

  • 直接インポートできるサードパーティ製 Python パッケージを使用する

    サードパーティの Python パッケージが Zip Safe である場合、インストールせずに Python ジョブで直接使用できます。次の手順を実行します:

    1. 直接インポートできるサードパーティ製 Python パッケージをダウンロードします。

      1. ブラウザで PyPI ページに移動します。

      2. 検索ボックスに、apache-flink 1.12.2 などの対象のサードパーティ Python パッケージの名前を入力します。

      3. 検索結果で、パッケージの名前をクリックします。

      4. 左側のナビゲーションウィンドウで、[ファイルのダウンロード] をクリックします。

      5. cp37-cp37m-manylinux1 を含むパッケージの名前をクリックして、パッケージをダウンロードします。

    2. リアルタイムコンピューティングコンソールにログインします。

    3. 対象のワークスペースについて、[操作] 列の [コンソール] をクリックします。

    4. 左側のナビゲーションウィンドウで [ファイル管理] をクリックし、サードパーティの Python パッケージをアップロードします。

    5. [オペレーションセンター] > [ジョブ O&M] ページで、[ジョブのデプロイ] > [Python ジョブ] をクリックします。Python ライブラリ で、アップロードしたサードパーティの Python パッケージを選択します。

    6. [保存] をクリックします。

  • コンパイルが必要なサードパーティ製 Python パッケージを使用する

    サードパーティの Python パッケージが tar.gz 形式の圧縮パッケージであるか、ルートディレクトリに setup.py ファイルを含むソースパッケージである場合、通常、使用する前にパッケージをコンパイルする必要があります。Python ジョブで呼び出す前に、Flink と互換性のある環境でパッケージをコンパイルする必要があります。

    サードパーティの Python パッケージをコンパイルするには、Python 3.7quay.io/pypa/manylinux2014_x86_64 イメージコンテナーで使用することを推奨します。このコンテナーを使用してコンパイルされたパッケージは、ほとんどの Linux 環境と互換性があります。このイメージコンテナーの詳細については、「manylinux」をご参照ください。

    説明

    Python 3.7 のインストールパスは /opt/python/cp37-cp37m/bin/python3 です。

    次の例では、opencv-python-headless サードパーティ Python パッケージをコンパイルして使用する方法を示します。

    1. サードパーティの Python パッケージをコンパイルします。

      1. ローカルマシンで、requirements.txt という名前のファイルを次の内容で作成します。

        opencv-python-headless
        numpy<2
      2. ローカルマシンで、build.sh という名前のスクリプトを次の内容で作成します。

        #!/bin/bash
        set -e -x
        
        sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-*
        sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-*
        
        yum install -y zip
        
        PYBIN=/opt/python/cp37-cp37m/bin
        #PYBIN=/opt/python/cp38-cp38/bin
        #PYBIN=/opt/python/cp39-cp39/bin
        #PYBIN=/opt/python/cp310-cp310/bin
        #PYBIN=/opt/python/cp311-cp311/bin
        
        "${PYBIN}/pip" install --target __pypackages__ -r requirements.txt
        cd __pypackages__ && zip -r deps.zip . && mv deps.zip ../ && cd ..
        rm -rf __pypackages__
      3. コマンドラインインターフェイス (CLI) で、次のコマンドを実行します。

        docker run -it --rm -v $PWD:/build  -w /build quay.io/pypa/manylinux2014_x86_64 /bin/bash build.sh
        
        # 最新の pypa/manylinux2014_x86_64 イメージでは Python 3.7 のサポートが削除されているため、Python 3.7 をサポートするイメージタグを使用してください:
        docker run -it --rm -v $PWD:/build  -w /build quay.io/pypa/manylinux2014_x86_64:2025.05.03-1 /bin/bash build.sh

        コマンドを実行すると、deps.zip という名前のファイルが生成されます。このファイルは、コンパイルされたサードパーティ製 Python パッケージです。

        また、requirements.txt ファイルを変更して、他のサードパーティ Python パッケージをインストールすることもできます。requirements.txt ファイルには、複数の Python 依存関係を指定できます。

    2. deps.zip サードパーティ Python パッケージを Python ジョブで使用します。

      1. リアルタイムコンピューティングコンソールにログインします。

      2. 対象のワークスペースの [操作] 列で、[コンソール] をクリックします。

      3. 左側のナビゲーションウィンドウで [ファイル管理] をクリックし、deps.zip をアップロードします。

      4. [オペレーションセンター] > [ジョブ O&M] ページで、対象のジョブ名をクリックします。[デプロイメント詳細] タブの [基本構成] セクションで、[編集] をクリックします。[Python ライブラリ] オプションを deps.zip に設定します。

    1. [保存] をクリックします。

JAR パッケージの使用

Flink Python ジョブがコネクタや Java ユーザー定義関数 (UDF) などの Java クラスを使用する場合、次のようにコネクタまたは Java UDF の JAR パッケージを指定できます。

  1. リアルタイムコンピューティングコンソールにログインします。

  2. 対象のワークスペースの [操作] 列で、[コンソール] をクリックします。

  3. 左側のナビゲーションウィンドウで [ファイル管理] をクリックし、JAR パッケージをアップロードします。

  4. [オペレーションセンター] > [ジョブ O&M] ページで、対象のジョブ名をクリックします。[デプロイメント詳細] タブの [基本構成] セクションで、[編集] をクリックします。追加の依存関係 で、使用する JAR パッケージを選択します。

  5. [パラメーター設定] セクションで、[その他の構成] フィールドに構成を指定します。

    たとえば、jar1.jarjar2.jar という名前の複数の JAR パッケージを使用するには、次の構成を追加します。

    pipeline.classpaths: 'file:///flink/usrlib/jar1.jar;file:///flink/usrlib/jar2.jar'
  6. [保存] をクリックします。

組み込みコネクタ、データ形式、およびカタログの使用

説明

組み込みのコネクタ、データ形式、およびカタログは、リアルタイムコンピューティングエンジン VVR 11.2 以降のバージョンでのみサポートされます。

Flink Python ジョブで組み込みのコネクタ、データ形式、およびカタログを使用するには、次のように指定します。

  1. [パラメーター設定] セクションで、[その他の構成] フィールドに構成を追加します。

    たとえば、kafkasls などの複数の組み込みコネクタを使用するには、次の構成を追加します。組み込みコネクタの具体的な名前については、「サポートされているコネクタ」のドキュメントをご参照ください。

    pipeline.used-builtin-connectors: kafka;sls

    たとえば、avroparquet などの複数の組み込みデータ形式を使用するには、次の構成を追加します。組み込みデータ形式の具体的な名前については、「データ形式」のドキュメントをご参照ください。

    pipeline.used-builtin-formats: avro;parquet

    たとえば、hive-2.3.6paimon などの複数の組み込みカタログを使用するには、次の構成を追加します。組み込みカタログの具体的な名前については、「Data Management」の対応するカタログのドキュメントをご参照ください。

    pipeline.used-builtin-catalogs: hive-2.3.6;paimon
  2. [保存] をクリックします。

データファイルの使用

説明

Flink は、データファイルをアップロードして Python ジョブをデバッグすることをサポートしていません。

以降では、2 つのシナリオでデータファイルを使用する方法について説明します。

  • Python アーカイブオプションの使用

    多数のデータファイルがある場合は、それらを ZIP ファイルに圧縮し、次のように Python ジョブで使用できます:

    1. リアルタイムコンピューティングコンソールにログインします。

    2. 対象のワークスペースの [操作] 列で、[コンソール] をクリックします。

    3. 左側のナビゲーションウィンドウで [ファイル管理] をクリックし、データファイルの ZIP パッケージをアップロードします。

    4. [オペレーションセンター] > [ジョブ O&M] ページで、対象のジョブ名をクリックします。[デプロイメント詳細] タブの [基本構成] セクションで、[編集] をクリックします。Python アーカイブ オプションで、データファイルの ZIP パッケージを選択します。

    5. Python ユーザー定義関数 (UDF) では、次のようにデータファイルにアクセスできます。次の例では、データファイルを含む圧縮パッケージの名前が mydata.zip であると仮定します。

      def map():
          with open("mydata.zip/mydata/data.txt") as f:
          ...
  • 追加の依存関係オプションの使用

    データファイルの数が少ない場合は、次のように Python ジョブで使用できます:

    1. リアルタイムコンピューティングコンソールにログインします。

    2. 対象のワークスペースの [操作] 列で、[コンソール] をクリックします。

    3. 左側のナビゲーションウィンドウで [ファイル管理] をクリックし、データファイルをアップロードします。

    4. [オペレーションセンター] > [ジョブ O&M] ページで、対象のジョブ名をクリックします。[デプロイメント詳細] タブの [基本構成] セクションで、[編集] をクリックします。追加の依存関係 フィールドで、目的のデータファイルを選択します。

    5. Python ユーザー定義関数 (UDF) では、次のようにデータファイルにアクセスできます。次の例では、データファイルの名前が data.txt であると仮定します。

      def map():
          with open("/flink/usrlib/data.txt") as f:
          ...

リファレンス