Flink Python ジョブでは、カスタム Python 仮想環境、サードパーティ Python パッケージ、JAR パッケージ、データファイルを使用できます。このトピックでは、Python ジョブでこれらの依存関係を使用する方法について説明します。
背景情報
このトピックでは、次のシナリオで Python 依存関係を使用する方法について説明します。
プリインストール済みの Python 環境
フルマネージド環境には Python がプリインストールされています。利用可能なバージョンは次のとおりです。
Ververica Runtime (VVR) 8.0.10 以前: Python 3.7
VVR 8.0.11 以降: Python 3.9
Python 環境にプリインストールされているサードパーティソフトウェアパッケージの詳細については、「Python ジョブ開発」をご参照ください。
一部のサードパーティ Python パッケージには、GNU C Library (glibc) のバージョン要件があります。フルマネージド Flink 環境にプリインストールされている glibc のバージョンは次のとおりです。
X86
VVR 8.x 以前: glibc 2.17
VVR 11.x 以降: glibc 2.31
ARM
VVR 11.2 以前: glibc 2.17
VVR 11.3 以降: glibc 2.31
Glibc は上位互換性をサポートしています。したがって、依存する Python サードパーティライブラリで要求される glibc のバージョンは、環境内の glibc バージョンより新しくすることはできません。
カスタム Python 仮想環境の使用
VVR 4.x は Python 3.7 仮想環境のみをサポートします。VVR 6.x 以降にはこの制限はありません。より新しいバージョンの Python 仮想環境を使用できます。
プリインストール済みの Python 環境が要件を満たさない場合は、Python 仮想環境でカスタム Python バージョンを使用できます。各 Python 仮想環境は完全な Python ランタイムを備えており、さまざまな Python 依存関係パッケージをインストールできます。このセクションでは、Python 仮想環境を準備する方法について説明します。
Python 仮想環境を準備します。
ローカルマシンで、setup-pyflink-virtual-env.sh スクリプトを準備します。内容は次のとおりです。
X86
set -e # miniforge.sh スクリプトをダウンロードします。 wget "https://github.com/conda-forge/miniforge/releases/download/25.11.0-1/Miniforge3-25.11.0-1-Linux-x86_64.sh" -O "miniforge.sh" # miniforge.sh スクリプトに実行権限を追加します。 chmod +x miniforge.sh # miniforge をインストールします。 ./miniforge.sh -b source /root/miniforge3/bin/activate # Python 仮想環境を作成します。 mamba create -n venv python=3.10 -y eval "$(mamba shell hook --shell bash)" # Python 仮想環境をアクティベートします。 mamba activate venv # PyFlink 依存関係をインストールします。 # 必要に応じて PyFlink のバージョンを更新します pip install "apache-flink==1.20.3" # パッケージサイズを削減するために不要な JAR パッケージを削除します。 find /root/miniforge3/envs/venv/lib/python3.10/site-packages/pyflink/ -name *.jar | xargs rm # Conda Python 仮想環境をデアクティベートします。 mamba deactivate # 準備した Conda Python 仮想環境をパッケージ化します。 cd /root/miniforge3/envs/ && zip -r /root/venv.zip venvARM
set -e # miniforge.sh スクリプトをダウンロードします。 wget "https://github.com/conda-forge/miniforge/releases/download/25.11.0-1/Miniforge3-25.11.0-1-Linux-aarch64.sh" -O "miniforge.sh" # miniforge.sh スクリプトに実行権限を追加します。 chmod +x miniforge.sh # miniforge をインストールします。 ./miniforge.sh -b source /root/miniforge3/bin/activate # Python 仮想環境を作成します。 mamba create -n venv python=3.10 -y eval "$(mamba shell hook --shell bash)" # Python 仮想環境をアクティベートします。 mamba activate venv # PyFlink 依存関係をインストールします。 # 必要に応じて PyFlink のバージョンを更新します yum install -y java-11-openjdk-devel export JAVA_HOME=/usr/lib/jvm/java-11 wget "https://raw.githubusercontent.com/apache/flink/release-1.20/flink-python/dev/dev-requirements.txt" -O dev-requirements.txt pip install -r dev-requirements.txt pip install "apache-flink==1.20.3" # パッケージサイズを削減するために不要な JAR パッケージを削除します。 find /root/miniforge3/envs/venv/lib/python3.10/site-packages/pyflink/ -name *.jar | xargs rm # Conda Python 仮想環境をデアクティベートします。 mamba deactivate # 準備した Conda Python 仮想環境をパッケージ化します。 cd /root/miniforge3/envs && zip -r /root/venv.zip venvmamba create: Python のバージョンをターゲットバージョンに変更します。
apache-flink: Flink のバージョンをジョブの VVR バージョンに対応するものに変更します。Flink のバージョンの確認方法の詳細については、「ワークスペースの管理と操作」をご参照ください。
ローカルマシンで、build.sh スクリプトを準備します。内容は次のとおりです。
#!/bin/bash set -e -x yum install -y zip wget cd /root/ bash /build/setup-pyflink-virtual-env.sh mv venv.zip /build/コマンドラインで、次のコマンドを実行して Python 仮想環境のインストールを完了します。
X86
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux_2_28_x86_64 bash ./build.shARM
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux_2_28_aarch64 bash ./build.shこのコマンドを実行すると、venv.zip という名前のファイルが生成されます。この例では、Python 3.10 用の仮想環境を作成します。
上記のスクリプトを変更して、必要なサードパーティ Python パッケージを仮想環境にインストールすることもできます。
説明このトピックでは、VVR 11.x 上のジョブと Python 3.10 を例として使用します。異なる VVR バージョンを使用したり、仮想環境に異なる Python バージョンをインストールしたりするには、次のパラメーターを変更できます。
Python ジョブで Python 仮想環境を使用します。
対象のワークスペースの [操作] 列で、[コンソール] をクリックします。
左側のナビゲーションウィンドウで [ファイル管理] をクリックし、venv.zip ファイルをアップロードします。
ページで、対象のジョブの名前をクリックします。
[デプロイメント詳細] タブの [基本構成] セクションで、[Python アーカイブ] に venv.zip ファイルを選択します。
SQL ジョブが仮想環境で Python ユーザー定義関数 (UDF) を使用する場合、[パラメーター設定] セクションの [その他の構成] フィールドに次の構成を追加します。
python.archives: oss://.../venv.zip[パラメーター設定] セクションの [その他の構成] フィールドに、ジョブの VVR バージョンに基づいて Python 仮想環境のインストールパスを指定する構成を追加します。
VVR 6.x 以降
python.executable: venv.zip/venv/bin/python python.client.executable: venv.zip/venv/bin/pythonVVR 6.x より前のバージョン
python.executable: venv.zip/venv/bin/python
サードパーティ Python パッケージの使用
次の 2 つのシナリオでは、サードパーティ Python パッケージの使用方法について説明します。
直接インポート可能なサードパーティ Python パッケージの使用
サードパーティ Python パッケージが Zip Safe である場合、インストールせずに Python ジョブで直接使用できます。次の手順に従います。
直接インポート可能なサードパーティ Python パッケージをダウンロードします。
ブラウザで PyPI ページを開きます。
検索ボックスに、対象のサードパーティ Python パッケージの名前 (例:apache-flink 1.20.3) を入力します。
検索結果で、対象の結果の名前をクリックします。
左側のナビゲーションウィンドウで、[ファイルのダウンロード] をクリックします。
cp39-cp39m-manylinux1 を含むパッケージの名前をクリックしてダウンロードします。
対象のワークスペースの [操作] 列で、[コンソール] をクリックします。
左側のナビゲーションウィンドウで [ファイル管理] をクリックして、サードパーティ Python パッケージをアップロードできます。
ページで、 をクリックします。[Python ライブラリ] オプションで、アップロードしたサードパーティ Python パッケージを選択します。
[保存] をクリックします。
コンパイルが必要なサードパーティ Python パッケージの使用
サードパーティ Python パッケージが tar.gz 形式の圧縮パッケージ、または他の場所からダウンロードしたソースコードパッケージで、パッケージのルートディレクトリに setup.py ファイルが存在する場合、通常、使用前にパッケージをコンパイルする必要があります。まず、Flink と互換性のある環境でサードパーティ Python パッケージをコンパイルする必要があります。その後、Python ジョブでパッケージを呼び出すことができます。
quay.io/pypa/manylinux_2_28_x86_64 イメージコンテナーの Python 3.9 を使用して、サードパーティ Python パッケージをコンパイルすることを推奨します。このコンテナーによって生成されたパッケージは、ほとんどの Linux 環境と互換性があります。このイメージコンテナーの詳細については、「manylinux」をご参照ください。
説明Python 3.9 のインストールパスは /opt/python/cp39-cp39/bin/python3 です。
次の例は、opencv-python-headless サードパーティ Python パッケージをコンパイルして使用する方法を示しています。
サードパーティ Python パッケージをコンパイルします。
ローカルマシンで、requirements.txt ファイルを準備します。内容は次のとおりです。
opencv-python-headless numpy<2ローカルマシンで、build.sh スクリプトを準備します。内容は次のとおりです。
#!/bin/bash set -e -x yum install -y zip #PYBIN=/opt/python/cp37-cp37m/bin #PYBIN=/opt/python/cp38-cp38/bin PYBIN=/opt/python/cp39-cp39/bin #PYBIN=/opt/python/cp310-cp310/bin #PYBIN=/opt/python/cp311-cp311/bin "${PYBIN}/pip" install --target __pypackages__ -r requirements.txt cd __pypackages__ && zip -r deps.zip . && mv deps.zip ../ && cd .. rm -rf __pypackages__コマンドラインで、次のコマンドを実行します。
X86
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux_2_28_x86_64 bash ./build.shARM
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux_2_28_aarch64 bash ./build.shこのコマンドを実行すると、deps.zip という名前のファイルが生成されます。このファイルは、コンパイル済みのサードパーティ Python パッケージです。
requirements.txt を変更して、他の必要なサードパーティ Python パッケージをインストールすることもできます。さらに、requirements.txt ファイルで複数の Python 依存関係を指定できます。
Python ジョブで deps.zip サードパーティ Python パッケージを使用します。
対象のワークスペースの [操作] 列で、[コンソール] をクリックします。
左側のナビゲーションウィンドウで [ファイル] をクリックし、deps.zip をアップロードします。
ページで、対象のジョブの名前をクリックします。[デプロイメント詳細] タブの [基本構成] セクションで、[編集] をクリックします。次に、[Python ライブラリ] オプションで deps.zip を選択します。
[保存] をクリックします。
JAR パッケージの使用
Flink Python ジョブがコネクタや Java UDF などの Java クラスを使用する場合、コネクタまたは Java UDF の JAR パッケージを指定する必要があります。次の手順を実行できます。
対象のワークスペースの [操作] 列で、[コンソール] をクリックします。
左側のナビゲーションウィンドウで [ファイル] をクリックし、使用する JAR パッケージをアップロードします。
ページで、対象のジョブの名前をクリックします。[デプロイメント詳細] タブの [基本構成] セクションで、[編集] をクリックします。次に、[追加の依存関係] オプションで、使用する JAR パッケージを選択します。
[パラメーター設定] セクションの [その他の構成] フィールドに、構成を追加します。
たとえば、jar1.jar と jar2.jar という名前の複数の JAR パッケージに依存する場合、構成は次のようになります。
pipeline.classpaths: 'file:///flink/usrlib/jar1.jar;file:///flink/usrlib/jar2.jar'[保存] をクリックします。
組み込みコネクタ、データ形式、カタログの使用
VVR 11.2 以降で実行されるジョブのみが、組み込みコネクタ、データ形式、カタログをサポートします。
Flink Python ジョブで組み込みコネクタ、データ形式、カタログを使用するには、次のように指定します。
[パラメーター設定] セクションの [その他の構成] フィールドに、構成を追加します。
たとえば、kafka と sls という名前の複数の組み込みコネクタに依存する場合、構成は次のようになります。組み込みコネクタの具体的な名前については、「サポートされているコネクタ」の各コネクタのドキュメントをご参照ください。
pipeline.used-builtin-connectors: kafka;slsたとえば、avro と parquet という名前の複数の組み込みデータ形式に依存する場合、構成は次のようになります。組み込みデータ形式の具体的な名前については、「データ形式」のドキュメントをご参照ください。
pipeline.used-builtin-formats: avro;parquetたとえば、hive-2.3.6 と paimon という名前の複数の組み込みカタログに依存する場合、構成は次のようになります。組み込みカタログの具体的な名前については、「Data Management」の対応するカタログのドキュメントをご参照ください。
pipeline.used-builtin-catalogs: hive-2.3.6;paimon[保存] をクリックします。
データファイルの使用
Flink は、データファイルをアップロードして Python ジョブをデバッグすることをサポートしていません。
次の 2 つのシナリオでは、データファイルの使用方法について説明します。
Python アーカイブオプションの使用
多数のデータファイルがある場合は、それらを ZIP ファイルにパッケージ化し、次の手順で Python ジョブで使用できます。
対象のワークスペースの [操作] 列で、[コンソール] をクリックします。
左側のナビゲーションウィンドウで [ファイル管理] をクリックし、対象のデータファイルの ZIP パッケージをアップロードします。
ページで、対象のジョブの名前をクリックします。[デプロイメント詳細] タブの [基本構成] セクションで、[編集] をクリックします。次に、[Python アーカイブ] オプションで、使用するデータファイルの ZIP パッケージを選択します。
Python UDF では、次のようにデータファイルにアクセスできます。この例では、データファイルを含む圧縮パッケージの名前が mydata.zip であると仮定します。
def map(): with open("mydata.zip/mydata/data.txt") as f: ...
追加の依存関係オプションの使用
データファイルの数が少ない場合は、次の手順で Python ジョブで使用できます。
対象のワークスペースの [操作] 列で、[コンソール] をクリックします。
左側のナビゲーションウィンドウで [ファイル] をクリックし、対象のデータファイルをアップロードします。
ページで、対象のジョブの名前をクリックします。[デプロイメント詳細] タブの [基本構成] セクションで、[編集] をクリックします。次に、[追加の依存関係] オプションで、使用するデータファイルを選択します。
Python UDF では、次のようにデータファイルにアクセスできます。次のコードでは、data.txt という名前のデータファイルを例として使用します。
def map(): with open("/flink/usrlib/data.txt") as f: ...
関連ドキュメント
Python API ジョブの開発方法の詳細については、「Python ジョブ開発」をご参照ください。
Flink Python ジョブの開発プロセスの完全な例については、「Flink Python ジョブのクイックスタート」をご参照ください。
フルマネージド Flink は、SQL および DataStream ジョブもサポートしています。これらのジョブの開発方法の詳細については、「ジョブ開発マップ」および「JAR ジョブ開発」をご参照ください。