すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Python 依存関係の使用

最終更新日:Jan 23, 2026

Flink Python ジョブでは、カスタム Python 仮想環境、サードパーティ Python パッケージ、JAR パッケージ、データファイルを使用できます。このトピックでは、Python ジョブでこれらの依存関係を使用する方法について説明します。

背景情報

このトピックでは、次のシナリオで Python 依存関係を使用する方法について説明します。

プリインストール済みの Python 環境

フルマネージド環境には Python がプリインストールされています。利用可能なバージョンは次のとおりです。

  • Ververica Runtime (VVR) 8.0.10 以前: Python 3.7

  • VVR 8.0.11 以降: Python 3.9

説明

Python 環境にプリインストールされているサードパーティソフトウェアパッケージの詳細については、「Python ジョブ開発」をご参照ください。

一部のサードパーティ Python パッケージには、GNU C Library (glibc) のバージョン要件があります。フルマネージド Flink 環境にプリインストールされている glibc のバージョンは次のとおりです。

X86

  • VVR 8.x 以前: glibc 2.17

  • VVR 11.x 以降: glibc 2.31

ARM

  • VVR 11.2 以前: glibc 2.17

  • VVR 11.3 以降: glibc 2.31

説明

Glibc は上位互換性をサポートしています。したがって、依存する Python サードパーティライブラリで要求される glibc のバージョンは、環境内の glibc バージョンより新しくすることはできません。

カスタム Python 仮想環境の使用

説明

VVR 4.x は Python 3.7 仮想環境のみをサポートします。VVR 6.x 以降にはこの制限はありません。より新しいバージョンの Python 仮想環境を使用できます。

プリインストール済みの Python 環境が要件を満たさない場合は、Python 仮想環境でカスタム Python バージョンを使用できます。各 Python 仮想環境は完全な Python ランタイムを備えており、さまざまな Python 依存関係パッケージをインストールできます。このセクションでは、Python 仮想環境を準備する方法について説明します。

  1. Python 仮想環境を準備します。

    1. ローカルマシンで、setup-pyflink-virtual-env.sh スクリプトを準備します。内容は次のとおりです。

      X86

      set -e
      # miniforge.sh スクリプトをダウンロードします。
      wget "https://github.com/conda-forge/miniforge/releases/download/25.11.0-1/Miniforge3-25.11.0-1-Linux-x86_64.sh" -O "miniforge.sh"
      
      # miniforge.sh スクリプトに実行権限を追加します。
      chmod +x miniforge.sh
      
      # miniforge をインストールします。
      ./miniforge.sh -b
      source /root/miniforge3/bin/activate
      
      # Python 仮想環境を作成します。
      mamba create -n venv python=3.10 -y
      eval "$(mamba shell hook --shell bash)"
      
      # Python 仮想環境をアクティベートします。
      mamba activate venv
      
      # PyFlink 依存関係をインストールします。
      # 必要に応じて PyFlink のバージョンを更新します
      pip install "apache-flink==1.20.3"
      
      # パッケージサイズを削減するために不要な JAR パッケージを削除します。
      find /root/miniforge3/envs/venv/lib/python3.10/site-packages/pyflink/ -name *.jar | xargs rm
      
      # Conda Python 仮想環境をデアクティベートします。
      mamba deactivate
      
      # 準備した Conda Python 仮想環境をパッケージ化します。
      cd /root/miniforge3/envs/ && zip -r /root/venv.zip venv 

      ARM

      set -e
      # miniforge.sh スクリプトをダウンロードします。
      wget "https://github.com/conda-forge/miniforge/releases/download/25.11.0-1/Miniforge3-25.11.0-1-Linux-aarch64.sh" -O "miniforge.sh"
      
      # miniforge.sh スクリプトに実行権限を追加します。
      chmod +x miniforge.sh
      
      # miniforge をインストールします。
      ./miniforge.sh -b
      source /root/miniforge3/bin/activate
      
      # Python 仮想環境を作成します。
      mamba create -n venv python=3.10 -y
      eval "$(mamba shell hook --shell bash)"
      
      # Python 仮想環境をアクティベートします。
      mamba activate venv
      
      # PyFlink 依存関係をインストールします。
      # 必要に応じて PyFlink のバージョンを更新します
      yum install -y java-11-openjdk-devel
      export JAVA_HOME=/usr/lib/jvm/java-11
      wget "https://raw.githubusercontent.com/apache/flink/release-1.20/flink-python/dev/dev-requirements.txt" -O dev-requirements.txt
      pip install -r dev-requirements.txt
      pip install "apache-flink==1.20.3"
      
      # パッケージサイズを削減するために不要な JAR パッケージを削除します。
      find /root/miniforge3/envs/venv/lib/python3.10/site-packages/pyflink/ -name *.jar | xargs rm
      
      # Conda Python 仮想環境をデアクティベートします。
      mamba deactivate
      
      # 準備した Conda Python 仮想環境をパッケージ化します。
      cd /root/miniforge3/envs && zip -r /root/venv.zip venv
    2. 説明

      このトピックでは、VVR 11.x 上のジョブと Python 3.10 を例として使用します。異なる VVR バージョンを使用したり、仮想環境に異なる Python バージョンをインストールしたりするには、次のパラメーターを変更できます。

      • mamba create: Python のバージョンをターゲットバージョンに変更します。

      • apache-flink: Flink のバージョンをジョブの VVR バージョンに対応するものに変更します。Flink のバージョンの確認方法の詳細については、「ワークスペースの管理と操作」をご参照ください。

    3. ローカルマシンで、build.sh スクリプトを準備します。内容は次のとおりです。

      #!/bin/bash
      set -e -x
      
      yum install -y zip wget
      
      cd /root/
      bash /build/setup-pyflink-virtual-env.sh
      mv venv.zip /build/
    4. コマンドラインで、次のコマンドを実行して Python 仮想環境のインストールを完了します。

      X86

      docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux_2_28_x86_64 bash ./build.sh

      ARM

      docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux_2_28_aarch64 bash ./build.sh

      このコマンドを実行すると、venv.zip という名前のファイルが生成されます。この例では、Python 3.10 用の仮想環境を作成します。

      上記のスクリプトを変更して、必要なサードパーティ Python パッケージを仮想環境にインストールすることもできます。

  2. Python ジョブで Python 仮想環境を使用します。

    1. Realtime Compute for Apache Flink コンソールにログインします。

    2. 対象のワークスペースの [操作] 列で、[コンソール] をクリックします。

    3. 左側のナビゲーションウィンドウで [ファイル管理] をクリックし、venv.zip ファイルをアップロードします。

    4. [オペレーションセンター] > [ジョブ O&M] ページで、対象のジョブの名前をクリックします。

    5. [デプロイメント詳細] タブの [基本構成] セクションで、[Python アーカイブ] に venv.zip ファイルを選択します。

      SQL ジョブが仮想環境で Python ユーザー定義関数 (UDF) を使用する場合、[パラメーター設定] セクションの [その他の構成] フィールドに次の構成を追加します。

      python.archives: oss://.../venv.zip
    6. [パラメーター設定] セクションの [その他の構成] フィールドに、ジョブの VVR バージョンに基づいて Python 仮想環境のインストールパスを指定する構成を追加します。

      • VVR 6.x 以降

        python.executable: venv.zip/venv/bin/python
        python.client.executable: venv.zip/venv/bin/python
      • VVR 6.x より前のバージョン

        python.executable: venv.zip/venv/bin/python

サードパーティ Python パッケージの使用

説明

Zip SafePyPImanylinux はサードパーティの Web サイトです。アクセスに失敗したり、遅延が発生したりする場合があります。

次の 2 つのシナリオでは、サードパーティ Python パッケージの使用方法について説明します。

  • 直接インポート可能なサードパーティ Python パッケージの使用

    サードパーティ Python パッケージが Zip Safe である場合、インストールせずに Python ジョブで直接使用できます。次の手順に従います。

    1. 直接インポート可能なサードパーティ Python パッケージをダウンロードします。

      1. ブラウザで PyPI ページを開きます。

      2. 検索ボックスに、対象のサードパーティ Python パッケージの名前 (例:apache-flink 1.20.3) を入力します。

      3. 検索結果で、対象の結果の名前をクリックします。

      4. 左側のナビゲーションウィンドウで、[ファイルのダウンロード] をクリックします。

      5. cp39-cp39m-manylinux1 を含むパッケージの名前をクリックしてダウンロードします。

    2. Realtime Compute for Apache Flink コンソールにログインします。

    3. 対象のワークスペースの [操作] 列で、[コンソール] をクリックします。

    4. 左側のナビゲーションウィンドウで [ファイル管理] をクリックして、サードパーティ Python パッケージをアップロードできます。

    5. [オペレーションセンター] > [ジョブ O&M] ページで、[ジョブのデプロイ] > [Python ジョブ] をクリックします。[Python ライブラリ] オプションで、アップロードしたサードパーティ Python パッケージを選択します。

    6. [保存] をクリックします。

  • コンパイルが必要なサードパーティ Python パッケージの使用

    サードパーティ Python パッケージが tar.gz 形式の圧縮パッケージ、または他の場所からダウンロードしたソースコードパッケージで、パッケージのルートディレクトリに setup.py ファイルが存在する場合、通常、使用前にパッケージをコンパイルする必要があります。まず、Flink と互換性のある環境でサードパーティ Python パッケージをコンパイルする必要があります。その後、Python ジョブでパッケージを呼び出すことができます。

    quay.io/pypa/manylinux_2_28_x86_64 イメージコンテナーの Python 3.9 を使用して、サードパーティ Python パッケージをコンパイルすることを推奨します。このコンテナーによって生成されたパッケージは、ほとんどの Linux 環境と互換性があります。このイメージコンテナーの詳細については、「manylinux」をご参照ください。

    説明

    Python 3.9 のインストールパスは /opt/python/cp39-cp39/bin/python3 です。

    次の例は、opencv-python-headless サードパーティ Python パッケージをコンパイルして使用する方法を示しています。

    1. サードパーティ Python パッケージをコンパイルします。

      1. ローカルマシンで、requirements.txt ファイルを準備します。内容は次のとおりです。

        opencv-python-headless
        numpy<2
      2. ローカルマシンで、build.sh スクリプトを準備します。内容は次のとおりです。

        #!/bin/bash
        set -e -x
        
        yum install -y zip
        
        #PYBIN=/opt/python/cp37-cp37m/bin
        #PYBIN=/opt/python/cp38-cp38/bin
        PYBIN=/opt/python/cp39-cp39/bin
        #PYBIN=/opt/python/cp310-cp310/bin
        #PYBIN=/opt/python/cp311-cp311/bin
        
        "${PYBIN}/pip" install --target __pypackages__ -r requirements.txt
        cd __pypackages__ && zip -r deps.zip . && mv deps.zip ../ && cd ..
        rm -rf __pypackages__
      3. コマンドラインで、次のコマンドを実行します。

        X86

        docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux_2_28_x86_64 bash ./build.sh

        ARM

        docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux_2_28_aarch64 bash ./build.sh

        このコマンドを実行すると、deps.zip という名前のファイルが生成されます。このファイルは、コンパイル済みのサードパーティ Python パッケージです。

        requirements.txt を変更して、他の必要なサードパーティ Python パッケージをインストールすることもできます。さらに、requirements.txt ファイルで複数の Python 依存関係を指定できます。

    2. Python ジョブで deps.zip サードパーティ Python パッケージを使用します。

      1. Realtime Compute for Apache Flink コンソールにログインします。

      2. 対象のワークスペースの [操作] 列で、[コンソール] をクリックします。

      3. 左側のナビゲーションウィンドウで [ファイル] をクリックし、deps.zip をアップロードします。

      4. [オペレーションセンター] > [ジョブ O&M] ページで、対象のジョブの名前をクリックします。[デプロイメント詳細] タブの [基本構成] セクションで、[編集] をクリックします。次に、[Python ライブラリ] オプションで deps.zip を選択します。

    1. [保存] をクリックします。

JAR パッケージの使用

Flink Python ジョブがコネクタや Java UDF などの Java クラスを使用する場合、コネクタまたは Java UDF の JAR パッケージを指定する必要があります。次の手順を実行できます。

  1. Realtime Compute for Apache Flink コンソールにログインします。

  2. 対象のワークスペースの [操作] 列で、[コンソール] をクリックします。

  3. 左側のナビゲーションウィンドウで [ファイル] をクリックし、使用する JAR パッケージをアップロードします。

  4. [オペレーションセンター] > [ジョブ O&M] ページで、対象のジョブの名前をクリックします。[デプロイメント詳細] タブの [基本構成] セクションで、[編集] をクリックします。次に、[追加の依存関係] オプションで、使用する JAR パッケージを選択します。

  5. [パラメーター設定] セクションの [その他の構成] フィールドに、構成を追加します。

    たとえば、jar1.jarjar2.jar という名前の複数の JAR パッケージに依存する場合、構成は次のようになります。

    pipeline.classpaths: 'file:///flink/usrlib/jar1.jar;file:///flink/usrlib/jar2.jar'
  6. [保存] をクリックします。

組み込みコネクタ、データ形式、カタログの使用

説明

VVR 11.2 以降で実行されるジョブのみが、組み込みコネクタ、データ形式、カタログをサポートします。

Flink Python ジョブで組み込みコネクタ、データ形式、カタログを使用するには、次のように指定します。

  1. [パラメーター設定] セクションの [その他の構成] フィールドに、構成を追加します。

    たとえば、kafkasls という名前の複数の組み込みコネクタに依存する場合、構成は次のようになります。組み込みコネクタの具体的な名前については、「サポートされているコネクタ」の各コネクタのドキュメントをご参照ください。

    pipeline.used-builtin-connectors: kafka;sls

    たとえば、avroparquet という名前の複数の組み込みデータ形式に依存する場合、構成は次のようになります。組み込みデータ形式の具体的な名前については、「データ形式」のドキュメントをご参照ください。

    pipeline.used-builtin-formats: avro;parquet

    たとえば、hive-2.3.6paimon という名前の複数の組み込みカタログに依存する場合、構成は次のようになります。組み込みカタログの具体的な名前については、「Data Management」の対応するカタログのドキュメントをご参照ください。

    pipeline.used-builtin-catalogs: hive-2.3.6;paimon
  2. [保存] をクリックします。

データファイルの使用

説明

Flink は、データファイルをアップロードして Python ジョブをデバッグすることをサポートしていません。

次の 2 つのシナリオでは、データファイルの使用方法について説明します。

  • Python アーカイブオプションの使用

    多数のデータファイルがある場合は、それらを ZIP ファイルにパッケージ化し、次の手順で Python ジョブで使用できます。

    1. Realtime Compute for Apache Flink コンソールにログインします。

    2. 対象のワークスペースの [操作] 列で、[コンソール] をクリックします。

    3. 左側のナビゲーションウィンドウで [ファイル管理] をクリックし、対象のデータファイルの ZIP パッケージをアップロードします。

    4. [オペレーションセンター] > [ジョブ O&M] ページで、対象のジョブの名前をクリックします。[デプロイメント詳細] タブの [基本構成] セクションで、[編集] をクリックします。次に、[Python アーカイブ] オプションで、使用するデータファイルの ZIP パッケージを選択します。

    5. Python UDF では、次のようにデータファイルにアクセスできます。この例では、データファイルを含む圧縮パッケージの名前が mydata.zip であると仮定します。

      def map():
          with open("mydata.zip/mydata/data.txt") as f:
          ...
  • 追加の依存関係オプションの使用

    データファイルの数が少ない場合は、次の手順で Python ジョブで使用できます。

    1. Realtime Compute for Apache Flink コンソールにログインします。

    2. 対象のワークスペースの [操作] 列で、[コンソール] をクリックします。

    3. 左側のナビゲーションウィンドウで [ファイル] をクリックし、対象のデータファイルをアップロードします。

    4. [オペレーションセンター] > [ジョブ O&M] ページで、対象のジョブの名前をクリックします。[デプロイメント詳細] タブの [基本構成] セクションで、[編集] をクリックします。次に、[追加の依存関係] オプションで、使用するデータファイルを選択します。

    5. Python UDF では、次のようにデータファイルにアクセスできます。次のコードでは、data.txt という名前のデータファイルを例として使用します。

      def map():
          with open("/flink/usrlib/data.txt") as f:
          ...

関連ドキュメント