PySpark uses Python API operations to run Spark jobs, and PySpark jobs must run in a specific Python environment. By default, E-MapReduce (EMR) provides a built-in Python environment. If the Python version that EMR provides by default cannot be used to run your PySpark jobs, you can follow this topic to prepare a Python environment that meets your requirements and run PySpark jobs in DataWorks.

Prerequisites

In this topic, the DataWorks workspace and the EMR cluster that are used reside in the same region. The following prerequisites must be met on the DataWorks side and the EMR side:
  • DataWorks side

    An EMR Spark node is created in DataWorks to run PySpark jobs, and the spark-submit command is used to submit the jobs.

  • EMR side
    An EMR environment that includes the following configurations is prepared:
    • An EMR cluster. In this example, an EMR on ECS cluster is used.
    • Optional: A Python package used for sample verification. You can package a Python environment on your on-premises machine or Elastic Compute Service (ECS) instance, or directly download the sample Python 3.7 package that is used in this topic. To package a Python environment yourself, make sure that Docker and a Python runtime are installed on your on-premises machine or ECS instance. A quick check sketch is provided below.
      Note Python 3.7 is used in this topic only as an example. You can select a Python version based on your business requirements. The Python version that EMR supports by default may differ from the version that you use. Python 3.7 is recommended.
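      If you plan to package the environment yourself, the following commands are a minimal sketch, assuming a Linux machine, for confirming that the packaging prerequisites are in place; the reported versions depend on your setup.
      # Confirm that Docker is installed and that the Docker daemon is running.
      docker --version
      docker info
      # Confirm that a Python runtime is available.
      python3 --version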

Procedure

  1. Optional: Prepare the virtual environment that is required to run a Python program.
    You can directly download the Python 3.7 package or perform the following steps to package a Python environment:
    1. Create a Docker image.

      You can directly download the sample Dockerfile that is used in this topic to your on-premises machine or ECS instance. Alternatively, you can create a Dockerfile on the host where Docker is installed. The Dockerfile contains the following content:

      FROM centos:centos7.9.2009
      RUN set -ex \
      # Pre-install required components. 
      && yum install -y wget tar libffi-devel zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel readline-devel tk-devel gcc make initscripts zip \
      && wget https://www.python.org/ftp/python/3.7.0/Python-3.7.0.tgz \
      && tar -zxvf Python-3.7.0.tgz \
      && cd Python-3.7.0 \
      && ./configure prefix=/usr/local/python3 \
      && make \
      && make install \
      && make clean \
      && rm -rf /Python-3.7.0* \
      && yum install -y epel-release \
      && yum install -y python-pip
      # Set the default Python version to Python 3. 
      RUN set -ex \
      # Back up resources of Python of an earlier version. 
      && mv /usr/bin/python /usr/bin/python27 \
      && mv /usr/bin/pip /usr/bin/pip-python27 \
      # Set the default Python version to Python 3. 
      && ln -s /usr/local/python3/bin/python3.7 /usr/bin/python \
      && ln -s /usr/local/python3/bin/pip3 /usr/bin/pip
      # Fix the YUM bug that is caused by the change in the Python version. 
      RUN set -ex \
      && sed -i "s#/usr/bin/python#/usr/bin/python27#" /usr/bin/yum \
      && sed -i "s#/usr/bin/python#/usr/bin/python27#" /usr/libexec/urlgrabber-ext-down \
      && yum install -y deltarpm
      # Update pip. 
      RUN pip install --upgrade pip
    2. Build an image and run the Docker container.
      Run the following commands in the directory in which the Dockerfile is stored (a command for checking the container status follows):
      docker build -t python-centos:3.7 .
      docker run -itd --name python3.7 python-centos:3.7
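      The following command is a minimal sketch for confirming that the container started successfully before you continue; it assumes the container name python3.7 from the preceding command.
      # List the container and confirm that its status is Up.
      docker ps --filter "name=python3.7"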
    3. Go to the container, install the required Python dependency libraries, and package the Python environment (a verification sketch follows the commands).
      docker exec -it python3.7 bash
      
      # Install the required dependency libraries, for example:
      pip install [Required dependency library]
      # Alternatively, list the libraries (for example, numpy and pandas) in a requirements.txt file and run:
      # pip install -r requirements.txt
      
      cd /usr/local/
      zip -r python3.7.zip python3/
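      Before you copy the package out of the container, the following command is a minimal sketch for confirming that the packaged interpreter works and that the installed libraries can be imported; numpy and pandas are used only as example libraries and must match what you installed.
      # Run the packaged interpreter and import the example libraries.
      /usr/local/python3/bin/python3.7 -c "import numpy, pandas; print(numpy.__version__, pandas.__version__)"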
    4. Copy the Python environment package from the container to your host (an optional check of the copied package follows the command).
      # Run the following command on the host to copy the packaged environment out of the container:
      docker cp python3.7:/usr/local/python3.7.zip .                         
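      The following command is an optional sketch for checking the copied package on the host; it assumes that the unzip tool is installed.
      # List the first entries of the archive to confirm that it contains the python3/ directory.
      unzip -l python3.7.zip | head -n 10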
  2. Upload the packaged Python environment.
    You can upload the packaged environment to Hadoop Distributed File System (HDFS) or Object Storage Service (OSS) based on your business requirements.
    Note In this example, the environment is uploaded to HDFS. For information about how to upload it to OSS, see Simple upload.
    Run the following command to upload the packaged environment to HDFS (a sketch for preparing and checking the target directory follows the command):
    # Upload the copied environment to HDFS. 
    hdfs dfs -copyFromLocal python3.7.zip /tmp/pyspark                          
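    The command above assumes that the /tmp/pyspark directory exists in HDFS. The following commands are a minimal sketch for creating the directory if necessary and confirming the upload.
    # Create the target directory if it does not exist, then list it to confirm the upload.
    hdfs dfs -mkdir -p /tmp/pyspark
    hdfs dfs -ls /tmp/pyspark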
  3. Test and upload the Python code.
    1. Create a .py file on your on-premises machine or ECS instance and use the file to verify that the Python code is correct. In this example, the pyspark_test.py file is used for the test. A sketch of the user-defined functions (UDFs) that are mentioned in the code follows the file content.
      # -*- coding: utf-8 -*-
      
      import os
      from pyspark.sql import SparkSession
      
      def noop(x):
          import socket
          import sys
          host = socket.gethostname() + ' '.join(sys.path) + ' '.join(os.environ)
          print('host: ' + host)
          print('PYTHONPATH: ' + os.environ['PYTHONPATH'])
          print('PWD: ' + os.environ['PWD'])
          print(os.listdir('.'))
          return host
      
      
      if __name__ == '__main__':
      
          spark = SparkSession \
              .builder \
              .appName("test_pyspark") \
              .enableHiveSupport() \
              .getOrCreate()
      
          sc = spark.sparkContext
          # Verify the current environment variables of the system. 
          rdd = sc.parallelize(range(10), 2)
          hosts = rdd.map(noop).distinct().collect()
          print(hosts)
      
          # Verify user-defined functions (UDFs). 
          # https://docs.databricks.com/spark/latest/spark-sql/udf-python.html#
          # spark.udf.register("udf_squared", udf_squared)
          # spark.udf.register("udf_numpy", udf_numpy)
      
          # The query assumes that a Hive table named store exists in the cluster.
          tableName = "store"
          df = spark.sql("""select count(*) from %s """ % tableName)
          print("df count, %s\n" % df.count())
          df.show()
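      The commented-out UDF registrations in the code reference functions that are not defined in the sample. The following lines are a minimal sketch of what such UDFs could look like, assuming that numpy is included in the packaged environment; the function bodies are illustrative only, and only the names udf_squared and udf_numpy come from the comments in the code. These lines would be placed inside the main block after the SparkSession is created.
      from pyspark.sql.types import DoubleType
      
      def udf_squared(x):
          # Return the square of a numeric column value.
          return float(x) ** 2
      
      def udf_numpy(x):
          # Use numpy from the shipped environment to compute a value.
          import numpy as np
          return float(np.log1p(float(x)))
      
      # Register the functions so that they can be called in Spark SQL statements.
      spark.udf.register("udf_squared", udf_squared, DoubleType())
      spark.udf.register("udf_numpy", udf_numpy, DoubleType())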
    2. Upload the Python code to HDFS.
      Run the following command to upload the Python code to HDFS in the EMR cluster:
      Note In this example, the Python code is uploaded to HDFS. For information about how to upload the code to OSS, see Simple upload.
      hdfs dfs -copyFromLocal pyspark_test.py /tmp/pyspark
  4. Run the spark-submit command to submit jobs on the EMR Spark node in DataWorks.
    On the EMR Spark node that you created, run the following command to submit the job:
    Note If you upload the Python code and the environment package to OSS, replace the HDFS paths in the command with the corresponding OSS paths (a sketch of an OSS-based command follows the HDFS-based command).
    # Optional: to distribute additional Python files, add the following option to the command:
    # --py-files hdfs://hdfs-cluster/tmp/pyspark/mc_pyspark-0.1.0-py3-none-any.zip
    spark-submit --master yarn \
    --deploy-mode cluster \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./PYTHONENV/python3/bin/python3.7 \
    --conf spark.executorEnv.PYTHONPATH=. \
    --conf spark.yarn.appMasterEnv.PYTHONPATH=. \
    --conf spark.yarn.appMasterEnv.JOBOWNER=LiuYuQuan \
    --archives hdfs://hdfs-cluster/tmp/pyspark/python3.7.zip#PYTHONENV \
    --driver-memory 4g \
    --driver-cores 1 \
    --executor-memory 4g \
    --executor-cores 1 \
    --num-executors 3 \
    --name TestPySpark \
    hdfs://hdfs-cluster/tmp/pyspark/pyspark_test.py
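    If the environment package and the Python code are stored in OSS instead of HDFS, the command takes the same form with OSS paths. The following command is a minimal sketch; the bucket name examplebucket is a placeholder that you must replace with your own bucket, and your cluster must be able to access OSS.
    spark-submit --master yarn \
    --deploy-mode cluster \
    --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=./PYTHONENV/python3/bin/python3.7 \
    --conf spark.executorEnv.PYTHONPATH=. \
    --conf spark.yarn.appMasterEnv.PYTHONPATH=. \
    --archives oss://examplebucket/tmp/pyspark/python3.7.zip#PYTHONENV \
    --driver-memory 4g \
    --driver-cores 1 \
    --executor-memory 4g \
    --executor-cores 1 \
    --num-executors 3 \
    --name TestPySpark \
    oss://examplebucket/tmp/pyspark/pyspark_test.py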