PySpark可直接調用Python的API運行Spark作業,PySpark作業需在特定Python環境中運行。EMR預設支援使用Python,若EMR支援的Python版本無法運行PySpark作業,則您可參考本實踐配置可用的Python環境並在DataWorks上運行PySpark作業。
前提條件
執行本實踐所使用的DataWorks及E-MapReduce(簡稱EMR)需部署在相同地區。產品各自需執行的前提條件如下:
DataWorks側
在DataWorks運行PySpark作業時,需建立EMR Spark節點,並使用
spark-submit命令提交作業。EMR側
需準備如下EMR環境:
準備EMR執行個體。本實踐樣本使用EMR on ECS執行個體。
本實踐需使用一個Python包進行樣本驗證,您可在本地或ECS進行自主打包;也可直接下載本實踐的樣本包(Python3.7)。使用自主打包時,本地或ECS需安裝Docker運行環境及Python運行環境。
說明本實踐僅以Python3.7示範相關操作,實際使用中可選擇所需Python版本。EMR支援的Python版本可能和您使用的Python版本存在差異,建議使用本實踐的Python3.7版本。
操作步驟
準備運行Python程式需要的虛擬環境。
您可選擇直接下載本實踐的樣本包python3.7使用(推薦);或通過如下步驟自主打包Python環境。
製作Docker鏡像。
您可選擇直接下載本實踐的樣本Dockerfile檔案至本地或ECS;或在安裝了Docker環境的宿主機上建立一個Dockerfile檔案。Dockerfile檔案的內容如下。
FROM centos:centos7.9.2009 RUN set -ex \ # 預先安裝所需組件。 && yum install -y wget tar libffi-devel zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make initscripts zip\ && wget https://www.python.org/ftp/python/3.7.0/Python-3.7.0.tgz \ && tar -zxvf Python-3.7.0.tgz \ && cd Python-3.7.0 \ && ./configure prefix=/usr/local/python3 \ && make \ && make install \ && make clean \ && rm -rf /Python-3.7.0* \ && yum install -y epel-release \ && yum install -y python-pip # 設定預設為python3。 RUN set -ex \ # 備份舊版本python。 && mv /usr/bin/python /usr/bin/python27 \ && mv /usr/bin/pip /usr/bin/pip-python27 \ # 配置預設為python3。 && ln -s /usr/local/python3/bin/python3.7 /usr/bin/python \ && ln -s /usr/local/python3/bin/pip3 /usr/bin/pip # 修複因修改python版本導致yum失效問題。 RUN set -ex \ && sed -i "s#/usr/bin/python#/usr/bin/python27#" /usr/bin/yum \ && sed -i "s#/usr/bin/python#/usr/bin/python27#" /usr/libexec/urlgrabber-ext-down \ && yum install -y deltarpm # 更新pip版本。 RUN pip install --upgrade pip構建鏡像並運行容器。
在Dockerfile檔案所在路徑下,執行如下命令。
sudo docker build -t python-centos:3.7 . sudo docker run -itd --name python3.7 python-centos:3.7進入安裝容器所需的Python依賴庫並打包Python環境。
sudo docker exec -it python3.7 bash pip install [所需依賴庫] # vi requirements.txt # pip install -r requirements.txt # numpy # pandas cd /usr/local/ zip -r python3.7.zip python3/拷貝容器中的Python環境到宿主機。
# 在宿主機運行命令將虛擬環境拷貝到宿主機。 sudo docker cp python3.7:/usr/local/python3.7.zip .
上傳虛擬環境。
您可根據需要,選擇上傳Python虛擬環境至OSS或HDFS。
說明本實踐以上傳至HDFS樣本。如果您選擇上傳至OSS,操作詳情請參見上傳檔案。
上傳Python環境至HDFS命令如下。
# 上傳至HDFS中。 hdfs dfs -copyFromLocal python3.7.zip /tmp/pyspark測試並上傳Python代碼。
您可在本地或ECS中建立一個
py檔案,按照下述方法測試Python代碼是否正確。本實踐樣本使用pyspark_test.py檔案測試。# -*- coding: utf-8 -*- import os from pyspark.sql import SparkSession def noop(x): import socket import sys host = socket.gethostname() + ' '.join(sys.path) + ' '.join(os.environ) print('host: ' + host) print('PYTHONPATH: ' + os.environ['PYTHONPATH']) print('PWD: ' + os.environ['PWD']) print(os.listdir('.')) return host if __name__ == '__main__': spark = SparkSession \ .builder \ .appName("test_pyspark") \ .enableHiveSupport() \ .getOrCreate() sc = spark.sparkContext # 驗證系統當前環境變數。 rdd = sc.parallelize(range(10), 2) hosts = rdd.map(noop).distinct().collect() print(hosts) # 驗證UDF。 # https://docs.databricks.com/spark/latest/spark-sql/udf-python.html# # spark.udf.register("udf_squared", udf_squared) # spark.udf.register("udf_numpy", udf_numpy) tableName = "store" df = spark.sql("""select count(*) from %s """ % tableName) print("rdf count, %s\n" % df.count()) df.show()說明您需將樣本表名
store替換成資料倉儲中實際存在的表名。上傳Python代碼至HDFS中。
參考如下命令,在EMR執行個體中上傳Python代碼至HDFS。
說明本實踐以上傳至HDFS樣本。如果您選擇上傳至OSS,操作詳情請參見上傳檔案。
hdfs dfs -copyFromLocal pyspark_test.py /tmp/pyspark
在DataWorks中通過
spark-submit命令提交作業。在建立的EMR Spark節點中,使用如下命令提交作業。
說明如果您選擇上傳Python代碼至OSS,則需替換為實際使用的OSS路徑。
spark-submit --master yarn \ --deploy-mode cluster \ --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./PYTHONENV/python3/bin/python3.7 \ --conf spark.executorEnv.PYTHONPATH=./PYTHONENV/python3/lib/python3.7/site-packages \ --conf spark.yarn.appMasterEnv.PYTHONPATH=./PYTHONENV/python3/lib/python3.7/site-packages \ --conf spark.yarn.appMasterEnv.JOBOWNER=LiuYuQuan \ --archives hdfs://hdfs-cluster/tmp/pyspark/python3.7.zip#PYTHONENV \ ## --py-files hdfs://hdfs-cluster/tmp/pyspark/mc_pyspark-0.1.0-py3-none-any.zip \ --driver-memory 4g \ --driver-cores 1 \ --executor-memory 4g \ --executor-cores 1 \ --num-executors 3 \ --name TestPySpark \ hdfs://hdfs-cluster/tmp/pyspark/pyspark_test.py