PySpark jobs often require third-party Python libraries to extend data processing and analysis capabilities. This topic describes how to integrate these libraries into the EMR Serverless Spark environment by using runtime environments, Conda environment isolation, or PEX packaging, so that jobs remain stable and flexible in distributed computing scenarios.
Background
During PySpark job development, you can use third-party Python libraries to make data processing and analysis more flexible and convenient. The following three methods can help you achieve this goal. Choose the method that best suits your needs.
Method | Scenarios
--- | ---
Method 1: Use third-party Python libraries through runtime environments | Configure standardized environments that contain the required libraries (such as faker and geopy) in the console.
Method 2: Manage Python environments through Conda | Conda is a cross-platform package and environment management system. You can use Conda to easily create, save, load, and switch between environments that have different Python versions and library dependencies.
Method 3: Package Python dependencies through PEX | PEX (Python EXecutable) is a tool that you can use to package a Python application and the corresponding dependencies into an executable file.
Prerequisites
A workspace is created. For more information, see Create a workspace.
Limitations
You must install Python 3.8 or later. In this example, Python 3.8 is used.
Procedure
Method 1: Use third-party Python libraries through runtime environments
Step 1: Create a runtime environment
Go to the Runtime Environment Management page.
Log on to the E-MapReduce console.
In the navigation pane on the left, choose EMR Serverless > Spark.
On the Spark page, click the name of the target workspace.
On the EMR Serverless Spark page, click Environment in the navigation pane on the left.
Click Create Environment.
On the Create Environment page, click Add Library.
For more information about parameters, see Manage runtime environments.
In the New Library dialog box, set the source type to PyPI, configure the PyPI Package parameter, and then click OK.
In the PyPI Package field, enter the name and version of the library. If you do not specify a version, the latest version is installed by default.
In this example, the faker and geopy libraries are added.
Click Create.
After the runtime environment is created, the system starts to initialize the environment.
Step 2: Upload resource files to OSS
Click pyspark_third_party_libs_demo.py to download the required resource file.
This example demonstrates how to use PySpark and third-party libraries to generate mock data and perform geographic analysis. The faker library is used to generate mock data that contains user information and random geographic locations. The geopy library is used to calculate the geographic distance between each user's location and the Eiffel Tower. Finally, users within a 10-kilometer range are filtered out. You can also create the pyspark_third_party_libs_demo.py sample script yourself; a sketch of its possible content is shown after these steps.
Upload pyspark_third_party_libs_demo.py to OSS. For more information, see Simple upload.
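For reference, the following is a minimal sketch of what pyspark_third_party_libs_demo.py may look like, reconstructed from the description and the sample output in this topic. The number of generated users, the random coordinate ranges, and the helper names are assumptions; use the downloaded script for the exact content.

```python
# Minimal sketch of pyspark_third_party_libs_demo.py (reconstructed; details such as
# the number of generated rows and the random ranges are assumptions).
import random
import uuid

from faker import Faker
from geopy.distance import geodesic
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import FloatType

EIFFEL_TOWER = (48.8584, 2.2945)  # (latitude, longitude)

spark = SparkSession.builder.appName("pyspark_third_party_libs_demo").getOrCreate()
fake = Faker()

# Use faker to generate mock users with random locations around Paris.
rows = [
    (
        str(uuid.uuid4()),
        fake.name(),
        EIFFEL_TOWER[0] + random.uniform(-0.2, 0.2),
        EIFFEL_TOWER[1] + random.uniform(-0.2, 0.2),
    )
    for _ in range(100)
]
df = spark.createDataFrame(rows, ["user_id", "name", "latitude", "longitude"])

print("Generated sample data:")
df.show(5)

# Use geopy on the executors to compute each user's distance to the Eiffel Tower.
@udf(returnType=FloatType())
def distance_km(lat, lon):
    return float(geodesic((lat, lon), EIFFEL_TOWER).km)

nearby = (
    df.withColumn("distance_km", distance_km(col("latitude"), col("longitude")))
    .filter(col("distance_km") <= 10)
    .select("name", "latitude", "longitude", "distance_km")
)

print(f"Found {nearby.count()} users within a 10-kilometer range:")
nearby.show(10)

spark.stop()
```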
Step 3: Develop and run a job
In the navigation pane on the left of the EMR Serverless Spark page, click Development.
On the Development tab, click the add icon.
In the New dialog box, enter a name, select PySpark from the Type drop-down list, and then click OK.
In the upper-right corner, select a queue.
On the configuration tab of the job, configure the following parameters and click Run. You do not need to configure other parameters.
Parameter | Description
--- | ---
Main Python Resources | Select OSS and enter the OSS path of the pyspark_third_party_libs_demo.py file. Example: oss://<yourBucketName>/pyspark_third_party_libs_demo.py.
Environment | Select the runtime environment that you created from the drop-down list.
After the job is run, click Logs in the Actions column of the job in the Execution Records section.
On the Log Exploration tab, you can view the related logs.
For example, on the Stdout tab of the Driver Log section, you can view the following information:
```
Generated sample data:
+--------------------+-------------------+------------------+------------------+
|             user_id|               name|          latitude|         longitude|
+--------------------+-------------------+------------------+------------------+
|73d4565c-8cdf-4bc...|  Garrett Robertson| 48.81845614776422|2.4087517234236064|
|0fc364b1-6759-416...|      Dawn Gonzalez| 48.68654896170054|2.4708555780468013|
|2ab1f0aa-5552-4e1...|Alexander Gallagher| 48.87603770688707|2.1209399987431246|
|1cabbdde-e703-4a8...|       David Morris|48.656356532418116|2.2503952330408175|
|8b7938a0-b283-401...|    Shannon Perkins| 48.82915001905855| 2.410743969589327|
+--------------------+-------------------+------------------+------------------+
only showing top 5 rows

Found 24 users within a 10-kilometer range:
+-----------------+------------------+------------------+-----------+
|             name|          latitude|         longitude|distance_km|
+-----------------+------------------+------------------+-----------+
|Garrett Robertson| 48.81845614776422|2.4087517234236064|   9.490705|
|  Shannon Perkins| 48.82915001905855| 2.410743969589327|   9.131355|
|      Alex Harris| 48.82547383207313|2.3579336032430027|   5.923493|
|      Tammy Ramos| 48.84668267431606|2.3606455536493574|   5.026109|
|   Ivan Christian| 48.89224239228342|2.2811025348668195|  3.8897192|
|  Vernon Humphrey| 48.93142188723839| 2.306957802222233|   8.171813|
|  Shawn Rodriguez|48.919907710882654|2.2270993307836044|   8.439087|
|    Robert Fisher|48.794216103154646|2.3699024070507906|   9.033209|
|  Heather Collier|48.822957591865205|2.2993033803043454|   3.957171|
|       Dawn White|48.877816307255586|2.3743880390928878|   6.246059|
+-----------------+------------------+------------------+-----------+
only showing top 10 rows
```
Method 2: Manage Python environments through Conda
Step 1: Create and deploy a Conda environment
Create an Elastic Compute Service (ECS) instance that uses the Alibaba Cloud Linux 3 OS, is connected to the Internet, and has an x86 architecture. For more information, see Create an instance on the Custom Launch tab.
Note: You can also use an idle node in an existing EMR cluster that is created on the EMR on ECS page. Make sure that the node uses the x86 architecture.
Run the following commands to install Miniconda:
```
wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh
chmod +x Miniconda3-latest-Linux-x86_64.sh
./Miniconda3-latest-Linux-x86_64.sh -b
source miniconda3/bin/activate
```
Build a Conda environment that uses Python 3.8 and NumPy:
```
conda create -y -n pyspark_conda_env -c conda-forge conda-pack numpy python=3.8
conda activate pyspark_conda_env
conda pack -f -o pyspark_conda_env.tar.gz
```
Step 2: Upload resource files to OSS
Click kmeans.py and kmeans_data.txt to download the required resource files.
You can also create the kmeans.py sample script and the kmeans_data.txt data file yourself; a sketch of their possible content is shown after these steps.
Upload the pyspark_conda_env.tar.gz, kmeans.py, and kmeans_data.txt files to OSS. For more information, see Simple upload.
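For reference, the following is a minimal sketch of what kmeans.py may look like, modeled on the standard Spark K-means example and the sample output shown later in this topic. The argument handling, the training parameters, and the assumed content of kmeans_data.txt (shown in the comments) are assumptions; use the downloaded files for the exact content.

```python
# Minimal sketch of kmeans.py (reconstructed; the standard Spark K-means example is
# assumed). kmeans_data.txt is assumed to hold one space-separated point per line,
# for example:
#   0.0 0.0 0.0
#   0.1 0.1 0.1
#   0.2 0.2 0.2
#   9.0 9.0 9.0
#   9.1 9.1 9.1
#   9.2 9.2 9.2
import sys

import numpy as np
from pyspark.mllib.clustering import KMeans
from pyspark.sql import SparkSession


def parse_point(line):
    # Convert one text line into a NumPy vector.
    return np.array([float(x) for x in line.split(" ")])


if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kmeans.py <input_file> <k>", file=sys.stderr)
        sys.exit(-1)

    spark = SparkSession.builder.appName("PythonKMeans").getOrCreate()

    # The first execution parameter is the OSS path of kmeans_data.txt,
    # and the second is the number of clusters (2 in this example).
    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    data = lines.map(parse_point).cache()

    model = KMeans.train(
        data, int(sys.argv[2]), maxIterations=10, initializationMode="random"
    )

    print("Final centers: " + str(model.clusterCenters))
    print("Total Cost: " + str(model.computeCost(data)))

    spark.stop()
```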
Step 3: Develop and run a job
In the navigation pane on the left of the EMR Serverless Spark page, click Development.
On the Development tab, click the add icon.
In the New dialog box, enter a name, select PySpark from the Type drop-down list, and then click OK.
In the upper-right corner, select a queue.
On the configuration tab of the job, configure the following parameters and click Run. You do not need to configure other parameters.
Parameter | Description
--- | ---
Main Python Resources | Select OSS and enter the OSS path of the kmeans.py file. Example: oss://<yourBucketName>/kmeans.py.
Execution Parameters | Enter the OSS path of the kmeans_data.txt data file and the number of clusters. Format: oss://<yourBucketName>/kmeans_data.txt 2.
Archive Resources | Select OSS and enter the OSS path of the pyspark_conda_env.tar.gz file. Format: oss://<yourBucketName>/pyspark_conda_env.tar.gz#condaenv.
Spark Configuration | Set spark.pyspark.driver.python and spark.pyspark.python to ./condaenv/bin/python.

After the job is run, click Logs in the Actions column of the job in the Execution Records section.
On the Log Exploration tab, you can view the related logs.
For example, on the Stdout tab of the Driver Log section, you can view the following information:
```
Final centers: [array([0.1, 0.1, 0.1]), array([9.1, 9.1, 9.1])]
Total Cost: 0.11999999999999958
```
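If the job does not pick up the Conda environment as expected, you can run a small check job that uses the same Archive Resources and Spark Configuration settings. The following hypothetical snippet (not part of this example's resources; the conda_env_check name is only illustrative) prints the interpreter paths that the driver and an executor use:

```python
# Hypothetical check script: confirm that the driver and executors run on the Python
# interpreter unpacked from pyspark_conda_env.tar.gz (referenced as ./condaenv).
import sys

import numpy
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("conda_env_check").getOrCreate()

# Expected to point into .../condaenv/bin/ when spark.pyspark.driver.python is set.
print("Driver Python:", sys.executable)
print("Driver NumPy version:", numpy.__version__)

# Run the same check on an executor.
executor_python = (
    spark.sparkContext.parallelize([0], 1).map(lambda _: sys.executable).collect()
)
print("Executor Python:", executor_python)

spark.stop()
```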
Method 3: Package Python dependencies through PEX
Step 1: Package and execute a PEX file
Create an Elastic Compute Service (ECS) instance that uses the Alibaba Cloud Linux 3 OS, is connected to the Internet, and has an x86 architecture. For more information, see Create an instance on the Custom Launch tab.
Note: You can also use an idle node in an existing EMR cluster that is created on the EMR on ECS page. Make sure that the node uses the x86 architecture.
Install the PEX and wheel tools.
```
pip3.8 install --user pex wheel \
  --trusted-host mirrors.cloud.aliyuncs.com \
  -i http://mirrors.cloud.aliyuncs.com/pypi/simple/
```
Download the wheel files of the third-party Python libraries that you want to use to a temporary directory:
```
pip3.8 wheel -w /tmp/wheel \
  pyspark==3.3.1 pandas==1.5.3 pyarrow==15.0.1 numpy==1.24.4 \
  --trusted-host mirrors.cloud.aliyuncs.com \
  -i http://mirrors.cloud.aliyuncs.com/pypi/simple/
```
Generate a PEX file:
```
pex -f /tmp/wheel --no-index \
  pyspark==3.3.1 pandas==1.5.3 pyarrow==15.0.1 numpy==1.24.4 \
  -o spark331_pandas153.pex
```
Step 2: Upload the PEX file to OSS
Click kmeans.py and kmeans_data.txt to download the required resource files.
Upload the spark331_pandas153.pex, kmeans.py, and kmeans_data.txt files to OSS. For more information, see Simple upload.
Note: In this example, Spark 3.3.1 is used, and the PEX file contains the pandas, PyArrow, and NumPy third-party Python libraries. You can package PySpark environments of other versions based on the Spark version that you select. For more information about engine versions, see Engine versions.
Step 3: Develop and run a job
In the navigation pane on the left of the EMR Serverless Spark page, click Development.
On the Development tab, click the add icon.
In the New dialog box, enter a name, select PySpark from the Type drop-down list, and then click OK.
In the upper-right corner, select a queue.
On the configuration tab of the job, configure the following parameters and click Run. You do not need to configure other parameters.
Parameter | Description
--- | ---
Main Python Resources | Select OSS and enter the OSS path of the kmeans.py file. Example: oss://<yourBucketName>/kmeans.py.
Execution Parameters | Enter the OSS path of the kmeans_data.txt data file and the number of clusters. Format: oss://<yourBucketName>/kmeans_data.txt 2.
File Resources | Select OSS and enter the OSS path of the spark331_pandas153.pex file. Example: oss://<yourBucketName>/spark331_pandas153.pex.
Spark Configuration | Set spark.pyspark.driver.python and spark.pyspark.python to ./spark331_pandas153.pex.

After the job is run, click Logs in the Actions column of the job in the Execution Records section.
On the Log Exploration tab, you can view the related logs.
For example, on the Stdout tab of the Driver Log section, you can view the following information:
```
Final centers: [array([0.1, 0.1, 0.1]), array([9.1, 9.1, 9.1])]
Total Cost: 0.11999999999999958
```
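Although kmeans.py uses only NumPy, the PEX file in this example also bundles pandas and PyArrow. If you want to confirm that the bundled libraries are visible to both the driver and the executors, you can run a check job that uses the same File Resources and Spark Configuration settings. The following hypothetical snippet (not part of this example's resources; the pex_env_check name is only illustrative) uses a pandas UDF for that purpose:

```python
# Hypothetical check script: verify that the libraries bundled in
# spark331_pandas153.pex are available to the driver and the executors.
import numpy
import pandas
import pyarrow
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.appName("pex_env_check").getOrCreate()

print("Driver versions:", pandas.__version__, pyarrow.__version__, numpy.__version__)

@pandas_udf("string")
def executor_pandas_version(s: pandas.Series) -> pandas.Series:
    # Runs on the executors, so it exercises the Python environment shipped in the PEX file.
    return pandas.Series([f"pandas={pandas.__version__}"] * len(s))

spark.range(3).select(executor_pandas_version("id")).show(truncate=False)
spark.stop()
```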
References
This topic uses PySpark development as an example. If you want to develop jobs in other ways, see Develop a batch or streaming job.