PySpark calls Python API operations to run Spark jobs, and PySpark jobs must run in a specific Python environment. By default, E-MapReduce (EMR) provides a built-in Python environment. If the Python version provided by EMR cannot be used to run your PySpark jobs, you can refer to this topic to configure a Python environment that meets your requirements and run PySpark jobs in DataWorks.
Prerequisites
In this topic, the DataWorks workspace and the EMR cluster reside in the same region. This section describes the prerequisites that must be met on the DataWorks side and the EMR side:
DataWorks side
An EMR Spark node is created to run PySpark jobs in DataWorks, and the spark-submit command is run to submit PySpark jobs.
EMR side
An EMR environment that includes the following configurations is prepared:
An EMR cluster. In this example, an EMR on ECS cluster is used.
A Python package used for sample verification. You can package a Python environment on your on-premises machine or Elastic Compute Service (ECS) instance. Alternatively, you can directly download the sample Python package Python 3.7 that is used in this topic. To package a Python environment, make sure that the Docker runtime environment and the Python runtime environment are installed on your on-premises machine or ECS instance.
Note: In this topic, Python 3.7 is used only for reference. You can select a Python version based on your business requirements. The Python version supported by EMR may be different from the Python version that you use. We recommend that you use Python 3.7.
Procedure
Prepare the virtual environment that is required to run a Python program.
You can directly download the Python 3.7 package or perform the following steps to package a Python environment. We recommend that you directly download the Python 3.7 package.
Create a Docker image.
You can directly download the sample Dockerfile that is used in this topic to your on-premises machine or ECS instance. Alternatively, you can create a Dockerfile on the host where Docker is installed. The Dockerfile contains the following content:
FROM centos:centos7.9.2009
RUN set -ex \
    # Pre-install required components.
    && yum install -y wget tar libffi-devel zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make initscripts zip \
    && wget https://www.python.org/ftp/python/3.7.0/Python-3.7.0.tgz \
    && tar -zxvf Python-3.7.0.tgz \
    && cd Python-3.7.0 \
    && ./configure prefix=/usr/local/python3 \
    && make \
    && make install \
    && make clean \
    && rm -rf /Python-3.7.0* \
    && yum install -y epel-release \
    && yum install -y python-pip
# Set the default Python version to Python 3.
RUN set -ex \
    # Back up resources of Python of an earlier version.
    && mv /usr/bin/python /usr/bin/python27 \
    && mv /usr/bin/pip /usr/bin/pip-python27 \
    # Set the default Python version to Python 3.
    && ln -s /usr/local/python3/bin/python3.7 /usr/bin/python \
    && ln -s /usr/local/python3/bin/pip3 /usr/bin/pip
# Fix the YUM bug that is caused by the change in the Python version.
RUN set -ex \
    && sed -i "s#/usr/bin/python#/usr/bin/python27#" /usr/bin/yum \
    && sed -i "s#/usr/bin/python#/usr/bin/python27#" /usr/libexec/urlgrabber-ext-down \
    && yum install -y deltarpm
# Update pip.
RUN pip install --upgrade pip
Build an image and run the Docker container.
Run the following commands in the directory in which the Dockerfile is stored:
sudo docker build -t python-centos:3.7 .
sudo docker run -itd --name python3.7 python-centos:3.7
Go to the container, install the required Python dependency libraries, and package the Python environment.
sudo docker exec -it python3.7 bash
pip install [Required dependency library]
# Alternatively, list the dependencies in a requirements.txt file and install them in bulk:
# vi requirements.txt
# pip install -r requirements.txt
# Sample requirements.txt content:
# numpy
# pandas
cd /usr/local/
zip -r python3.7.zip python3/
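To confirm that the dependency libraries were installed into the environment that you packaged, you can run a quick import check inside the container. The following script is a minimal sketch; the file name check_env.py is arbitrary, and the numpy and pandas imports assume that these are the libraries you installed. Adjust the imports to match your own dependencies.
# check_env.py: quick sanity check for the packaged environment.
# Adjust the imports to match the dependency libraries that you installed.
import sys
import numpy
import pandas

# Print the interpreter path to confirm that the Python 3.7 installation
# under /usr/local/python3 is being used, and print the library versions.
print(sys.executable)
print('numpy: ' + numpy.__version__)
print('pandas: ' + pandas.__version__)
Run the script inside the container, for example with /usr/local/python3/bin/python3.7 check_env.py, before you create python3.7.zip.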
Copy the Python environment package from the container to your host.
# Run the following command on the host to copy the virtual environment to the host:
sudo docker cp python3.7:/usr/local/python3.7.zip .
Upload the copied environment.
You can upload the copied environment to Hadoop Distributed File System (HDFS) or Object Storage Service (OSS) based on your business requirements.
Note: In this example, the copied environment is uploaded to HDFS. For information about how to upload the copied environment to OSS, see Simple upload.
Run the following command to upload the copied environment to HDFS:
# Upload the copied environment to HDFS.
hdfs dfs -copyFromLocal python3.7.zip /tmp/pyspark
Test and upload the Python code.
Create a XXX.py file on your on-premises machine or ECS instance and use the file to check whether the Python code is correct. In this example, the pyspark_test.py file is used for the test.
# -*- coding: utf-8 -*-
import os
from pyspark.sql import SparkSession

def noop(x):
    import socket
    import sys
    host = socket.gethostname() + ' '.join(sys.path) + ' '.join(os.environ)
    print('host: ' + host)
    print('PYTHONPATH: ' + os.environ['PYTHONPATH'])
    print('PWD: ' + os.environ['PWD'])
    print(os.listdir('.'))
    return host

if __name__ == '__main__':
    spark = SparkSession \
        .builder \
        .appName("test_pyspark") \
        .enableHiveSupport() \
        .getOrCreate()

    sc = spark.sparkContext
    # Verify the current environment variables of the system.
    rdd = sc.parallelize(range(10), 2)
    hosts = rdd.map(noop).distinct().collect()
    print(hosts)

    # Verify user-defined functions (UDFs).
    # https://docs.databricks.com/spark/latest/spark-sql/udf-python.html#
    # spark.udf.register("udf_squared", udf_squared)
    # spark.udf.register("udf_numpy", udf_numpy)

    tableName = "store"
    df = spark.sql("""select count(*) from %s """ % tableName)
    print("rdf count, %s\n" % df.count())
    df.show()
Note: You must replace the sample table name store with the name of a table that exists in the data warehouse.
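The UDF registrations in the sample code are commented out. If you also want to verify UDFs, you can define and register the functions before the spark.sql call. The following is a minimal sketch based on the commented-out registrations: the function bodies are examples, the spark variable is the SparkSession created in the script, and the numpy import assumes that the library is included in your packaged environment.
# Sample UDF definitions for the commented-out registrations in pyspark_test.py.
from pyspark.sql.types import LongType, DoubleType

def udf_squared(s):
    # Return the square of the input value.
    return s * s

def udf_numpy(s):
    # Use numpy from the packaged environment to compute a value.
    import numpy as np
    return float(np.sqrt(s))

# Register the functions so that they can be used in Spark SQL statements.
spark.udf.register("udf_squared", udf_squared, LongType())
spark.udf.register("udf_numpy", udf_numpy, DoubleType())

# Example usage:
# spark.sql("select udf_squared(3), udf_numpy(16)").show()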
Upload the Python code to HDFS.
Run the following command to upload the Python code to HDFS in the EMR cluster:
Note: In this example, the Python code is uploaded to HDFS. For information about how to upload the Python code to OSS, see Simple upload.
hdfs dfs -copyFromLocal pyspark_test.py /tmp/pyspark
Run the spark-submit command to submit jobs on the EMR Spark node in DataWorks.
On the created EMR Spark node, run the following command to submit jobs:
Note: If you uploaded the Python code and the environment package to OSS, replace the HDFS paths in the following command with the corresponding OSS paths.
spark-submit --master yarn \
--deploy-mode cluster \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./PYTHONENV/python3/bin/python3.7 \
--conf spark.executorEnv.PYTHONPATH=./PYTHONENV/python3/lib/python3.7/site-packages \
--conf spark.yarn.appMasterEnv.PYTHONPATH=./PYTHONENV/python3/lib/python3.7/site-packages \
--conf spark.yarn.appMasterEnv.JOBOWNER=LiuYuQuan \
--archives hdfs://hdfs-cluster/tmp/pyspark/python3.7.zip#PYTHONENV \
--driver-memory 4g \
--driver-cores 1 \
--executor-memory 4g \
--executor-cores 1 \
--num-executors 3 \
--name TestPySpark \
hdfs://hdfs-cluster/tmp/pyspark/pyspark_test.py
# Optional: to distribute additional Python modules with the job, add the following parameter before the .py file:
# --py-files hdfs://hdfs-cluster/tmp/pyspark/mc_pyspark-0.1.0-py3-none-any.zip \
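After the job is submitted, you can check the job logs to confirm which Python interpreter the driver and the executors actually use. The following snippet is a minimal sketch that you can append to pyspark_test.py for this purpose; it reuses the sc variable created in the sample script and assumes the PYTHONENV alias defined by the --archives parameter in the preceding command.
# Print the interpreter path and version used by the driver. If the environment
# variables in the spark-submit command take effect, the path points to the
# packaged interpreter under ./PYTHONENV/python3/bin/.
import sys
print('driver python: ' + sys.executable)
print('driver version: ' + sys.version)

# Print the interpreter path and version used by each executor.
def python_info(_):
    import sys
    return sys.executable + ' ' + sys.version

print(sc.parallelize(range(2), 2).map(python_info).distinct().collect())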