Flink Python ジョブでは、カスタム Python 仮想環境、サードパーティの Python パッケージ、JAR パッケージ、およびデータファイルを使用できます。このトピックでは、これらの依存関係の使用方法について説明します。
背景情報
このトピックでは、次のシナリオでの Python 依存関係の使用方法について説明します。
カスタム Python 仮想環境の使用
Ververica Runtime (VVR) 4.x は Python 3.7 仮想環境のみをサポートします。VVR 6.x 以降のバージョンにはこの制限はなく、より新しい Python バージョンをサポートします。
Python では、仮想環境を作成できます。各 Python 仮想環境は、依存関係パッケージをインストールできる完全な Python ランタイム環境を提供します。 次のセクションでは、Python 仮想環境を準備する方法について説明します。
Python 仮想環境を準備します。
ローカルマシンで、setup-pyflink-virtual-env.sh という名前のスクリプトを次の内容で作成します。
`miniconda.sh` スクリプトの URL:対象バージョンの URL に変更します。
`apache-flink`:ジョブの VVR バージョンに対応する Flink バージョンに変更します。Flink バージョンの表示方法の詳細については、「ワークスペースの管理と操作」をご参照ください。
ローカルマシンで、build.sh という名前のスクリプトを次の内容で作成します。
#!/bin/bash set -e -x sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-* sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-* yum install -y zip wget cd /root/ bash /build/setup-pyflink-virtual-env.sh mv venv.zip /build/コマンドラインで次のコマンドを実行して、Python 仮想環境パッケージをビルドします。
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux2014_x86_64 ./build.shコマンドを実行すると、venv.zip という名前のファイルが生成されます。このファイルには、Python 3.10 仮想環境が含まれています。
また、スクリプトを変更して、必要なサードパーティの Python パッケージを仮想環境にインストールすることもできます。
set -e # Python 3.10 の miniconda.sh スクリプトをダウンロードします。 wget "https://repo.continuum.io/miniconda/Miniconda3-py310_24.7.1-0-Linux-x86_64.sh" -O "miniconda.sh" # Python 3.10 の miniconda.sh スクリプトに実行権限を追加します。 chmod +x miniconda.sh # Python 仮想環境を作成します。 ./miniconda.sh -b -p venv # Conda Python 仮想環境をアクティベートします。 source venv/bin/activate "" # PyFlink 依存関係をインストールします。 # 必要に応じて PyFlink のバージョンを更新します pip install "apache-flink==1.17.0" # Conda Python 仮想環境を非アクティブ化します。 conda deactivate # キャッシュされたパッケージを削除します。 rm -rf venv/pkgs # 準備した Conda Python 仮想環境をパッケージ化します。 zip -r venv.zip venv説明このトピックでは、VVR 8.x と Python 3.10 を使用するジョブを例として使用します。異なる VVR バージョンを使用する場合、または異なる Python バージョン用の Python 仮想環境を作成する場合は、次の 2 つのパラメーターを変更してください:
Python ジョブで Python 仮想環境を使用します。
リアルタイムコンピューティングコンソールにログインします。
対象のワークスペースの [操作] 列で、[コンソール] をクリックします。
左側のナビゲーションウィンドウで [ファイル管理] をクリックし、venv.zip ファイルをアップロードします。
ページで、対象のジョブ名をクリックします。
[デプロイメント詳細] タブの [基本構成] セクションで、[Python アーカイブ] に venv.zip を選択します。
SQL ジョブの仮想環境で Python ユーザー定義関数 (UDF) を使用するには、[パラメーター設定] セクションの [その他の構成] フィールドに次の構成を追加します。
python.archives: oss://.../venv.zip[パラメーター設定] セクションで、Python 仮想環境のインストールパスの構成を [その他の構成] フィールドに追加します。構成は、ジョブの VVR バージョンによって異なります。
VVR 6.x 以降のバージョン
python.executable: venv.zip/venv/bin/python python.client.executable: venv.zip/venv/bin/pythonVVR 6.x より前のバージョン
python.executable: venv.zip/venv/bin/python
サードパーティ製 Python パッケージの使用
以降では、2 つのシナリオでサードパーティの Python パッケージを使用する方法について説明します。
直接インポートできるサードパーティ製 Python パッケージを使用する
サードパーティの Python パッケージが Zip Safe である場合、インストールせずに Python ジョブで直接使用できます。次の手順を実行します:
直接インポートできるサードパーティ製 Python パッケージをダウンロードします。
ブラウザで PyPI ページに移動します。
検索ボックスに、apache-flink 1.12.2 などの対象のサードパーティ Python パッケージの名前を入力します。
検索結果で、パッケージの名前をクリックします。
左側のナビゲーションウィンドウで、[ファイルのダウンロード] をクリックします。
cp37-cp37m-manylinux1 を含むパッケージの名前をクリックして、パッケージをダウンロードします。
リアルタイムコンピューティングコンソールにログインします。
対象のワークスペースについて、[操作] 列の [コンソール] をクリックします。
左側のナビゲーションウィンドウで [ファイル管理] をクリックし、サードパーティの Python パッケージをアップロードします。
ページで、 をクリックします。Python ライブラリ で、アップロードしたサードパーティの Python パッケージを選択します。
[保存] をクリックします。
コンパイルが必要なサードパーティ製 Python パッケージを使用する
サードパーティの Python パッケージが tar.gz 形式の圧縮パッケージであるか、ルートディレクトリに setup.py ファイルを含むソースパッケージである場合、通常、使用する前にパッケージをコンパイルする必要があります。Python ジョブで呼び出す前に、Flink と互換性のある環境でパッケージをコンパイルする必要があります。
サードパーティの Python パッケージをコンパイルするには、Python 3.7 を quay.io/pypa/manylinux2014_x86_64 イメージコンテナーで使用することを推奨します。このコンテナーを使用してコンパイルされたパッケージは、ほとんどの Linux 環境と互換性があります。このイメージコンテナーの詳細については、「manylinux」をご参照ください。
説明Python 3.7 のインストールパスは /opt/python/cp37-cp37m/bin/python3 です。
次の例では、opencv-python-headless サードパーティ Python パッケージをコンパイルして使用する方法を示します。
サードパーティの Python パッケージをコンパイルします。
ローカルマシンで、requirements.txt という名前のファイルを次の内容で作成します。
opencv-python-headless numpy<2ローカルマシンで、build.sh という名前のスクリプトを次の内容で作成します。
#!/bin/bash set -e -x sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-* sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-* yum install -y zip PYBIN=/opt/python/cp37-cp37m/bin #PYBIN=/opt/python/cp38-cp38/bin #PYBIN=/opt/python/cp39-cp39/bin #PYBIN=/opt/python/cp310-cp310/bin #PYBIN=/opt/python/cp311-cp311/bin "${PYBIN}/pip" install --target __pypackages__ -r requirements.txt cd __pypackages__ && zip -r deps.zip . && mv deps.zip ../ && cd .. rm -rf __pypackages__コマンドラインインターフェイス (CLI) で、次のコマンドを実行します。
docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux2014_x86_64 /bin/bash build.sh # 最新の pypa/manylinux2014_x86_64 イメージでは Python 3.7 のサポートが削除されているため、Python 3.7 をサポートするイメージタグを使用してください: docker run -it --rm -v $PWD:/build -w /build quay.io/pypa/manylinux2014_x86_64:2025.05.03-1 /bin/bash build.shコマンドを実行すると、deps.zip という名前のファイルが生成されます。このファイルは、コンパイルされたサードパーティ製 Python パッケージです。
また、requirements.txt ファイルを変更して、他のサードパーティ Python パッケージをインストールすることもできます。requirements.txt ファイルには、複数の Python 依存関係を指定できます。
deps.zip サードパーティ Python パッケージを Python ジョブで使用します。
リアルタイムコンピューティングコンソールにログインします。
対象のワークスペースの [操作] 列で、[コンソール] をクリックします。
左側のナビゲーションウィンドウで [ファイル管理] をクリックし、deps.zip をアップロードします。
ページで、対象のジョブ名をクリックします。[デプロイメント詳細] タブの [基本構成] セクションで、[編集] をクリックします。[Python ライブラリ] オプションを deps.zip に設定します。
[保存] をクリックします。
JAR パッケージの使用
Flink Python ジョブがコネクタや Java ユーザー定義関数 (UDF) などの Java クラスを使用する場合、次のようにコネクタまたは Java UDF の JAR パッケージを指定できます。
リアルタイムコンピューティングコンソールにログインします。
対象のワークスペースの [操作] 列で、[コンソール] をクリックします。
左側のナビゲーションウィンドウで [ファイル管理] をクリックし、JAR パッケージをアップロードします。
ページで、対象のジョブ名をクリックします。[デプロイメント詳細] タブの [基本構成] セクションで、[編集] をクリックします。追加の依存関係 で、使用する JAR パッケージを選択します。
[パラメーター設定] セクションで、[その他の構成] フィールドに構成を指定します。
たとえば、jar1.jar と jar2.jar という名前の複数の JAR パッケージを使用するには、次の構成を追加します。
pipeline.classpaths: 'file:///flink/usrlib/jar1.jar;file:///flink/usrlib/jar2.jar'[保存] をクリックします。
組み込みコネクタ、データ形式、およびカタログの使用
組み込みのコネクタ、データ形式、およびカタログは、リアルタイムコンピューティングエンジン VVR 11.2 以降のバージョンでのみサポートされます。
Flink Python ジョブで組み込みのコネクタ、データ形式、およびカタログを使用するには、次のように指定します。
[パラメーター設定] セクションで、[その他の構成] フィールドに構成を追加します。
たとえば、kafka や sls などの複数の組み込みコネクタを使用するには、次の構成を追加します。組み込みコネクタの具体的な名前については、「サポートされているコネクタ」のドキュメントをご参照ください。
pipeline.used-builtin-connectors: kafka;slsたとえば、avro や parquet などの複数の組み込みデータ形式を使用するには、次の構成を追加します。組み込みデータ形式の具体的な名前については、「データ形式」のドキュメントをご参照ください。
pipeline.used-builtin-formats: avro;parquetたとえば、hive-2.3.6 や paimon などの複数の組み込みカタログを使用するには、次の構成を追加します。組み込みカタログの具体的な名前については、「Data Management」の対応するカタログのドキュメントをご参照ください。
pipeline.used-builtin-catalogs: hive-2.3.6;paimon[保存] をクリックします。
データファイルの使用
Flink は、データファイルをアップロードして Python ジョブをデバッグすることをサポートしていません。
以降では、2 つのシナリオでデータファイルを使用する方法について説明します。
Python アーカイブオプションの使用
多数のデータファイルがある場合は、それらを ZIP ファイルに圧縮し、次のように Python ジョブで使用できます:
リアルタイムコンピューティングコンソールにログインします。
対象のワークスペースの [操作] 列で、[コンソール] をクリックします。
左側のナビゲーションウィンドウで [ファイル管理] をクリックし、データファイルの ZIP パッケージをアップロードします。
ページで、対象のジョブ名をクリックします。[デプロイメント詳細] タブの [基本構成] セクションで、[編集] をクリックします。Python アーカイブ オプションで、データファイルの ZIP パッケージを選択します。
Python ユーザー定義関数 (UDF) では、次のようにデータファイルにアクセスできます。次の例では、データファイルを含む圧縮パッケージの名前が mydata.zip であると仮定します。
def map(): with open("mydata.zip/mydata/data.txt") as f: ...
追加の依存関係オプションの使用
データファイルの数が少ない場合は、次のように Python ジョブで使用できます:
リアルタイムコンピューティングコンソールにログインします。
対象のワークスペースの [操作] 列で、[コンソール] をクリックします。
左側のナビゲーションウィンドウで [ファイル管理] をクリックし、データファイルをアップロードします。
ページで、対象のジョブ名をクリックします。[デプロイメント詳細] タブの [基本構成] セクションで、[編集] をクリックします。追加の依存関係 フィールドで、目的のデータファイルを選択します。
Python ユーザー定義関数 (UDF) では、次のようにデータファイルにアクセスできます。次の例では、データファイルの名前が data.txt であると仮定します。
def map(): with open("/flink/usrlib/data.txt") as f: ...
リファレンス
Python API ジョブの開発方法の詳細については、「Python ジョブ開発」をご参照ください。
Flink Python ジョブの開発プロセスの完全な例については、「Flink Python ジョブのクイックスタート」をご参照ください。
フルマネージド Flink は、SQL および DataStream ジョブの実行もサポートしています。これらのジョブの開発方法の詳細については、「ジョブ開発マップ」および「JAR ジョブ開発」をご参照ください。