
E-MapReduce:Use third-party Python libraries in a PySpark job

Last Updated: Mar 26, 2026

PySpark jobs running on EMR Serverless Spark execute across distributed nodes, so every Python library your code imports must be available on each node — not just on your local machine. Without explicit dependency management, tasks fail with errors like ModuleNotFoundError: No module named 'pandas'.

EMR Serverless Spark supports three methods to distribute Python dependencies. Choose based on your environment and workflow:

Method                Best for
Runtime environments  Reusable, console-managed environments shared across multiple jobs. The system builds and maintains the environment automatically.
Conda                 Jobs that need a specific Python version or libraries with complex native dependencies. Requires building the environment on a compatible ECS instance.
PEX                   Lightweight, self-contained packaging of pure-Python dependencies into a single executable file.

Prerequisites

Before you begin, ensure that you have:

  • A workspace. See Create a workspace.

  • Python 3.8 or later installed. The examples in this topic use Python 3.8.

Method 1: Use runtime environments

Runtime environments let you define a set of PyPI libraries once in the console. EMR Serverless Spark builds and manages the environment automatically, and you can attach it to any job without re-packaging dependencies.

Step 1: Create a runtime environment

  1. Log on to the E-MapReduce console.

  2. In the navigation pane, choose EMR Serverless > Spark.

  3. Click the name of your workspace.

  4. In the navigation pane, click Environment.

  5. Click Create Environment.

  6. On the Create Environment page, click Add Library. For parameter details, see Manage runtime environments.

  7. In the New Library dialog box, set the source type to PyPI, enter the library name and version in the PyPI Package field, and click OK. If you omit the version, the latest version is installed. This example adds two libraries: faker and geopy.

  8. Click Create. The system initializes the environment after creation.

Step 2: Upload the script to OSS

  1. Click pyspark_third_party_libs_demo.py to download the sample script. The script uses Faker to generate 100 synthetic user records with random coordinates near Paris, then uses geopy to calculate the geodesic distance from each user to the Eiffel Tower (48.8584, 2.2945), and filters for users within 10 kilometers. Alternatively, create pyspark_third_party_libs_demo.py with the following content:

    pyspark_third_party_libs_demo.py

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import FloatType
    from faker import Faker
    import random
    from geopy.distance import geodesic
    
    spark = SparkSession.builder \
            .appName("PySparkThirdPartyLibsDemo") \
            .getOrCreate()
    
    # Use the Faker library to generate synthetic data
    fake = Faker()
    landmark = (48.8584, 2.2945)  # Eiffel Tower coordinates
    
    def generate_fake_data(num_records):
        data = []
        for _ in range(num_records):
            # Generate random coordinates near Paris
            lat = 48.85 + random.uniform(-0.2, 0.2)
            lon = 2.30 + random.uniform(-0.2, 0.2)
            data.append((
                fake.uuid4(),        # User ID
                fake.name(),         # Name
                lat,                 # Latitude
                lon                  # Longitude
            ))
        return data
    
    # Generate 100 synthetic records
    fake_data = generate_fake_data(100)
    
    # Create a Spark DataFrame
    columns = ["user_id", "name", "latitude", "longitude"]
    df = spark.createDataFrame(fake_data, schema=columns)
    
    print("Generated sample data:")
    df.show(5)
    
    # Use geopy to calculate distance from each user to the landmark
    def calculate_distance(lat, lon, landmark=landmark):
        """Calculate the geodesic distance between two points (in kilometers)."""
        user_location = (lat, lon)
        return geodesic(user_location, landmark).kilometers
    
    # Register a UDF (User Defined Function)
    distance_udf = udf(calculate_distance, FloatType())
    
    # Add a distance column
    df_with_distance = df.withColumn(
        "distance_km",
        distance_udf("latitude", "longitude")
    )
    
    # Filter users within 10 kilometers
    nearby_users = df_with_distance.filter("distance_km <= 10")
    
    print(f"\nFound {nearby_users.count()} users within a 10-kilometer range:")
    nearby_users.select("name", "latitude", "longitude", "distance_km").show(10)
  2. Upload pyspark_third_party_libs_demo.py to OSS. See Simple upload.
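Before submitting, you can sanity-check the distance logic locally without installing geopy. The sketch below approximates geodesic distance with the haversine formula using only the standard library — an illustration of what the calculate_distance UDF computes, not a replacement for geopy's more accurate ellipsoidal model:

```python
import math

# Stdlib-only approximation of the great-circle distance that the job's
# calculate_distance UDF computes with geopy.geodesic. Haversine assumes a
# spherical Earth, so results differ slightly from geopy's ellipsoidal model.
def haversine_km(lat1, lon1, lat2, lon2):
    earth_radius_km = 6371.0
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2)
    return 2 * earth_radius_km * math.asin(math.sqrt(a))

eiffel = (48.8584, 2.2945)
# A point 0.1 degrees of latitude north of the tower: roughly 11.1 km away.
print(haversine_km(*eiffel, eiffel[0] + 0.1, eiffel[1]))
```

If the local numbers look plausible, the same logic inside the UDF will behave the same way on the cluster, provided the runtime environment supplies the real libraries.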

Step 3: Run the job

  1. In the navigation pane of the EMR Serverless Spark page, click Development.

  2. On the Development tab, click the icon to create a new job.

  3. In the New dialog box, enter a name, select Application(Batch) > PySpark from the Type drop-down list, and click OK.

  4. In the upper-right corner, select a queue.

  5. On the job configuration tab, set the following parameters and click Run.

    Parameter Value
    Main Python Resources Select OSS and enter the OSS path of pyspark_third_party_libs_demo.py. Example: oss://<yourBucketName>/pyspark_third_party_libs_demo.py
    Environment Select the runtime environment you created.
  6. After the job finishes, click Logs in the Actions column under Execution Records.

  7. On the Log Exploration tab, open the Stdout tab under Driver Log to view the output. Expected output:

    Generated sample data:
    +--------------------+-------------------+------------------+------------------+
    |             user_id|               name|          latitude|         longitude|
    +--------------------+-------------------+------------------+------------------+
    |73d4565c-8cdf-4bc...|  Garrett Robertson| 48.81845614776422|2.4087517234236064|
    |0fc364b1-6759-416...|      Dawn Gonzalez| 48.68654896170054|2.4708555780468013|
    |2ab1f0aa-5552-4e1...|Alexander Gallagher| 48.87603770688707|2.1209399987431246|
    |1cabbdde-e703-4a8...|       David Morris|48.656356532418116|2.2503952330408175|
    |8b7938a0-b283-401...|    Shannon Perkins| 48.82915001905855| 2.410743969589327|
    +--------------------+-------------------+------------------+------------------+
    only showing top 5 rows
    
    
    Found 24 users within a 10-kilometer range:
    +-----------------+------------------+------------------+-----------+
    |             name|          latitude|         longitude|distance_km|
    +-----------------+------------------+------------------+-----------+
    |Garrett Robertson| 48.81845614776422|2.4087517234236064|   9.490705|
    |  Shannon Perkins| 48.82915001905855| 2.410743969589327|   9.131355|
    |      Alex Harris| 48.82547383207313|2.3579336032430027|   5.923493|
    |      Tammy Ramos| 48.84668267431606|2.3606455536493574|   5.026109|
    |   Ivan Christian| 48.89224239228342|2.2811025348668195|  3.8897192|
    |  Vernon Humphrey| 48.93142188723839| 2.306957802222233|   8.171813|
    |  Shawn Rodriguez|48.919907710882654|2.2270993307836044|   8.439087|
    |    Robert Fisher|48.794216103154646|2.3699024070507906|   9.033209|
    |  Heather Collier|48.822957591865205|2.2993033803043454|   3.957171|
    |       Dawn White|48.877816307255586|2.3743880390928878|   6.246059|
    +-----------------+------------------+------------------+-----------+
    only showing top 10 rows

Method 2: Manage dependencies with Conda

Conda lets you create a reproducible Python environment with a specific Python version and library set, package it as a tarball, and deploy it to each Spark node via the Archive Resources parameter.

Conda and PEX environments must be built on an x86 ECS instance running Alibaba Cloud Linux 3.

Step 1: Build and package the Conda environment

  1. Create an Elastic Compute Service (ECS) instance with the following configuration. See Create an instance on the Custom Launch tab.

    • OS: Alibaba Cloud Linux 3

    • Architecture: x86

    • Internet access: enabled

    You can also use an idle node from an existing EMR cluster created on the EMR on ECS page, as long as it has an x86 architecture.
  2. Install Miniconda on the instance:

    wget https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh
    chmod +x Miniconda3-latest-Linux-x86_64.sh
    ./Miniconda3-latest-Linux-x86_64.sh -b
    source ~/miniconda3/bin/activate
  3. Create and package a Conda environment with Python 3.8 and NumPy:

    conda create -y -n pyspark_conda_env -c conda-forge conda-pack numpy python=3.8
    conda activate pyspark_conda_env
    conda pack -f -o pyspark_conda_env.tar.gz

    This produces pyspark_conda_env.tar.gz, which contains the full Python environment.

Step 2: Upload resource files to OSS

  1. Download the sample files kmeans.py and kmeans_data.txt, or create them with the following content:

    kmeans.py:

    """
    A K-means clustering program using MLlib.
    
    This example requires NumPy (http://www.numpy.org/).
    """
    import sys
    
    import numpy as np
    from pyspark import SparkContext
    from pyspark.mllib.clustering import KMeans
    
    
    def parseVector(line):
        return np.array([float(x) for x in line.split(' ')])
    
    
    if __name__ == "__main__":
        if len(sys.argv) != 3:
            print("Usage: kmeans <file> <k>", file=sys.stderr)
            sys.exit(-1)
        sc = SparkContext(appName="KMeans")
        lines = sc.textFile(sys.argv[1])
        data = lines.map(parseVector)
        k = int(sys.argv[2])
        model = KMeans.train(data, k)
        print("Final centers: " + str(model.clusterCenters))
        print("Total Cost: " + str(model.computeCost(data)))
        sc.stop()
    kmeans_data.txt:

    0.0 0.0 0.0
    0.1 0.1 0.1
    0.2 0.2 0.2
    9.0 9.0 9.0
    9.1 9.1 9.1
    9.2 9.2 9.2
  2. Upload pyspark_conda_env.tar.gz, kmeans.py, and kmeans_data.txt to OSS. See Simple upload.
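For intuition about what KMeans.train returns for this data, here is a stdlib-only sketch of Lloyd's algorithm applied to the six sample points. This is a simplification for illustration; MLlib's distributed implementation differs in initialization and execution:

```python
# Minimal Lloyd's algorithm: alternate between assigning points to their
# nearest center and moving each center to the mean of its cluster.
def kmeans(points, centers, iters=10):
    for _ in range(iters):
        # Assignment step: attach each point to its nearest center.
        clusters = [[] for _ in centers]
        for p in points:
            dists = [sum((a - b) ** 2 for a, b in zip(p, c)) for c in centers]
            clusters[dists.index(min(dists))].append(p)
        # Update step: move each center to the mean of its cluster
        # (keep the old center if a cluster ends up empty).
        centers = [
            tuple(sum(xs) / len(xs) for xs in zip(*cl)) if cl else c
            for cl, c in zip(clusters, centers)
        ]
    return centers

# The six points from kmeans_data.txt form two tight clusters.
points = [(0.0,) * 3, (0.1,) * 3, (0.2,) * 3,
          (9.0,) * 3, (9.1,) * 3, (9.2,) * 3]
final = kmeans(points, centers=[points[0], points[-1]])
print(final)  # centers converge near (0.1, 0.1, 0.1) and (9.1, 9.1, 9.1)
```

This matches the expected job output below: two centers at roughly 0.1 and 9.1 per coordinate.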

Step 3: Run the job

  1. In the navigation pane of the EMR Serverless Spark page, click Development.

  2. On the Development tab, click the icon to create a new job.

  3. In the New dialog box, enter a name, select Application(Batch) > PySpark from the Type drop-down list, and click OK.

  4. In the upper-right corner, select a queue.

  5. On the job configuration tab, set the following parameters and click Run.

    Parameter             Value
    Main Python Resources Select OSS and enter the OSS path of kmeans.py. Example: oss://<yourBucketName>/kmeans.py
    Execution Parameters  Enter the OSS path of kmeans_data.txt followed by the number of clusters. Format: oss://<yourBucketName>/kmeans_data.txt 2
    Archive Resources     Select OSS and enter the OSS path of pyspark_conda_env.tar.gz. Format: oss://<yourBucketName>/pyspark_conda_env.tar.gz#condaenv
    Spark Configuration   spark.pyspark.driver.python  ./condaenv/bin/python
                          spark.pyspark.python         ./condaenv/bin/python
  6. After the job finishes, click Logs in the Actions column under Execution Records.

  7. On the Log Exploration tab, open the Stdout tab under Driver Log to view the output. Expected output:

    Final centers: [array([0.1, 0.1, 0.1]), array([9.1, 9.1, 9.1])]
    Total Cost: 0.11999999999999958
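The #condaenv suffix in the Archive Resources path tells Spark to extract the tarball into a directory named condaenv in each container's working directory, which is why the Spark configuration points at ./condaenv/bin/python. The extract-and-run pattern can be sketched locally with a stub directory tree in place of a real Conda environment (a self-contained illustration, not the actual deployment mechanism):

```shell
set -e
workdir=$(mktemp -d)
cd "$workdir"

# Stand-in for a conda-pack tarball: a tree with a bin/python inside.
mkdir -p env/bin
printf '#!/bin/sh\necho "hello from packaged python"\n' > env/bin/python
chmod +x env/bin/python
tar -czf pyspark_conda_env.tar.gz -C env .

# What Spark does with 'oss://.../pyspark_conda_env.tar.gz#condaenv':
# extract the archive into a directory named after the '#' fragment.
mkdir condaenv
tar -xzf pyspark_conda_env.tar.gz -C condaenv

# The relative path used in the Spark configuration now resolves.
./condaenv/bin/python
```

In a real job the extracted directory contains the full interpreter and site-packages produced by conda pack, so both the driver and executors run your pinned Python environment.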

Method 3: Package dependencies with PEX

PEX (Python EXecutable) packages Python libraries into a single self-contained file, which Spark deploys to each node as a file resource.

Match the PEX package versions to your Spark engine version — see Engine versions for the Spark version your workspace uses.

Step 1: Build the PEX file

  1. Create an ECS instance with the following configuration. See Create an instance on the Custom Launch tab.

    • OS: Alibaba Cloud Linux 3

    • Architecture: x86

    • Internet access: enabled

    You can also use an idle node from an existing EMR cluster created on the EMR on ECS page, as long as it has an x86 architecture.
  2. Install the PEX and wheel tools:

    pip3.8 install --user pex wheel \
      --trusted-host mirrors.cloud.aliyuncs.com \
      -i http://mirrors.cloud.aliyuncs.com/pypi/simple/
  3. Download wheel files for the target libraries into a local directory:

    pip3.8 wheel -w /tmp/wheel \
      pyspark==3.3.1 pandas==1.5.3 pyarrow==15.0.1 numpy==1.24.4 \
      --trusted-host mirrors.cloud.aliyuncs.com \
      -i http://mirrors.cloud.aliyuncs.com/pypi/simple/
  4. Bundle the wheel files into a PEX file:

    pex -f /tmp/wheel --no-index \
      pyspark==3.3.1 pandas==1.5.3 pyarrow==15.0.1 numpy==1.24.4 \
      -o spark331_pandas153.pex

    This example targets Spark 3.3.1 and bundles pandas, PyArrow, and NumPy. Adjust the versions to match your Spark engine.
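A PEX file is, in essence, a self-executing archive of Python code and its dependencies. The standard-library zipapp module demonstrates the same idea on a small scale — an analogy only; it does not reproduce pex's dependency resolution or bootstrapping:

```python
import os
import subprocess
import sys
import tempfile
import zipapp

with tempfile.TemporaryDirectory() as tmp:
    # A minimal "application": a directory with a __main__.py entry point.
    src = os.path.join(tmp, "app")
    os.makedirs(src)
    with open(os.path.join(src, "__main__.py"), "w") as f:
        f.write("print('hello from a self-contained archive')\n")

    # Bundle it into a single runnable file, analogous in spirit to a .pex.
    target = os.path.join(tmp, "demo.pyz")
    zipapp.create_archive(src, target=target)

    # Any Python interpreter can execute the archive directly, just as Spark
    # invokes the uploaded .pex as its Python executable.
    result = subprocess.run([sys.executable, target],
                            capture_output=True, text=True)
    print(result.stdout.strip())
```

This is why the job configuration below can point spark.pyspark.python at the .pex file itself: the archive acts as the interpreter entry point and carries its bundled libraries with it.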

Step 2: Upload resource files to OSS

  1. Prepare kmeans.py and kmeans_data.txt as described in Step 2 of Method 2.

  2. Upload spark331_pandas153.pex, kmeans.py, and kmeans_data.txt to OSS. See Simple upload.

Step 3: Run the job

  1. In the navigation pane of the EMR Serverless Spark page, click Development.

  2. On the Development tab, click the icon to create a new job.

  3. In the New dialog box, enter a name, select Application(Batch) > PySpark from the Type drop-down list, and click OK.

  4. In the upper-right corner, select a queue.

  5. On the job configuration tab, set the following parameters and click Run.

    Parameter             Value
    Main Python Resources Select OSS and enter the OSS path of kmeans.py. Example: oss://<yourBucketName>/kmeans.py
    Execution Parameters  Enter the OSS path of kmeans_data.txt followed by the number of clusters. Format: oss://<yourBucketName>/kmeans_data.txt 2
    File Resources        Select OSS and enter the OSS path of spark331_pandas153.pex. Example: oss://<yourBucketName>/spark331_pandas153.pex
    Spark Configuration   spark.pyspark.driver.python  ./spark331_pandas153.pex
                          spark.pyspark.python         ./spark331_pandas153.pex
  6. After the job finishes, click Logs in the Actions column under Execution Records.

  7. On the Log Exploration tab, open the Stdout tab under Driver Log to view the output. Expected output:

    Final centers: [array([0.1, 0.1, 0.1]), array([9.1, 9.1, 9.1])]
    Total Cost: 0.11999999999999958

What's next

This topic uses PySpark batch jobs as examples. To develop other job types, see Develop a batch or streaming job.