全部產品
Search
文件中心

DataWorks:在DataWorks上運行PySpark作業的最佳實務

更新時間:Mar 13, 2025

PySpark可直接調用Python的API運行Spark作業,PySpark作業需在特定Python環境中運行。EMR預設支援使用Python,若EMR支援的Python版本無法運行PySpark作業,則您可參考本實踐配置可用的Python環境並在DataWorks上運行PySpark作業。

前提條件

執行本實踐所使用的DataWorks及E-MapReduce(簡稱EMR)需部署在相同地區。產品各自需執行的前提條件如下:

  • DataWorks側

    在DataWorks運行PySpark作業時,需建立EMR Spark節點,並使用spark-submit命令提交作業。

  • EMR側

    需準備如下EMR環境:

    • 準備EMR執行個體。本實踐樣本使用EMR on ECS執行個體。

    • 本實踐需使用一個Python包進行樣本驗證,您可在本地或ECS進行自主打包;也可直接下載本實踐的樣本包(Python3.7)。使用自主打包時,本地或ECS需安裝Docker運行環境Python運行環境

      說明

      本實踐僅以Python3.7示範相關操作,實際使用中可選擇所需Python版本。EMR支援的Python版本可能和您使用的Python版本存在差異,建議使用本實踐的Python3.7版本。

操作步驟

  1. 準備運行Python程式需要的虛擬環境。

    您可選擇直接下載本實踐的樣本包python3.7使用(推薦);或通過如下步驟自主打包Python環境。

    1. 製作Docker鏡像。

      您可選擇直接下載本實踐的樣本Dockerfile檔案至本地或ECS;或在安裝了Docker環境的宿主機上建立一個Dockerfile檔案。Dockerfile檔案的內容如下。

      FROM centos:centos7.9.2009
      RUN set -ex \
      # 預先安裝所需組件。
      && yum install -y wget tar libffi-devel zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make initscripts zip\
      && wget https://www.python.org/ftp/python/3.7.0/Python-3.7.0.tgz \
      && tar -zxvf Python-3.7.0.tgz \
      && cd Python-3.7.0 \
      && ./configure prefix=/usr/local/python3 \
      && make \
      && make install \
      && make clean \
      && rm -rf /Python-3.7.0* \
      && yum install -y epel-release \
      && yum install -y python-pip
      # 設定預設為python3。
      RUN set -ex \
      # 備份舊版本python。
      && mv /usr/bin/python /usr/bin/python27 \
      && mv /usr/bin/pip /usr/bin/pip-python27 \
      # 配置預設為python3。
      && ln -s /usr/local/python3/bin/python3.7 /usr/bin/python \
      && ln -s /usr/local/python3/bin/pip3 /usr/bin/pip
      # 修複因修改python版本導致yum失效問題。
      RUN set -ex \
      && sed -i "s#/usr/bin/python#/usr/bin/python27#" /usr/bin/yum \
      && sed -i "s#/usr/bin/python#/usr/bin/python27#" /usr/libexec/urlgrabber-ext-down \
      && yum install -y deltarpm
      # 更新pip版本。
      RUN pip install --upgrade pip
    2. 構建鏡像並運行容器。

      在Dockerfile檔案所在路徑下,執行如下命令。

      sudo docker build -t python-centos:3.7 .
      sudo docker run -itd --name python3.7 python-centos:3.7
    3. 進入安裝容器所需的Python依賴庫並打包Python環境。

      sudo docker exec -it  python3.7 bash
      
      pip install [所需依賴庫]
      # vi requirements.txt
      # pip install -r requirements.txt
      # numpy
      # pandas
      
      cd /usr/local/
      zip -r python3.7.zip python3/
    4. 拷貝容器中的Python環境到宿主機。

      # 在宿主機運行命令將虛擬環境拷貝到宿主機。
      sudo docker cp python3.7:/usr/local/python3.7.zip .                         
  2. 上傳虛擬環境。

    您可根據需要,選擇上傳Python虛擬環境至OSS或HDFS。

    說明

    本實踐以上傳至HDFS樣本。如果您選擇上傳至OSS,操作詳情請參見上傳檔案

    上傳Python環境至HDFS命令如下。

    # 上傳至HDFS中。
    hdfs dfs -copyFromLocal python3.7.zip /tmp/pyspark                          
  3. 測試並上傳Python代碼。

    1. 您可在本地或ECS中建立一個py檔案,按照下述方法測試Python代碼是否正確。本實踐樣本使用pyspark_test.py檔案測試。

      # -*- coding: utf-8 -*-
      
      import os
      from pyspark.sql import SparkSession
      
      def noop(x):
          import socket
          import sys
          host = socket.gethostname() + ' '.join(sys.path) + ' '.join(os.environ)
          print('host: ' + host)
          print('PYTHONPATH: ' + os.environ['PYTHONPATH'])
          print('PWD: ' + os.environ['PWD'])
          print(os.listdir('.'))
          return host
      
      
      if __name__ == '__main__':
      
          spark = SparkSession \
              .builder \
              .appName("test_pyspark") \
              .enableHiveSupport() \
              .getOrCreate()
      
          sc = spark.sparkContext
          # 驗證系統當前環境變數。
          rdd = sc.parallelize(range(10), 2)
          hosts = rdd.map(noop).distinct().collect()
          print(hosts)
      
          # 驗證UDF。
          # https://docs.databricks.com/spark/latest/spark-sql/udf-python.html#
          # spark.udf.register("udf_squared", udf_squared)
          # spark.udf.register("udf_numpy", udf_numpy)
      
          tableName = "store"
          df = spark.sql("""select count(*) from %s """ % tableName)
          print("rdf count, %s\n" % df.count())
          df.show()
      說明

      您需將樣本表名store替換成資料倉儲中實際存在的表名。

    2. 上傳Python代碼至HDFS中。

      參考如下命令,在EMR執行個體中上傳Python代碼至HDFS。

      說明

      本實踐以上傳至HDFS樣本。如果您選擇上傳至OSS,操作詳情請參見上傳檔案

      hdfs dfs -copyFromLocal pyspark_test.py /tmp/pyspark
  4. 在DataWorks中通過spark-submit命令提交作業。

    在建立的EMR Spark節點中,使用如下命令提交作業。

    說明

    如果您選擇上傳Python代碼至OSS,則需替換為實際使用的OSS路徑。

    spark-submit --master yarn \
    --deploy-mode cluster \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./PYTHONENV/python3/bin/python3.7 \
    --conf spark.executorEnv.PYTHONPATH=./PYTHONENV/python3/lib/python3.7/site-packages \
    --conf spark.yarn.appMasterEnv.PYTHONPATH=./PYTHONENV/python3/lib/python3.7/site-packages \
    --conf spark.yarn.appMasterEnv.JOBOWNER=LiuYuQuan \
    --archives hdfs://hdfs-cluster/tmp/pyspark/python3.7.zip#PYTHONENV \
    ## --py-files hdfs://hdfs-cluster/tmp/pyspark/mc_pyspark-0.1.0-py3-none-any.zip \
    --driver-memory 4g \
    --driver-cores 1 \
    --executor-memory 4g \
    --executor-cores 1 \
    --num-executors 3 \
    --name TestPySpark \
    hdfs://hdfs-cluster/tmp/pyspark/pyspark_test.py