すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:Python 依存関係の使用

最終更新日:Jan 08, 2025

Realtime Compute for Apache Flink の Python デプロイメントでは、カスタム Python 仮想環境、サードパーティ製 Python パッケージ、JAR パッケージ、およびデータファイルを使用できます。このトピックでは、Python デプロイメントでこれらの依存関係を使用する方法について説明します。

背景情報

以下のセクションの手順に基づいて、Python 依存関係を使用できます。

カスタム Python 仮想環境の使用

説明

Ververica Runtime(VVR)4.X では、Python 3.7 の仮想環境のみを使用できます。 VVR 6.X 以降では、それ以降のバージョンの Python の仮想環境を使用できます。

Python 仮想環境を構築できます。各 Python 仮想環境は、完全な Python ランタイム環境を提供します。仮想環境には、一連の Python 依存関係パッケージをインストールできます。次のセクションでは、Python 仮想環境を準備する方法について説明します。

  1. Python 仮想環境を準備します。

    1. ローカルデバイスで setup-pyflink-virtual-env.sh スクリプトを準備します。次のコードは、スクリプトの内容を示しています。

    2. set -e
      # Download the miniconda.sh script of Python 3.10. // Python 3.10 の miniconda.sh スクリプトをダウンロードします。
      wget "https://repo.continuum.io/miniconda/Miniconda3-py310_24.7.1-0-Linux-x86_64.sh" -O "miniconda.sh"
      
      # Add the execution permissions to the miniconda.sh script of Python 3.10. // Python 3.10 の miniconda.sh スクリプトに実行権限を追加します。
      chmod +x miniconda.sh
      
      # Create a Python virtual environment. // Python 仮想環境を作成します。
      ./miniconda.sh -b -p venv
      
      # Activate the Conda Python virtual environment. // Conda Python 仮想環境をアクティブ化します。
      source venv/bin/activate ""
      
      # Install the PyFlink dependency. 
      # update the PyFlink version if needed // PyFlink 依存関係をインストールします。必要に応じて PyFlink のバージョンを更新します。
      pip install "apache-flink==1.17.0"
      
      # Deactivate the Conda Python virtual environment. // Conda Python 仮想環境を非アクティブ化します。
      conda deactivate
      
      # Delete the cached packages. // キャッシュされたパッケージを削除します。
      rm -rf venv/pkgs
      
      # Package the prepared Conda Python virtual environment. // 準備した Conda Python 仮想環境をパッケージ化します。
      zip -r venv.zip venv
      説明

      このトピックでは、デプロイメントは VVR 8.X を使用し、Python 3.10 の仮想環境で実行されます。別の VVR バージョンを使用する場合、または別の Python バージョンの仮想環境をインストールする場合は、ビジネス要件に基づいてコード内の次の項目を変更する必要があります。

      • miniconda.sh スクリプトをダウンロードするための URL:URL を、目的のバージョンの miniconda.sh スクリプトをダウンロードするための URL に変更します。

      • apache-flink:このパラメーターの値を、デプロイメントの VVR バージョンに対応する Flink バージョンに変更します。 Flink バージョンの表示方法の詳細については、リファレンス を参照してください。

    3. ローカルデバイスで build.sh スクリプトを準備します。次のコードは、スクリプトの内容を示しています。

      #!/bin/bash
      set -e -x
      
      sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-*
      sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-*
      
      yum install -y zip wget
      
      cd /root/
      bash /build/setup-pyflink-virtual-env.sh
      mv venv.zip /build/
    4. CLI で、次のコマンドを実行して Python 仮想環境をインストールします。

      docker run -it --rm -v $PWD:/build  -w /build quay.io/pypa/manylinux2014_x86_64 ./build.sh

      コマンドを実行すると、venv.zip という名前のファイルが生成されます。この例では、Python 3.10 の仮想環境が使用されています。

      上記のスクリプトを変更して、必要なサードパーティ製 Python パッケージを仮想環境にインストールすることもできます。

  2. Python デプロイメントで Python 仮想環境を使用します。

    1. Realtime Compute for Apache Flink コンソール にログインします。

    2. [Fully Managed Flink] タブで、管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

    3. 左側のナビゲーションペインで、[アーティファクト] をクリックします。 [アーティファクト] ページで、[アーティファクトのアップロード] をクリックします。表示されるダイアログボックスで、venv.zip パッケージを選択します。

    4. [デプロイメント] ページで、目的のジョブの名前をクリックします。

    5. [構成] タブで、[基本] セクションの右上隅にある [編集] をクリックし、[python アーカイブ] ドロップダウンリストから venv.zip パッケージを選択します。

      デプロイメントが Python ユーザー定義関数(UDF)を使用する必要がある SQL デプロイメントである場合は、[パラメーター] セクションの右上隅にある [編集] をクリックし、[その他の構成] フィールドに次の構成を追加します。

      python.archives: oss://.../venv.zip
    6. [パラメーター] セクションで、デプロイメントの VVR バージョンに基づいて、指定された Python 仮想環境をインストールするためのパスの構成情報を [その他の構成] フィールドに追加します。

      • VVR 6.X 以降

        python.executable: venv.zip/venv/bin/python
        python.client.executable: venv.zip/venv/bin/python
      • VVR 6.X より前のエンジンバージョン

        python.executable: venv.zip/venv/bin/python

サードパーティ製 Python パッケージの使用

説明

以下の説明にある Zip SafePyPI、および manylinux は、サードパーティの Web サイトで提供されています。これらの Web サイトにアクセスすると、アクセスに失敗したり、アクセスが遅延したりする可能性があります。

次の 2 つのシナリオは、サードパーティ製 Python パッケージを使用する方法を示しています。

  • 直接インポートできるサードパーティ製 Python パッケージを使用する

    サードパーティ製 Python パッケージが Zip Safe パッケージである場合は、次の手順を実行して、インストールせずに Python デプロイメントでパッケージを直接使用できます。

    1. 直接インポートできるサードパーティ製 Python パッケージをダウンロードします。

      1. Web ブラウザーで PyPI にアクセスします。

      2. 検索ボックスに、apache-flink1.12.2 などのサードパーティ製 Python パッケージの名前を入力します。

      3. 検索結果で、使用したいパッケージの名前をクリックします。

      4. 表示されるページの左側のナビゲーションペインで、[Download files] をクリックします。

      5. 名に cp37-cp37m-manylinux1 が含まれるパッケージの名前をクリックして、パッケージをダウンロードします。

    2. Realtime Compute for Apache Flink コンソール にログインします。

    3. [Fully Managed Flink] タブで、管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

    4. 左側のナビゲーションペインで、[アーティファクト] をクリックします。 [アーティファクト] ページで、[アーティファクトのアップロード] をクリックします。表示されるダイアログボックスで、必要なサードパーティ製 Python パッケージを選択します。

    5. 左側のナビゲーションペインで、[デプロイメント] をクリックします。 [デプロイメント] ページで、[デプロイメントの作成] をクリックします。 [デプロイメントの作成] ダイアログボックスで、[デプロイメントタイプ][PYTHON] を選択し、Python ライブラリ にアップロードしたサードパーティ製 Python パッケージを選択します。

    6. [保存] をクリックします。

  • コンパイルが必要なサードパーティ製 Python パッケージを使用する

    サードパーティ製 Python パッケージが次の条件を満たす場合、パッケージを使用する前にコンパイルする必要があります。サードパーティ製 Python パッケージは、tar.gz 形式の圧縮パッケージ、または別の場所からダウンロードしたソースパッケージであり、圧縮パッケージのルートディレクトリの下に setup.py ファイルが存在します。Python デプロイメントでサードパーティ製 Python パッケージを呼び出す前に、Flink と互換性のある環境でサードパーティ製 Python パッケージをコンパイルする必要があります。

    Python 3.7quay.io/pypa/manylinux2014_x86_64 イメージで使用して、サードパーティ製 Python パッケージをコンパイルすることをお勧めします。イメージによって生成されたパッケージは、ほとんどの Linux オペレーティングシステムと互換性があります。イメージの詳細については、manylinux を参照してください。

    説明

    Python 3.7/opt/python/cp37-cp37m/bin/python3 ディレクトリにインストールされています。

    次の例は、サードパーティ製 Python パッケージ opencv-python-headless をコンパイルして使用する方法を示しています。

    1. サードパーティ製 Python パッケージをコンパイルします。

      1. ローカルデバイスで requirements.txt ファイルを準備します。次のコードは、ファイルの内容を示しています。

        opencv-python-headless
      2. ローカルデバイスで build.sh スクリプトを準備します。次のコードは、スクリプトの内容を示しています。

        #!/bin/bash
        set -e -x
        
        sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-*
        sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-*
        
        yum install -y zip
        
        PYBIN=/opt/python/cp37-cp37m/bin
        #PYBIN=/opt/python/cp38-cp38/bin
        #PYBIN=/opt/python/cp39-cp39/bin
        #PYBIN=/opt/python/cp310-cp310/bin
        
        "${PYBIN}/pip" install --target __pypackages__ -r requirements.txt
        cd __pypackages__ && zip -r deps.zip . && mv deps.zip ../ && cd ..
        rm -rf __pypackages__
      3. CLI で、次のコマンドを実行します。

        docker run -it --rm -v $PWD:/build  -w /build quay.io/pypa/manylinux2014_x86_64 /bin/bash build.sh

        コマンドを実行すると、deps.zip という名前のファイルが生成されます。このファイルは、コンパイルされたサードパーティ製 Python パッケージです。

        requirements.txt ファイルの内容を変更して、他の必要なサードパーティ製 Python パッケージをインストールすることもできます。また、requirements.txt ファイルには複数の Python 依存関係を指定できます。

    2. Python デプロイメントでサードパーティ製 Python パッケージ deps.zip を使用します。

      1. Realtime Compute for Apache Flink コンソール にログインします。

      2. [Fully Managed Flink] タブで、管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

      3. 左側のナビゲーションペインで、[アーティファクト] をクリックします。 [アーティファクト] ページで、[アーティファクトのアップロード] をクリックします。表示されるダイアログボックスで、deps.zip を選択します。

      4. [デプロイメント] ページで、目的のデプロイメントの名前をクリックします。[構成] タブで、[基本] セクションの右上隅にある [編集] をクリックし、[python ライブラリ] ドロップダウンリストから deps.zip パッケージを選択します。

    1. [保存] をクリックします。

JAR パッケージの使用

Python デプロイメントでコネクタや Java UDF などの Java クラスを使用する場合は、次の操作を実行して、コネクタまたは Java UDF の JAR パッケージを指定できます。

  1. Realtime Compute for Apache Flink コンソール にログインします。

  2. [Fully Managed Flink] タブで、管理するワークスペースを見つけ、コンソール[アクション] 列の をクリックします。

  3. 左側のナビゲーションペインで、[アーティファクト] をクリックします。 [アーティファクト] ページで、[アーティファクトのアップロード] をクリックします。表示されるダイアログボックスで、使用する JAR パッケージを選択します。

  4. [デプロイメント] ページで、目的のデプロイメントの名前をクリックします。[構成] タブで、編集[基本] セクションの右上隅にある 追加の依存関係 をクリックし、 ドロップダウンリストから必要な JAR パッケージを選択します。

  5. [デプロイメント] ページで、目的のデプロイメントの名前をクリックします。 [構成] タブで、[パラメーター] セクションの右上隅にある [編集] をクリックし、[その他の構成] フィールドに次の構成を追加します。

    たとえば、ドラフトが jar1.jarjar2.jar という名前の 2 つの JAR パッケージに依存している場合は、次の構成情報を追加します。

    pipeline.classpaths: 'file:///flink/usrlib/jar1.jar;file:///flink/usrlib/jar2.jar'
  6. [保存] をクリックします。

データファイルの使用

説明

フルマネージド Flink では、データファイルをアップロードして Python デプロイメントをデバッグすることはできません。

次のシナリオは、データファイルを使用する方法を示しています。

  • [Python アーカイブ] ドロップダウンリストからパッケージを選択する

    多数のデータファイルがある場合は、データファイルを ZIP ファイルにパッケージ化し、次の操作を実行して Python デプロイメントで使用できます。

    1. Realtime Compute for Apache Flink コンソール にログインします。

    2. [Fully Managed Flink] タブで、管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

    3. 左側のナビゲーションペインで、[アーティファクト] をクリックします。 [アーティファクト] ページで、[アーティファクトのアップロード] をクリックします。表示されるダイアログボックスで、目的のデータファイルの ZIP パッケージを選択します。

    4. [デプロイメント] ページで、目的のデプロイメントの名前をクリックします。[構成] タブで、編集[基本] セクションの右上隅にある をクリックします。 [Python アーカイブ] ドロップダウンリストから必要な ZIP パッケージを選択します。Python アーカイブ

    5. Python UDF で、次のコマンドを実行してデータファイルにアクセスします。この例では、データファイルを含むパッケージの名前は mydata.zip です。

      def map():
          with open("mydata.zip/mydata/data.txt") as f:
          ...
  • [追加の依存関係] ドロップダウンリストからデータファイルを選択する

    少数のデータファイルがある場合は、次の操作を実行して、Python デプロイメントでこれらのファイルにアクセスできます。

    1. Realtime Compute for Apache Flink コンソール にログインします。

    2. [Fully Managed Flink] タブで、管理するワークスペースを見つけ、[アクション] 列の [コンソール] をクリックします。

    3. 左側のナビゲーションペインで、[アーティファクト] をクリックします。 [アーティファクト] ページで、[アーティファクトのアップロード] をクリックします。表示されるダイアログボックスで、目的のデータファイルを選択します。

    4. [デプロイメント] ページで、目的のデプロイメントの名前をクリックします。[構成] タブで、編集[基本] セクションの右上隅にある をクリックします。 追加の依存関係 ドロップダウンリストから必要なデータファイルを選択します。

    5. Python UDF で、次のコマンドを実行してデータファイルにアクセスします。この例では、データファイルの名前は data.txt です。

      def map():
          with open("/flink/usrlib/data.txt") as f:
          ...

リファレンス

  • Python API ドラフトの開発方法の詳細については、「Python API ドラフトの開発」を参照してください。

  • Realtime Compute for Apache Flink の Python デプロイメントの開発方法の詳細については、「Python デプロイメントの概要」を参照してください。

  • フルマネージド Flink は、SQL ドラフトと DataStream ドラフトをサポートしています。 SQL ドラフトと DataStream ドラフトの開発方法の詳細については、「SQL ドラフトの開発」および「JAR ドラフトの開発」を参照してください。