
PySpark

Last Updated: Aug 19, 2021

This topic describes how to submit a PySpark job and use a custom virtual environment in Data Lake Analytics (DLA).

Basic usage of PySpark

1. Create a main program file.

In this topic, you create a file named example.py that contains the following code. The if __name__ == "__main__" block in the sample code provides the entry point from which PySpark starts the program.

from __future__ import print_function
from pyspark.sql import SparkSession


if __name__ == "__main__":
    # initialize the SparkSession
    spark = SparkSession\
        .builder\
        .appName("Python Example")\
        .getOrCreate()

    df = spark.sql("SELECT 2021")
    # print schema and data to the console
    df.printSchema()
    df.show()

2. Execute the main program file.

  1. As with JAR packages written in Scala or Java, you must upload the example.py file to Object Storage Service (OSS) and use the file parameter in the Spark startup configuration to specify example.py as the startup file.

  2. In the DLA console, choose Serverless Spark > Submit job from the left-side navigation pane. On the Parameter Configuration page, write the following code in the code editor:

    {
        "name": "Spark Python",
        "file": "oss://{your bucket name}/example.py",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "small",
            "spark.kubernetes.pyspark.pythonVersion": "3"
        }
    }

    Notice
    • Use the name of your OSS bucket to replace {your bucket name}.

    • The preceding code is written in Python 3. If you do not use the spark.kubernetes.pyspark.pythonVersion parameter to specify the Python version, Python 2.7 is used by default.

  3. Click Execute.
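
If the job succeeds, the printSchema() and show() calls in example.py write output similar to the following to the job log (the exact formatting can vary with the Spark version):

root
 |-- 2021: integer (nullable = false)

+----+
|2021|
+----+
|2021|
+----+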

Upload a self-developed or third-party module

Python programs often use self-developed or third-party modules. You can upload these modules to OSS and load them into the execution environment so that the main program can call them.

The following procedure demonstrates how to calculate the after-tax income of employees.

1. Prepare test data.

Create a CSV file named staff.csv and upload it to OSS. The file contains the following content, which describes the basic information and income of each employee.

name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
Note

For more information about how to upload a file to OSS, see Simple upload.
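
For example, if you use the ossutil command-line tool (assuming it is installed and configured with your credentials), an upload sketch looks like this:

# upload staff.csv to the root of your bucket; replace {your bucket name}
ossutil cp staff.csv oss://{your bucket name}/staff.csv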

2. Develop a dependency method.

  1. Create a folder named tools.

  2. Create a file named func.py in the tools folder. This file contains the following content.

    def tax(salary):
        """
        Convert the salary string to an integer and
        return the 15% tax on it as a float.

        :param salary: The salary of a staff member.
        :return: The tax amount as a float.
        """
        return 0.15 * int(salary)
  3. Package the tools folder into the tools.zip file, and upload the file to OSS. A sample command that generates the tools.zip file is shown after the following notice.

    Notice

    The ZIP compression tools vary based on the operating system that you use. Make sure that the tools folder is the root directory after the tools.zip file is decompressed.
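
    The following is a minimal packaging sketch for Linux or macOS. It assumes that the zip tool is installed and that you run the command from the parent directory of the tools folder, so that tools is the root directory of the archive:

    # run this from the directory that contains the tools folder
    zip -r tools.zip tools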

3. Develop the main program.

Develop a Spark program in Python that allows DLA to read data from the staff.csv file in OSS and register the data as a DataFrame. In addition, register the tax method in the package that is created in Step 2 as a Spark UDF. Then, use the Spark UDF to compute the after-tax income from the DataFrame and generate the results.

In the following sample code, use the name of your OSS bucket to replace {your bucket name}.

from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# import the third-party module
from tools import func

if __name__ == "__main__":
    # initialize the SparkSession
    spark = SparkSession\
        .builder\
        .appName("Python Example")\
        .getOrCreate()

    # read csv from oss to a dataframe, show the table
    df = spark.read.csv('oss://{your bucket name}/staff.csv', mode="DROPMALFORMED", inferSchema=True, header=True)
    # print schema and data to the console
    df.printSchema()
    df.show()

    # create a UDF that deducts the 15% tax from the salary
    taxCut = udf(lambda salary: int(salary) - func.tax(salary), FloatType())

    # cut tax from salary and show result
    df.select("name", taxCut("salary").alias("final salary")).show()
    spark.stop()

Write the code to the example.py file, and upload the file to OSS.

4. Submit a job.

In the DLA console, choose Serverless Spark > Submit job from the left-side navigation pane. Then, create a job and enter the following code in the code editor:

{
    "name": "Spark Python",
    "file": "oss://{your bucket name}/example.py",
    "pyFiles": ["oss://{your bucket name}/tools.zip"],
    "conf": {
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "small",
        "spark.dla.connectors": "oss",
        "spark.kubernetes.pyspark.pythonVersion": "3"
    }
}

The following list describes the conf parameter in the preceding code. The conf parameter is optional and specifies the configuration parameters that are required for the Spark job:

  • "spark.dla.connectors": "oss": indicates that the OSS connector is used.

  • "spark.kubernetes.pyspark.pythonVersion": "3": indicates that Python 3 is used to run the job.

Note

For more information about other parameters, see Configure a Spark job.
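
If the job succeeds, the final show() call prints the after-tax income. The output in the job log is similar to the following (formatting can vary with the Spark version):

+------+------------+
|  name|final salary|
+------+------------+
| Lucky|        85.0|
|  Lucy|       127.5|
|Martin|       153.0|
|  Rose|       170.0|
+------+------------+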

Use a custom virtual environment for PySpark jobs

If your job requires complex third-party dependencies, you can package your local debugging environment as a virtual environment and upload it to the Spark cluster in the cloud. This way, a large number of complex packages, such as Pandas, NumPy, and PyMySQL, are installed in an isolated environment that can be migrated to an identical operating system in the cloud. You can use one of the following methods to generate a virtual environment package:

Note

For more information about virtual environments, see Virtual Environments and Packages.

Manually generate a virtual environment package

1. Prepare a Linux operating system.

The operating system on which you package the virtual environment must be the same as the operating system of the serverless Spark engine where the package is used. Therefore, the virtual environment package that you upload to the serverless Spark engine of DLA must be generated on Linux. You can use one of the following methods to prepare a Linux operating system for packaging a virtual environment:

  • Prepare a computer that runs CentOS Linux 7 to package the virtual environment.

  • Purchase an Elastic Compute Service (ECS) instance that runs CentOS Linux 7 and select the pay-as-you-go billing method for the instance. After the virtual environment package is generated, release the ECS instance.

  • Use the Docker image of CentOS Linux 7 to package the virtual environment.
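
For the Docker-based option, a minimal sketch (assuming Docker is installed and that the public centos:7 image is acceptable in your environment) is to start an interactive CentOS 7 container and mount a local working directory into it:

# start an interactive CentOS 7 container; files you package appear in the mounted directory
docker run -ti -v $(pwd):/work centos:7 bash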

2. Package a Python virtual environment in Linux.

You can use a tool, such as virtualenv or conda, to package a virtual environment. Before you package a virtual environment, select a tool based on your business requirements and install the tool in Linux.

Notice
  • The serverless Spark engine of DLA supports Python 3.7 and earlier major Python versions.

  • The serverless Spark engine runs CentOS Linux 7. We recommend that you use CentOS Linux 7 in which Docker is installed to package a virtual environment.

The following code demonstrates how to use virtualenv to generate a venv.zip file. The venv.zip file contains scikit-spark of a specific version.

# create the venv directory in the current path with Python 3.7
# MUST ADD --copies !
virtualenv --copies --download --python python3.7 venv

# activate the environment
source venv/bin/activate

# install third-party modules
pip install scikit-spark==0.4.0

# check the result
pip list

# zip the environment
zip -r venv.zip venv
Note

For more information about how to use conda to generate a virtual environment package, see Managing environments.

3. Run the virtual environment in the serverless Spark engine.

Before you submit a Spark job, use the following code to configure the job. In the code, the spark.pyspark.python parameter specifies the Python interpreter in the uploaded package that is used to run the job. For more information about the parameters, see Configure a Spark job.

{
    "name": "venv example",
    "archives": [
        "oss://test/venv.zip#PY3"
    ],
    "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.dla.connectors": "oss",
        "spark.executor.instances": 1,
        "spark.dla.job.log.oss.uri": "oss://test/spark-logs",
        "spark.pyspark.python": "./PY3/venv/bin/python3",
        "spark.executor.resourceSpec": "medium"
    },
    "file": "oss://test/example.py"
}
Note

As defined by the Apache Spark community, venv.zip#PY3 indicates that the package is decompressed to the PY3 folder under the working directory of a compute node for local data access. If you do not use a number sign (#) to specify the folder name, the file name of the package is used as the folder name.
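
To verify that the job runs on the interpreter from the uploaded package, the main program can print the interpreter paths that the driver and the executors use. The following is a minimal sketch; the file name and application name are illustrative and not part of the preceding example:

import sys

from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder.appName("venv check").getOrCreate()

    # interpreter that runs the driver; with the preceding configuration,
    # this is expected to resolve to the python3 binary in the extracted PY3 folder
    print("driver python:", sys.executable)

    # interpreter that runs the executors
    executor_pythons = spark.sparkContext.parallelize([0], 1).map(lambda _: sys.executable).collect()
    print("executor python:", executor_pythons)

    spark.stop()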

Use an image tool to generate a virtual environment package

1. Run the following command to pull a Docker image from a registry:

docker pull registry.cn-hangzhou.aliyuncs.com/dla_spark/dla-venv:0.1

2. Place the requirements.txt file in the /home/admin directory and mount this directory to the Docker container by running the following command:

docker run -ti -v /home/admin:/tmp registry.cn-hangzhou.aliyuncs.com/dla_spark/dla-venv:0.1 -p python3 -f /tmp/requirements.txt
Note

The requirements.txt file is the standard format that is used to describe Python dependencies. For more information, see User Guide.
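
A requirements.txt sketch might look like the following. The package names and versions are examples only; list the dependencies that your job actually needs:

pandas==1.0.5
numpy==1.18.5
pymysql==0.10.1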

The packaging program is automatically executed. You can view the following log information during the execution process.

  adding: venv-20210611-095454/lib64/ (stored 0%)
  adding: venv-20210611-095454/lib64/python3.6/ (stored 0%)
  adding: venv-20210611-095454/lib64/python3.6/site-packages/ (stored 0%)
  adding: venv-20210611-095454/pyvenv.cfg (deflated 30%)
  venv-20210611-095454.zip

3. In the /home/admin directory, find the virtual environment package venv-20210611-095454.zip. For more information about how to use the virtual environment package, see Use a virtual environment in Spark.

4. (Optional) Run the following command to obtain more information about how to use the Docker image:

docker run -i dla-venv:0.1

Used to create venv package for Aliyun DLA 
Docker with host machine volumes: https://docs.docker.com/storage/volumes/
Please copy requirements.txt to a folder and mount the folder to docker as /tmp path 
Usage example: docker run -it -v /home/admin:/tmp dla-venv:0.1 -p python3 -f /tmp/requirements.txt 
 -p python version, could be python2 or python3
 -f path to requirements.txt, default is /tmp/requirements.txt

FAQ

If automatic packaging fails when you use the image tool to generate a virtual environment package, you can run the following command to start a CentOS Linux 7 container and perform the operations manually as the root user.

docker run -ti --entrypoint bash registry.cn-hangzhou.aliyuncs.com/dla_spark/dla-venv:0.1

For more information about the subsequent operations, see Manually generate a virtual environment package.