全部產品
Search
文件中心

E-MapReduce:在PySpark程式中使用Python第三方庫

更新時間:Jul 23, 2025

PySpark任務往往需要藉助Python第三方庫來增強資料處理和分析能力。本文通過樣本詳細介紹了如何通過運行環境、Conda環境隔離與PEX輕量化打包方式,有效地將這些庫整合到Serverless Spark環境中,確保任務在分散式運算情境下的穩定性和靈活性。

背景資訊

在互動式PySpark開發過程中,可以使用Python第三方庫以提升資料處理與分析的靈活性及易用性。以下三種方式均能協助您實現這一目標,建議根據實際情況選擇最適合的方式。

方式

適用情境

方式一:通過運行環境使用Python第三方庫

在阿里雲控制台配置包含所需庫的標準化環境(如 numpypandas),系統會自動構建環境,新增任務時使用建立的運行環境即可。

方式二:通過Conda管理Python環境

Conda是一個跨平台的包管理和環境管理系統,它允許使用者輕鬆建立、儲存、載入和切換多個環境,每個環境都可以擁有獨立的Python版本和庫依賴。

方式三:通過PEX打包Python依賴

PEX (Python EXecutable) 是一個工具,它可以將Python應用及其所有依賴打包進一個可執行檔中。

前提條件

已建立工作空間,詳情請參見建立工作空間

使用限制

已安裝Python 3.8及以上版本。本文以Python 3.8為例介紹。

操作流程

方式一:通過運行環境使用Python第三方庫

步驟一:建立運行環境

  1. 進入運行環境管理頁面。

    1. 登入E-MapReduce控制台

    2. 在左側導覽列,選擇EMR Serverless > Spark

    3. Spark頁面,單擊目標工作空間名稱。

    4. EMR Serverless Spark頁面,選擇左側導覽列中的運行環境管理

  2. 單擊建立運行環境

  3. 建立運行環境頁面,單擊添加庫

    更多參數資訊,請參見管理運行環境

  4. 建立庫中,使用PyPI來源類型,配置PyPI Package參數,然後單擊確定

    PyPI Package中填寫庫的名稱及版本,不指定版本時,預設安裝最新版本。

    本文樣本添加的庫為fakergeopy

  5. 單擊建立

    建立後將開始初始化環境。

步驟二:上傳資源檔至OSS

  1. 單擊pyspark_third_party_libs_demo.py,下載所需資源檔。

    本文樣本展示了如何使用PySpark和第三方庫產生類比資料並進行地理分析。其中,faker庫用於產生包含使用者資訊和隨機地理位置的類比資料,geopy庫用於計算每個使用者位置與埃菲爾鐵塔之間的地理距離,最終篩選出距離在10公裡範圍內的使用者。

    您也可以建立樣本指令碼pyspark_third_party_libs_demo.py,內容如下所示。

    pyspark_third_party_libs_demo.py

    from pyspark.sql import SparkSession   
    from pyspark.sql.functions import udf
    from pyspark.sql.types import FloatType
    from faker import Faker
    import random
    from geopy.distance import geodesic
    
    spark = SparkSession.builder \
            .appName("PySparkThirdPartyLibsDemo") \
            .getOrCreate()
    
    # 使用第三方庫faker產生類比資料
    fake = Faker()
    landmark = (48.8584, 2.2945)  # 埃菲爾鐵塔座標
    
    # 建立類比資料函數
    def generate_fake_data(num_records):
        data = []
        for _ in range(num_records):
            # 在巴黎附近產生隨機座標
            lat = 48.85 + random.uniform(-0.2, 0.2)
            lon = 2.30 + random.uniform(-0.2, 0.2)
            data.append((
                fake.uuid4(),        # 使用者ID
                fake.name(),         # 姓名
                lat,                 # 緯度
                lon                  # 經度
            ))
        return data
    
    # 產生100條類比記錄
    fake_data = generate_fake_data(100)
    
    # 建立Spark DataFrame
    columns = ["user_id", "name", "latitude", "longitude"]
    df = spark.createDataFrame(fake_data, schema=columns)
    
    # 列印產生的樣本資料
    print("產生的樣本資料:")
    df.show(5)
    
    # 使用第三方庫geopy計算距離
    def calculate_distance(lat, lon, landmark=landmark):
        """計算兩點之間的地理距離(公裡)"""
        user_location = (lat, lon)
        return geodesic(user_location, landmark).kilometers
    
    # 註冊UDF(使用者定義函數)
    distance_udf = udf(calculate_distance, FloatType())
    
    # 添加距離列
    df_with_distance = df.withColumn(
        "distance_km", 
        distance_udf("latitude", "longitude")
    )
    
    # 找出10公裡範圍內的使用者
    nearby_users = df_with_distance.filter("distance_km <= 10")
    
    # 列印結果
    print(f"\n找到 {nearby_users.count()} 個在10公裡範圍內的使用者:")
    nearby_users.select("name", "latitude", "longitude", "distance_km").show(10)
    
  2. 上傳pyspark_third_party_libs_demo.py至OSS,上傳操作可以參見簡單上傳

步驟三:開發並運行任務

  1. 在EMR Serverless Spark頁面,單擊左側的資料開發

  2. 開發目錄頁簽下,單擊image表徵圖。

  3. 建立對話方塊中,輸入名稱,類型選擇批任務 > PySpark,單擊確定

  4. 在右上方選擇隊列。

  5. 在建立的開發頁簽中,配置以下資訊,其餘參數無需配置,然後單擊運行

    參數

    說明

    主Python資源

    選擇OSS資源,填寫您上傳pyspark_third_party_libs_demo.py至OSS的路徑。例如,oss://<yourBucketName>/pyspark_third_party_libs_demo.py。

    運行環境

    在下拉框中選擇您建立的運行環境。

  6. 運行任務後,在下方的運行記錄地區,單擊任務操作列的日誌探查

  7. 日誌探查頁簽,您可以查看相關的日誌資訊。

    例如,在Driver日誌Stdout頁簽,可以查看到返回以下資訊。

    產生的樣本資料:
    +--------------------+-------------------+------------------+------------------+
    |             user_id|               name|          latitude|         longitude|
    +--------------------+-------------------+------------------+------------------+
    |73d4565c-8cdf-4bc...|  Garrett Robertson| 48.81845614776422|2.4087517234236064|
    |0fc364b1-6759-416...|      Dawn Gonzalez| 48.68654896170054|2.4708555780468013|
    |2ab1f0aa-5552-4e1...|Alexander Gallagher| 48.87603770688707|2.1209399987431246|
    |1cabbdde-e703-4a8...|       David Morris|48.656356532418116|2.2503952330408175|
    |8b7938a0-b283-401...|    Shannon Perkins| 48.82915001905855| 2.410743969589327|
    +--------------------+-------------------+------------------+------------------+
    only showing top 5 rows
    
    
    找到 24 個在10公裡範圍內的使用者:
    +-----------------+------------------+------------------+-----------+
    |             name|          latitude|         longitude|distance_km|
    +-----------------+------------------+------------------+-----------+
    |Garrett Robertson| 48.81845614776422|2.4087517234236064|   9.490705|
    |  Shannon Perkins| 48.82915001905855| 2.410743969589327|   9.131355|
    |      Alex Harris| 48.82547383207313|2.3579336032430027|   5.923493|
    |      Tammy Ramos| 48.84668267431606|2.3606455536493574|   5.026109|
    |   Ivan Christian| 48.89224239228342|2.2811025348668195|  3.8897192|
    |  Vernon Humphrey| 48.93142188723839| 2.306957802222233|   8.171813|
    |  Shawn Rodriguez|48.919907710882654|2.2270993307836044|   8.439087|
    |    Robert Fisher|48.794216103154646|2.3699024070507906|   9.033209|
    |  Heather Collier|48.822957591865205|2.2993033803043454|   3.957171|
    |       Dawn White|48.877816307255586|2.3743880390928878|   6.246059|
    +-----------------+------------------+------------------+-----------+
    only showing top 10 rows

方式二:通過Conda管理Python環境

步驟一:Conda環境構建與部署

  1. 建立一台使用Alibaba Cloud Linux 3系統,且開啟公網的ECS執行個體(需為x86架構),詳情請參見自訂購買執行個體

    說明

    如果您在EMR on ECS頁面已有的EMR叢集中有空閑節點,也可以直接利用這些空閑節點(需確保節點為x86架構)。

  2. 通過以下命令安裝Miniconda。

    wget https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh
    chmod +x Miniconda3-latest-Linux-x86_64.sh
    
    ./Miniconda3-latest-Linux-x86_64.sh -b
    source miniconda3/bin/activate
  3. 構建使用Python 3.8和numpy的Conda環境。

    conda create -y -n pyspark_conda_env -c conda-forge conda-pack numpy python=3.8
    conda activate pyspark_conda_env
    conda pack -f -o pyspark_conda_env.tar.gz

步驟二:上傳資源檔至OSS

  1. 單擊kmeans.pykmeans_data.txt,下載所需資源檔。

    您也可以建立樣本指令碼kmeans.py和資料檔案kmeans_data.txt,內容如下所示。

    kmeans.py

    """
    A K-means clustering program using MLlib.
    
    This example requires NumPy (http://www.numpy.org/).
    """
    import sys
    
    import numpy as np
    from pyspark import SparkContext
    from pyspark.mllib.clustering import KMeans
    
    
    def parseVector(line):
        return np.array([float(x) for x in line.split(' ')])
    
    
    if __name__ == "__main__":
        if len(sys.argv) != 3:
            print("Usage: kmeans <file> <k>", file=sys.stderr)
            sys.exit(-1)
        sc = SparkContext(appName="KMeans")
        lines = sc.textFile(sys.argv[1])
        data = lines.map(parseVector)
        k = int(sys.argv[2])
        model = KMeans.train(data, k)
        print("Final centers: " + str(model.clusterCenters))
        print("Total Cost: " + str(model.computeCost(data)))
        sc.stop()

    kmeans_data.txt

    0.0 0.0 0.0
    0.1 0.1 0.1
    0.2 0.2 0.2
    9.0 9.0 9.0
    9.1 9.1 9.1
    9.2 9.2 9.2
  2. 上傳pyspark_conda_env.tar.gzkmeans.pykmeans_data.txt至OSS,上傳操作可以參見簡單上傳

步驟三:開發並運行任務

  1. 在EMR Serverless Spark頁面,單擊左側的資料開發

  2. 開發目錄頁簽下,單擊image表徵圖。

  3. 建立對話方塊中,輸入名稱,類型選擇批任務 > PySpark,單擊確定

  4. 在右上方選擇隊列。

  5. 在建立的開發頁簽中,配置以下資訊,其餘參數無需配置,然後單擊運行

    參數

    說明

    主Python資源

    選擇OSS資源,填寫您上傳kmeans.py至OSS的路徑。例如,oss://<yourBucketName>/kmeans.py。

    運行參數

    填寫資料檔案kmeans_data.txt上傳到OSS的路徑。

    填寫格式為oss://<yourBucketName>/kmeans_data.txt 2

    archives資源

    選擇OSS資源,填寫您上傳pyspark_conda_env.tar.gz至OSS的路徑。

    填寫格式為:oss://<yourBucketName>/pyspark_conda_env.tar.gz#condaenv

    Spark配置

    spark.pyspark.driver.python  ./condaenv/bin/python
    spark.pyspark.python         ./condaenv/bin/python
  6. 運行任務後,在下方的運行記錄地區,單擊任務操作列的日誌探查

  7. 日誌探查頁簽,您可以查看相關的日誌資訊。

    例如,在Driver日誌Stdout頁簽,可以查看到返回以下資訊。

    Final centers: [array([0.1, 0.1, 0.1]), array([9.1, 9.1, 9.1])]
    Total Cost: 0.11999999999999958

方式三:通過PEX打包Python依賴

步驟一:PEX檔案打包與執行

  1. 建立一台使用Alibaba Cloud Linux 3系統,且開啟公網的ECS執行個體(需為x86架構),詳情請參見自訂購買執行個體

    說明

    如果您在EMR on ECS頁面已有的EMR叢集中有空閑節點,也可以直接利用這些空閑節點(需確保節點為x86架構)。

  2. 安裝PEX與wheel工具。

    pip3.8 install --user pex wheel \
      --trusted-host mirrors.cloud.aliyuncs.com \
      -i http://mirrors.cloud.aliyuncs.com/pypi/simple/
  3. 下載所需庫的wheel檔案至臨時目錄。

    pip3.8 wheel -w /tmp/wheel \
      pyspark==3.3.1 pandas==1.5.3 pyarrow==15.0.1 numpy==1.24.4 \
      --trusted-host mirrors.cloud.aliyuncs.com \
      -i http://mirrors.cloud.aliyuncs.com/pypi/simple/
  4. 產生PEX檔案。

    pex -f /tmp/wheel --no-index \
      pyspark==3.3.1 pandas==1.5.3 pyarrow==15.0.1 numpy==1.24.4 \
      -o spark331_pandas153.pex

步驟二:上傳PEX檔案至OSS

  1. 單擊kmeans.pykmeans_data.txt,下載所需資源檔。

  2. 上傳spark331_pandas153.pexkmeans.pykmeans_data.txt至OSS,上傳操作可以參見簡單上傳

    說明

    本文樣本使用的Spark版本是3.3.1,同時包含pandas、pyarrow以及numpy等第三方Python庫。您也可以根據選擇的引擎版本打包其他版本的PySpark環境。有關引擎版本的詳細資料,請參見引擎版本介紹

步驟三:開發並運行任務

  1. 在EMR Serverless Spark頁面,單擊左側的資料開發

  2. 開發目錄頁簽下,單擊image表徵圖。

  3. 建立對話方塊中,輸入名稱,類型選擇批任務 > PySpark,單擊確定

  4. 在右上方選擇隊列。

  5. 在建立的開發頁簽中,配置以下資訊,其餘參數無需配置,然後單擊運行

    參數

    說明

    主Python資源

    選擇OSS資源,填寫您上傳kmeans.py至OSS的路徑。例如,oss://<yourBucketName>/kmeans.py。

    運行參數

    填寫資料檔案kmeans_data.txt上傳到OSS的路徑。

    填寫格式為oss://<yourBucketName>/kmeans_data.txt 2

    files資源

    選擇OSS資源,填寫您上傳spark331_pandas153.pex至OSS的路徑。例如,oss://<yourBucketName>/spark331_pandas153.pex

    Spark配置

    spark.pyspark.driver.python            ./spark331_pandas153.pex
    spark.pyspark.python                   ./spark331_pandas153.pex
  6. 運行任務後,在下方的運行記錄地區,單擊任務操作列的日誌探查

  7. 日誌探查頁簽,您可以查看相關的日誌資訊。

    例如,在Driver日誌Stdout頁簽,可以查看到返回以下資訊。

    Final centers: [array([0.1, 0.1, 0.1]), array([9.1, 9.1, 9.1])]
    Total Cost: 0.11999999999999958

相關文檔

本文以PySpark開發為例,如果您想通過其他方式進行開發,可以參見批任務或流任務開發