
E-MapReduce: Use third-party Python libraries in a PySpark job

Last Updated: Dec 01, 2025

PySpark jobs often require third-party Python libraries to extend their data processing and analysis capabilities. This topic describes how to integrate such libraries into the EMR Serverless Spark environment by using runtime environments, Conda environment isolation, or PEX packaging, so that your jobs remain stable and flexible in distributed computing scenarios.

Background

During PySpark development, you can use third-party Python libraries to make data processing and analysis more flexible and convenient. The following three methods can help you achieve this goal. Choose the method that best suits your actual needs.

Method 1: Use third-party Python libraries through runtime environments

Configure standardized environments with the required libraries (such as numpy and pandas) in the Alibaba Cloud Management Console. The system automatically builds the environment, and you can select the created runtime environment when you add new tasks.

Method 2: Manage Python environments through Conda

Conda is a cross-platform package and environment management system. You can use Conda to easily create, save, load, and switch between environments that have different Python versions and library dependencies.

Method 3: Package Python dependencies through PEX

PEX (Python EXecutable) is a tool that you can use to package a Python application and the corresponding dependencies into an executable file.

Prerequisites

A workspace is created. For more information, see Create a workspace.

Limitations

You must install Python 3.8 or later. In this example, Python 3.8 is used.

Procedure

Method 1: Use third-party Python libraries through runtime environments

Step 1: Create a runtime environment

  1. Go to the Runtime Environment Management page.

    1. Log on to the E-MapReduce console.

    2. In the navigation pane on the left, choose EMR Serverless > Spark.

    3. On the Spark page, click the name of the target workspace.

    4. On the EMR Serverless Spark page, click Environment in the navigation pane on the left.

  2. Click Create Environment.

  3. On the Create Environment page, click Add Library.

    For more information about parameters, see Manage runtime environments.

  4. In the New Library dialog box, use the PyPI source type, configure the PyPI Package parameter, and then click OK.

    In the PyPI Package field, enter the name and version of the library. If you do not specify a version, the latest version is installed by default.

    In this example, the libraries faker and geopy are added.

  5. Click Create.

    After the runtime environment is created, the system starts to initialize the environment.

Step 2: Upload resource files to OSS

  1. Click pyspark_third_party_libs_demo.py to download the required resource file.

    This example demonstrates how to use PySpark and third-party libraries to generate mock data and perform geographic analysis. The faker library is used to generate mock user records that contain user information and random geographic locations near Paris. The geopy library is used to calculate the geodesic distance between each user's location and the Eiffel Tower. Finally, users within 10 kilometers of the Eiffel Tower are selected.

    You can also create the pyspark_third_party_libs_demo.py sample script with the following content:

    pyspark_third_party_libs_demo.py

    from pyspark.sql import SparkSession   
    from pyspark.sql.functions import udf
    from pyspark.sql.types import FloatType
    from faker import Faker
    import random
    from geopy.distance import geodesic
    
    spark = SparkSession.builder \
            .appName("PySparkThirdPartyLibsDemo") \
            .getOrCreate()
    
    # Use the third-party faker library to generate mock data
    fake = Faker()
    landmark = (48.8584, 2.2945)  # Eiffel Tower coordinates
    
    # Create a function that generates mock data
    def generate_fake_data(num_records):
        data = []
        for _ in range(num_records):
            # Generate random coordinates near Paris
            lat = 48.85 + random.uniform(-0.2, 0.2)
            lon = 2.30 + random.uniform(-0.2, 0.2)
            data.append((
                fake.uuid4(),        # User ID
                fake.name(),         # Name
                lat,                 # Latitude
                lon                  # Longitude
            ))
        return data
    
    # Generate 100 mock records
    fake_data = generate_fake_data(100)
    
    # Create a Spark DataFrame
    columns = ["user_id", "name", "latitude", "longitude"]
    df = spark.createDataFrame(fake_data, schema=columns)
    
    # Print the generated sample data
    print("Generated sample data:")
    df.show(5)
    
    # Use the third-party geopy library to calculate distance
    def calculate_distance(lat, lon, landmark=landmark):
        """Calculate the geographic distance between two points (kilometers)"""
        user_location = (lat, lon)
        return geodesic(user_location, landmark).kilometers
    
    # Register a UDF (User Defined Function)
    distance_udf = udf(calculate_distance, FloatType())
    
    # Add a distance column
    df_with_distance = df.withColumn(
        "distance_km", 
        distance_udf("latitude", "longitude")
    )
    
    # Find users within a 10-kilometer range
    nearby_users = df_with_distance.filter("distance_km <= 10")
    
    # Print the results
    print(f"\nFound {nearby_users.count()} users within a 10-kilometer range:")
    nearby_users.select("name", "latitude", "longitude", "distance_km").show(10)
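
    If you want to sanity-check the script locally before you upload it, you can install the same third-party libraries with pip and run the script against a local Spark session. The following commands are an optional sketch and assume that Python 3.8 or later and Internet access to PyPI are available on your machine.

    # Optional local test; not required for the Serverless Spark job
    pip install pyspark faker geopy
    python pyspark_third_party_libs_demo.py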
    
  2. Upload pyspark_third_party_libs_demo.py to OSS. For more information, see Simple upload.
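
    If you prefer a command line to the console, you can also upload the file by using the ossutil tool. The following command is a minimal sketch: <yourBucketName> is a placeholder, and the command assumes that ossutil is installed and configured with credentials that can write to the bucket.

    # Upload the sample script to OSS (the bucket name is a placeholder)
    ossutil cp pyspark_third_party_libs_demo.py oss://<yourBucketName>/pyspark_third_party_libs_demo.py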

Step 3: Develop and run a job

  1. In the navigation pane on the left of the EMR Serverless Spark page, click Development.

  2. On the Development tab, click the icon for creating a job.

  3. In the New dialog box, enter a name, select Application(Batch) > PySpark from the Type drop-down list, and then click OK.

  4. In the upper-right corner, select a queue.

  5. On the configuration tab of the job, configure the following parameters and click Run. You do not need to configure other parameters.

    Parameter

    Description

    Main Python Resources

    Select OSS and enter the OSS path of the pyspark_third_party_libs_demo.py file. Example: oss://<yourBucketName>/pyspark_third_party_libs_demo.py.

    Environment

    Select the runtime environment that you created from the drop-down list.

  6. After the job is run, click Logs in the Actions column of the job in the Execution Records section.

  7. On the Log Exploration tab, you can view the related logs.

    For example, on the Stdout tab of the Driver Log section, you can view the following information:

    Generated sample data:
    +--------------------+-------------------+------------------+------------------+
    |             user_id|               name|          latitude|         longitude|
    +--------------------+-------------------+------------------+------------------+
    |73d4565c-8cdf-4bc...|  Garrett Robertson| 48.81845614776422|2.4087517234236064|
    |0fc364b1-6759-416...|      Dawn Gonzalez| 48.68654896170054|2.4708555780468013|
    |2ab1f0aa-5552-4e1...|Alexander Gallagher| 48.87603770688707|2.1209399987431246|
    |1cabbdde-e703-4a8...|       David Morris|48.656356532418116|2.2503952330408175|
    |8b7938a0-b283-401...|    Shannon Perkins| 48.82915001905855| 2.410743969589327|
    +--------------------+-------------------+------------------+------------------+
    only showing top 5 rows
    
    
    Found 24 users within a 10-kilometer range:
    +-----------------+------------------+------------------+-----------+
    |             name|          latitude|         longitude|distance_km|
    +-----------------+------------------+------------------+-----------+
    |Garrett Robertson| 48.81845614776422|2.4087517234236064|   9.490705|
    |  Shannon Perkins| 48.82915001905855| 2.410743969589327|   9.131355|
    |      Alex Harris| 48.82547383207313|2.3579336032430027|   5.923493|
    |      Tammy Ramos| 48.84668267431606|2.3606455536493574|   5.026109|
    |   Ivan Christian| 48.89224239228342|2.2811025348668195|  3.8897192|
    |  Vernon Humphrey| 48.93142188723839| 2.306957802222233|   8.171813|
    |  Shawn Rodriguez|48.919907710882654|2.2270993307836044|   8.439087|
    |    Robert Fisher|48.794216103154646|2.3699024070507906|   9.033209|
    |  Heather Collier|48.822957591865205|2.2993033803043454|   3.957171|
    |       Dawn White|48.877816307255586|2.3743880390928878|   6.246059|
    +-----------------+------------------+------------------+-----------+
    only showing top 10 rows

Method 2: Manage Python environments through Conda

Step 1: Create and deploy a Conda environment

  1. Create an Elastic Compute Service (ECS) instance that uses the Alibaba Cloud Linux 3 OS, is connected to the Internet, and has an x86 architecture. For more information, see Create an instance on the Custom Launch tab.

    Note

    You can also use an idle node in an existing EMR cluster that is created on the EMR on ECS page (make sure the node has an x86 architecture).

  2. Run the following commands to install Miniconda:

    wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh
    chmod +x Miniconda3-latest-Linux-x86_64.sh
    
    ./Miniconda3-latest-Linux-x86_64.sh -b
    source miniconda3/bin/activate
  3. Build a Conda environment that uses Python 3.8 and NumPy.

    conda create -y -n pyspark_conda_env -c conda-forge conda-pack numpy python=3.8
    conda activate pyspark_conda_env
    conda pack -f -o pyspark_conda_env.tar.gz
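
    Optionally, you can verify the environment and the packed archive before you upload it. The following commands are a simple sketch that is run on the same ECS instance.

    # Confirm that numpy is importable from the activated Conda environment
    python -c "import numpy; print(numpy.__version__)"
    # List the first entries of the packed archive
    tar -tzf pyspark_conda_env.tar.gz | head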

Step 2: Upload resource files to OSS

  1. Click kmeans.py and kmeans_data.txt to download the required resource files.

    You can also create the kmeans.py sample script and the kmeans_data.txt data file with the following content:

    kmeans.py

    """
    A K-means clustering program using MLlib.
    
    This example requires NumPy (http://www.numpy.org/).
    """
    import sys
    
    import numpy as np
    from pyspark import SparkContext
    from pyspark.mllib.clustering import KMeans
    
    
    def parseVector(line):
        return np.array([float(x) for x in line.split(' ')])
    
    
    if __name__ == "__main__":
        if len(sys.argv) != 3:
            print("Usage: kmeans <file> <k>", file=sys.stderr)
            sys.exit(-1)
        sc = SparkContext(appName="KMeans")
        lines = sc.textFile(sys.argv[1])
        data = lines.map(parseVector)
        k = int(sys.argv[2])
        model = KMeans.train(data, k)
        print("Final centers: " + str(model.clusterCenters))
        print("Total Cost: " + str(model.computeCost(data)))
        sc.stop()

    kmeans_data.txt

    0.0 0.0 0.0
    0.1 0.1 0.1
    0.2 0.2 0.2
    9.0 9.0 9.0
    9.1 9.1 9.1
    9.2 9.2 9.2
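
    Optionally, you can test the script locally before you upload the files. The following command is a sketch that assumes pyspark and numpy are installed in your local Python environment. The trailing 2 is the number of clusters (k) that kmeans.py expects as its second argument.

    # Optional local test: run K-means with k=2 on the sample data
    python kmeans.py kmeans_data.txt 2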
  2. Upload the pyspark_conda_env.tar.gz, kmeans.py, and kmeans_data.txt files to OSS. For more information, see Simple upload.

Step 3: Develop and run a job

  1. In the navigation pane on the left of the EMR Serverless Spark page, click Development.

  2. On the Development tab, click the icon for creating a job.

  3. In the New dialog box, enter a name, select Application(Batch) > PySpark from the Type drop-down list, and then click OK.

  4. In the upper-right corner, select a queue.

  5. On the configuration tab of the job, configure the following parameters and click Run. You do not need to configure other parameters.

    Parameter

    Description

    Main Python Resources

    Select OSS and enter the OSS path of the kmeans.py file. Example: oss://<yourBucketName>/kmeans.py.

    Execution Parameters

    Enter the OSS path of the kmeans_data.txt data file, followed by the number of clusters.

    Format: oss://<yourBucketName>/kmeans_data.txt 2. The first value is passed to kmeans.py as the input data file, and the second value (2) is passed as k, the number of clusters.

    Archive Resources

    Select OSS and enter the OSS path of the pyspark_conda_env.tar.gz file.

    Format: oss://<yourBucketName>/pyspark_conda_env.tar.gz#condaenv. The #condaenv suffix specifies the directory name under which Spark extracts the archive on the driver and executors, which is why the Spark configuration below references ./condaenv/bin/python. For a rough open source equivalent of these settings, see the sketch after this procedure.

    Spark Configuration

    spark.pyspark.driver.python  ./condaenv/bin/python
    spark.pyspark.python         ./condaenv/bin/python
  6. After the job is run, click Logs in the Actions column of the job in the Execution Records section.

  7. On the Log Exploration tab, you can view the related logs.

    For example, on the Stdout tab of the Driver Log section, you can view the following information:

    Final centers: [array([0.1, 0.1, 0.1]), array([9.1, 9.1, 9.1])]
    Total Cost: 0.11999999999999958
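
The Archive Resources path and the Spark configuration in this method follow the standard open source Spark pattern for shipping a conda-pack environment. For reference only, a rough spark-submit equivalent is shown in the following sketch. EMR Serverless Spark applies these settings for you based on the console parameters, so you do not need to run this command.

    # Illustrative open source equivalent of the console settings; paths are placeholders
    spark-submit \
      --archives pyspark_conda_env.tar.gz#condaenv \
      --conf spark.pyspark.driver.python=./condaenv/bin/python \
      --conf spark.pyspark.python=./condaenv/bin/python \
      kmeans.py kmeans_data.txt 2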

Method 3: Package Python dependencies through PEX

Step 1: Package and execute a PEX file

  1. Create an Elastic Compute Service (ECS) instance that uses the Alibaba Cloud Linux 3 OS, is connected to the Internet, and has an x86 architecture. For more information, see Create an instance on the Custom Launch tab.

    Note

    You can also use an idle node in an existing EMR cluster that is created on the EMR on ECS page (make sure the node has an x86 architecture).

  2. Install the PEX and wheel tools.

    pip3.8 install --user pex wheel \
      --trusted-host mirrors.cloud.aliyuncs.com \
      -i http://mirrors.cloud.aliyuncs.com/pypi/simple/
  3. Download the wheel files of the third-party Python libraries that you want to use to a temporary directory.

    pip3.8 wheel -w /tmp/wheel \
      pyspark==3.3.1 pandas==1.5.3 pyarrow==15.0.1 numpy==1.24.4 \
      --trusted-host mirrors.cloud.aliyuncs.com \
      -i http://mirrors.cloud.aliyuncs.com/pypi/simple/
  4. Generate a PEX file.

    pex -f /tmp/wheel --no-index \
      pyspark==3.3.1 pandas==1.5.3 pyarrow==15.0.1 numpy==1.24.4 \
      -o spark331_pandas153.pex
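
    Optionally, you can verify the PEX file before you upload it. A PEX file that is built without an entry point behaves like a Python interpreter that already contains the packaged libraries, so a quick import check is sufficient.

    # Verify that the packaged libraries can be imported from the PEX file
    ./spark331_pandas153.pex -c "import pandas, pyarrow, numpy; print(pandas.__version__)"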

Step 2: Upload the PEX file to OSS

  1. Click kmeans.py and kmeans_data.txt to download the required resource files.

  2. Upload the spark331_pandas153.pex, kmeans.py, and kmeans_data.txt files to OSS. For more information, see Simple upload.

    Note

    In this example, Spark 3.3.1 is used and the PEX file contains the Pandas, PyArrow, and NumPy third-party Python libraries. You can package PySpark environments of other versions based on the Spark version that you select. For more information about engine versions, see Engine versions.

Step 3: Develop and run a job

  1. In the navigation pane on the left of the EMR Serverless Spark page, click Development.

  2. On the Development tab, click the icon for creating a job.

  3. In the New dialog box, enter a name, select Application(Batch) > PySpark from the Type drop-down list, and then click OK.

  4. In the upper-right corner, select a queue.

  5. On the configuration tab of the job, configure the following parameters and click Run. You do not need to configure other parameters.

    Parameter

    Description

    Main Python Resources

    Select OSS and enter the OSS path of the kmeans.py file. Example: oss://<yourBucketName>/kmeans.py.

    Execution Parameters

    Enter the OSS path of the kmeans_data.txt data file, followed by the number of clusters.

    Format: oss://<yourBucketName>/kmeans_data.txt 2. The first value is passed to kmeans.py as the input data file, and the second value (2) is passed as k, the number of clusters.

    File Resources

    Select OSS and enter the OSS path of the spark331_pandas153.pex file. Example: oss://<yourBucketName>/spark331_pandas153.pex. The PEX file is distributed to the driver and executors as a regular file, which is why the Spark configuration below points spark.pyspark.python to ./spark331_pandas153.pex. For a rough open source equivalent of these settings, see the sketch after this procedure.

    Spark Configuration

    spark.pyspark.driver.python            ./spark331_pandas153.pex
    spark.pyspark.python                   ./spark331_pandas153.pex
  6. After the job is run, click Logs in the Actions column of the job in the Execution Records section.

  7. On the Log Exploration tab, you can view the related logs.

    For example, on the Stdout tab of the Driver Log section, you can view the following information:

    Final centers: [array([0.1, 0.1, 0.1]), array([9.1, 9.1, 9.1])]
    Total Cost: 0.11999999999999958
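
The File Resources path and the Spark configuration in this method follow the open source Spark pattern of shipping a PEX file and using it as the Python interpreter. For reference only, a rough spark-submit equivalent is shown in the following sketch. EMR Serverless Spark applies these settings for you based on the console parameters, so you do not need to run this command.

    # Illustrative open source equivalent of the console settings; paths are placeholders
    spark-submit \
      --files spark331_pandas153.pex \
      --conf spark.pyspark.driver.python=./spark331_pandas153.pex \
      --conf spark.pyspark.python=./spark331_pandas153.pex \
      kmeans.py kmeans_data.txt 2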

References

This topic uses PySpark development as an example. If you want to develop jobs in other ways, see Develop a batch or streaming job.