PySpark タスクでは、データ処理と分析機能を強化するために、サードパーティの Python ライブラリが必要になることがよくあります。このトピックでは、ランタイム環境、Conda 環境の分離、PEX 軽量パッケージングメソッドを通じて、これらのライブラリを Serverless Spark 環境に効果的に統合する方法の詳細な例を示し、分散コンピューティングシナリオにおけるタスクの安定性と柔軟性を確保します。
背景情報
インタラクティブな PySpark 開発中に、サードパーティの Python ライブラリを使用して、データ処理と分析の柔軟性と使いやすさを向上させることができます。次の 3 つの方法でこの目標を達成できます。実際のニーズに基づいて最適な方法を選択することをお勧めします。
方法 | シナリオ |
Alibaba Cloud 管理コンソールで、必要なライブラリ (たとえば、 | |
Conda は、クロスプラットフォームのパッケージおよび環境管理システムです。Conda を使用すると、異なる Python バージョンとライブラリ依存関係を持つ環境を簡単に作成、保存、ロード、および切り替えることができます。 | |
PEX (Python EXecutable) は、Python アプリケーションと対応する依存関係を実行可能ファイルにパッケージ化するために使用できるツールです。 |
前提条件
ワークスペースが作成されていること。詳細については、「ワークスペースの作成」をご参照ください。
制限事項
Python 3.8 以降をインストールする必要があります。この例では、Python 3.8 を使用します。
手順
方法 1: ランタイム環境を通じてサードパーティの Python ライブラリを使用する
ステップ 1: ランタイム環境の作成
ランタイム環境管理ページに移動します。
E-MapReduce コンソールにログインします。
左側のナビゲーションウィンドウで、 を選択します。
[Spark] ページで、対象のワークスペースの名前をクリックします。
[EMR Serverless Spark] ページで、左側のナビゲーションウィンドウにある [ランタイム環境管理] をクリックします。
[ランタイム環境の作成] をクリックします。
[ランタイム環境の作成] ページで、[ライブラリの追加] をクリックします。
パラメーターの詳細については、「ランタイム環境の管理」をご参照ください。
[新しいライブラリ] ダイアログボックスで、[PyPI] ソースタイプを使用し、[PyPI パッケージ] パラメーターを設定してから、[OK] をクリックします。
[PyPI パッケージ] フィールドに、ライブラリの名前とバージョンを入力します。バージョンを指定しない場合、デフォルトで最新バージョンがインストールされます。
この例では、ライブラリ
fakerとgeopyが追加されます。[作成] をクリックします。
ランタイム環境が作成されると、システムは環境の初期化を開始します。
ステップ 2: リソースファイルを OSS にアップロードする
pyspark_third_party_libs_demo.py をクリックして、必要なリソースファイルをダウンロードします。
この例では、PySpark とサードパーティのライブラリを使用してアナログデータを生成し、地理分析を実行する方法を示します。
fakerライブラリは、ユーザー情報とランダムな地理的位置を含むアナログデータを生成するために使用されます。geopyライブラリは、各ユーザーの場所とエッフェル塔の間の地理的距離を計算するために使用されます。最後に、10 キロメートル以内のユーザーがフィルターで除外されます。次の内容で
pyspark_third_party_libs_demo.pyサンプルスクリプトを作成することもできます。pyspark_third_party_libs_demo.pyを OSS にアップロードします。詳細については、「単純アップロード」をご参照ください。
ステップ 3: ジョブを開発して実行する
EMR Serverless Spark ページの左側のナビゲーションウィンドウで、[データ開発] をクリックします。
[開発] タブで、
アイコンをクリックします。[作成] ダイアログボックスで、名前を入力し、[タイプ] ドロップダウンリストから を選択して、[OK] をクリックします。
右上隅で、キューを選択します。
ジョブの設定タブで、次のパラメーターを設定し、[実行] をクリックします。他のパラメーターを設定する必要はありません。
パラメーター
説明
メイン Python リソース
[OSS] を選択し、
pyspark_third_party_libs_demo.pyファイルの OSS パスを入力します。例: oss://<yourBucketName>/pyspark_third_party_libs_demo.py。ランタイム環境
ドロップダウンリストから作成したランタイム環境を選択します。
ジョブの実行後、[実行記録] セクションのジョブの [アクション] 列で [ログエクスプローラー] をクリックします。
[ログエクスプローラー] タブで、関連するログを表示できます。
たとえば、[ドライバーログ] セクションの [Stdout] タブで、次の情報を表示できます。
Generated sample data: +--------------------+-------------------+------------------+------------------+ | user_id| name| latitude| longitude| +--------------------+-------------------+------------------+------------------+ |73d4565c-8cdf-4bc...| Garrett Robertson| 48.81845614776422|2.4087517234236064| |0fc364b1-6759-416...| Dawn Gonzalez| 48.68654896170054|2.4708555780468013| |2ab1f0aa-5552-4e1...|Alexander Gallagher| 48.87603770688707|2.1209399987431246| |1cabbdde-e703-4a8...| David Morris|48.656356532418116|2.2503952330408175| |8b7938a0-b283-401...| Shannon Perkins| 48.82915001905855| 2.410743969589327| +--------------------+-------------------+------------------+------------------+ only showing top 5 rows Found 24 users within a 10-kilometer range: +-----------------+------------------+------------------+-----------+ | name| latitude| longitude|distance_km| +-----------------+------------------+------------------+-----------+ |Garrett Robertson| 48.81845614776422|2.4087517234236064| 9.490705| | Shannon Perkins| 48.82915001905855| 2.410743969589327| 9.131355| | Alex Harris| 48.82547383207313|2.3579336032430027| 5.923493| | Tammy Ramos| 48.84668267431606|2.3606455536493574| 5.026109| | Ivan Christian| 48.89224239228342|2.2811025348668195| 3.8897192| | Vernon Humphrey| 48.93142188723839| 2.306957802222233| 8.171813| | Shawn Rodriguez|48.919907710882654|2.2270993307836044| 8.439087| | Robert Fisher|48.794216103154646|2.3699024070507906| 9.033209| | Heather Collier|48.822957591865205|2.2993033803043454| 3.957171| | Dawn White|48.877816307255586|2.3743880390928878| 6.246059| +-----------------+------------------+------------------+-----------+ only showing top 10 rows
方法 2: Conda を通じて Python 環境を管理する
ステップ 1: Conda 環境を作成してデプロイする
Alibaba Cloud Linux 3 OS を使用し、インターネットに接続され、x86 アーキテクチャを持つ Elastic Compute Service (ECS) インスタンスを作成します。詳細については、「カスタム起動タブでインスタンスを作成する」をご参照ください。
説明EMR on ECS ページで作成された既存の EMR クラスターのアイドルノードを使用することもできます (ノードが x86 アーキテクチャであることを確認してください)。
次のコマンドを実行して Miniconda をインストールします。
wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh chmod +x Miniconda3-latest-Linux-x86_64.sh ./Miniconda3-latest-Linux-x86_64.sh -b source miniconda3/bin/activatePython 3.8 と NumPy を使用する Conda 環境を構築します。
conda create -y -n pyspark_conda_env -c conda-forge conda-pack numpy python=3.8 conda activate pyspark_conda_env conda pack -f -o pyspark_conda_env.tar.gz
ステップ 2: リソースファイルを OSS にアップロードする
kmeans.py と kmeans_data.txt をクリックして、必要なリソースファイルをダウンロードします。
次の内容で
kmeans.pyサンプルスクリプトとkmeans_data.txtデータファイルを作成することもできます。pyspark_conda_env.tar.gz、kmeans.py、およびkmeans_data.txtファイルを OSS にアップロードします。詳細については、「単純アップロード」をご参照ください。
ステップ 3: ジョブを開発して実行する
EMR Serverless Spark ページの左側のナビゲーションウィンドウで、[データ開発] をクリックします。
[開発] タブで、
アイコンをクリックします。[作成] ダイアログボックスで、名前を入力し、[タイプ] ドロップダウンリストから を選択して、[OK] をクリックします。
右上隅で、キューを選択します。
ジョブの設定タブで、次のパラメーターを設定し、[実行] をクリックします。他のパラメーターを設定する必要はありません。
パラメーター
説明
メイン Python リソース
[OSS] を選択し、
kmeans.pyファイルの OSS パスを入力します。例: oss://<yourBucketName>/kmeans.py。実行パラメーター
kmeans_data.txtデータファイルの OSS パスを入力します。フォーマット:
oss://<yourBucketName>/kmeans_data.txt 2。アーカイブのリソース
[OSS] を選択し、
pyspark_conda_env.tar.gzファイルの OSS パスを入力します。フォーマット:
oss://<yourBucketName>/pyspark_conda_env.tar.gz#condaenv。Spark 構成
spark.pyspark.driver.python ./condaenv/bin/python spark.pyspark.python ./condaenv/bin/pythonジョブの実行後、[実行記録] セクションのジョブの [アクション] 列で [ログエクスプローラー] をクリックします。
[ログエクスプローラー] タブで、関連するログを表示できます。
たとえば、[ドライバーログ] セクションの [Stdout] タブで、次の情報を表示できます。
Final centers: [array([0.1, 0.1, 0.1]), array([9.1, 9.1, 9.1])] Total Cost: 0.11999999999999958
方法 3: PEX を通じて Python 依存関係をパッケージ化する
ステップ 1: PEX ファイルをパッケージ化して実行する
Alibaba Cloud Linux 3 OS を使用し、インターネットに接続され、x86 アーキテクチャを持つ Elastic Compute Service (ECS) インスタンスを作成します。詳細については、「カスタム起動タブでインスタンスを作成する」をご参照ください。
説明EMR on ECS ページで作成された既存の EMR クラスターのアイドルノードを使用することもできます (ノードが x86 アーキテクチャであることを確認してください)。
PEX と wheel ツールをインストールします。
pip3.8 install --user pex wheel \ --trusted-host mirrors.cloud.aliyuncs.com \ -i http://mirrors.cloud.aliyuncs.com/pypi/simple/使用したいサードパーティの Python ライブラリの wheel ファイルを一時ディレクトリにダウンロードします。
pip3.8 wheel -w /tmp/wheel \ pyspark==3.3.1 pandas==1.5.3 pyarrow==15.0.1 numpy==1.24.4 \ --trusted-host mirrors.cloud.aliyuncs.com \ -i http://mirrors.cloud.aliyuncs.com/pypi/simple/PEX ファイルを生成します。
pex -f /tmp/wheel --no-index \ pyspark==3.3.1 pandas==1.5.3 pyarrow==15.0.1 numpy==1.24.4 \ -o spark331_pandas153.pex
ステップ 2: PEX ファイルを OSS にアップロードする
kmeans.py と kmeans_data.txt をクリックして、必要なリソースファイルをダウンロードします。
spark331_pandas153.pex、kmeans.py、およびkmeans_data.txtファイルを OSS にアップロードします。詳細については、「単純アップロード」をご参照ください。説明この例では、Spark 3.3.1 が使用され、PEX ファイルには Pandas、PyArrow、および NumPy のサードパーティ Python ライブラリが含まれています。選択した Spark バージョンに基づいて、他のバージョンの PySpark 環境をパッケージ化できます。エンジンバージョンの詳細については、「エンジンバージョン」をご参照ください。
ステップ 3: ジョブを開発して実行する
EMR Serverless Spark ページの左側のナビゲーションウィンドウで、[データ開発] をクリックします。
[開発] タブで、
アイコンをクリックします。[作成] ダイアログボックスで、名前を入力し、[タイプ] ドロップダウンリストから を選択して、[OK] をクリックします。
右上隅で、キューを選択します。
ジョブの設定タブで、次のパラメーターを設定し、[実行] をクリックします。他のパラメーターを設定する必要はありません。
パラメーター
説明
メイン Python リソース
[OSS] を選択し、
kmeans.pyファイルの OSS パスを入力します。例: oss://<yourBucketName>/kmeans.py。実行パラメーター
kmeans_data.txtデータファイルの OSS パスを入力します。フォーマット:
oss://<yourBucketName>/kmeans_data.txt 2。ファイルリソース
[OSS] を選択し、
spark331_pandas153.pexファイルの OSS パスを入力します。例: oss://<yourBucketName>/spark331_pandas153.pex。Spark 構成
spark.pyspark.driver.python ./spark331_pandas153.pex spark.pyspark.python ./spark331_pandas153.pexジョブの実行後、[実行記録] セクションのジョブの [アクション] 列で [ログエクスプローラー] をクリックします。
[ログエクスプローラー] タブで、関連するログを表示できます。
たとえば、[ドライバーログ] セクションの [Stdout] タブで、次の情報を表示できます。
Final centers: [array([0.1, 0.1, 0.1]), array([9.1, 9.1, 9.1])] Total Cost: 0.11999999999999958
参考
このトピックでは、PySpark 開発を例として使用します。他の方法でジョブを開発する場合は、「バッチジョブまたはストリーミングジョブを開発する」をご参照ください。