
Data Lake Analytics - Deprecated: PySpark

Last Updated: Feb 07, 2024

This topic describes how to submit a PySpark job and use a custom virtual environment in Data Lake Analytics (DLA).

Important

DLA is discontinued. AnalyticDB for MySQL supports the features of DLA and provides additional features and enhanced performance. For more information about how to use PySpark and custom virtual environments in AnalyticDB for MySQL, see Use PySpark to develop Spark applications.

Use PySpark

1. Create a main program file

Create a file named example.py that contains the following sample code. The code defines a main entry point from which PySpark starts the program.

from __future__ import print_function
from pyspark.sql import SparkSession



if __name__ == "__main__":
    # init pyspark context
    spark = SparkSession\
        .builder\
        .appName("Python Example")\
        .getOrCreate()

    df = spark.sql("SELECT 2021")
    # print schema and data to the console
    df.printSchema()
    df.show()

2. Execute the main program file

  1. Similar to JAR files written in Scala or Java, the example.py file must be uploaded to Object Storage Service (OSS). In addition, you must specify example.py as the startup file by setting the file parameter in the Spark job configuration.

  2. Log on to the DLA console. In the left-side navigation pane, choose Serverless Spark > Submit job. On the Parameter Configuration page, enter the following code in the code editor.

    {
        "name": "Spark Python",
        "file": "oss://{your bucket name}/example.py",
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "small",
            "spark.kubernetes.pyspark.pythonVersion": "3"
        }
    }

    Important
    • Replace {your bucket name} with the name of the OSS bucket to which the example.py file is uploaded.

    • The preceding code is written in Python 3. Use the spark.kubernetes.pyspark.pythonVersion parameter to specify the Python version of the job. If you do not set this parameter, Python 2.7 is used by default.

  3. Click Execute.

Upload a self-developed or third-party module

When you develop a Python program, you can use self-developed or third-party modules. You can upload these modules and load them to the execution environment for the main program to call.

This section shows an example on how to calculate the after-tax incomes of employees.

1. Prepare test data

Create a CSV file named staff.csv and upload the file to OSS. This file contains the following content that indicates the basic information and income of each employee.

name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
Note

For more information about how to upload a file to OSS, see Simple upload.
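
If you prefer to upload files by using a script instead of the OSS console, the following is a minimal sketch that uses the OSS Python SDK (oss2). The endpoint, AccessKey pair, and bucket name are placeholders that you must replace with your own values.

# upload_staff.py - a minimal sketch that uploads staff.csv by using the oss2 SDK
# (the endpoint, AccessKey pair, and bucket name are placeholders)
import oss2

auth = oss2.Auth('<your-access-key-id>', '<your-access-key-secret>')
bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', '<your bucket name>')

# the object key "staff.csv" maps to oss://<your bucket name>/staff.csv
bucket.put_object_from_file('staff.csv', 'staff.csv')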

2. Compile dependencies

  1. Create a folder named tools.

  2. Create a file named func.py in the tools folder. This file contains the following content:

    def tax(salary):
        """
        Convert the salary string to an integer and
        return the 15% tax on the salary as a float.

        :param salary: The salary of a staff member.
        :return: The tax amount as a float.
        """
        return 0.15 * int(salary)
  3. Compress the tools folder into a package named tools.zip and upload the package to OSS. A portable way to generate the package is shown after the note below.

    Important

    The ZIP compression tool varies based on the operating system that you use. Make sure that the tools folder is the root directory after the tools.zip package is decompressed.
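
    Because the exact packaging command is not shown in this topic, the following is a minimal, portable sketch that uses Python's standard library to generate tools.zip with the tools folder as the root directory of the archive. You can equally use the zip tool of your operating system.

    # make_tools_zip.py - a minimal sketch that packages the tools folder into tools.zip
    # (assumes it is run from the directory that contains the tools folder)
    import shutil

    # base_dir="tools" keeps tools/ as the root directory inside the archive
    shutil.make_archive('tools', 'zip', root_dir='.', base_dir='tools')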

3. Develop the main program

Develop a Spark program in Python. The program reads data from the staff.csv file in OSS into a DataFrame, registers the tax method from the package that is created in Step 2 as a Spark UDF, and then applies the UDF to the DataFrame to calculate the after-tax salaries.

In the following sample code, replace {your bucket name} with the name of the OSS bucket to which the staff.csv file is uploaded.

from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# import the self-developed tools module
from tools import func

if __name__ == "__main__":
    # init pyspark context
    spark = SparkSession\
        .builder\
        .appName("Python Example")\
        .getOrCreate()

    # read staff.csv from OSS into a DataFrame
    df = spark.read.csv('oss://{your bucket name}/staff.csv', mode="DROPMALFORMED", inferSchema=True, header=True)
    # print schema and data to the console
    df.printSchema()
    df.show()

    # register the after-tax salary calculation as a UDF
    taxCut = udf(lambda salary: int(salary) - func.tax(salary), FloatType())

    # subtract the 15% tax from each salary and show the result
    df.select("name", taxCut("salary").alias("final salary")).show()
    spark.stop()

Write the preceding code to the example.py file and upload the file to OSS.
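
Optionally, before you upload example.py, you can run a quick local sanity check of the same logic. The following sketch is an assumption for local testing only, not part of the DLA job: it requires a local PySpark installation and expects staff.csv and the tools folder in the current directory.

# local_check.py - a minimal local sanity check of the UDF logic
# (assumes pyspark is installed locally and staff.csv and tools/ are in the current directory)
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

from tools import func

spark = SparkSession.builder.master("local[*]").appName("local check").getOrCreate()
df = spark.read.csv("staff.csv", inferSchema=True, header=True)
taxCut = udf(lambda salary: int(salary) - func.tax(salary), FloatType())
df.select("name", taxCut("salary").alias("final salary")).show()
spark.stop()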

4. Submit a job

In the left-side navigation pane of the DLA console, choose Serverless Spark > Submit job. On the Parameter Configuration page, create a job and enter the following code in the code editor.

{
    "name": "Spark Python",
    "file": "oss://{your bucket name}/example.py",
    "pyFiles": ["oss://{your bucket name}/tools.zip"],
    "conf": {
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "small",
        "spark.dla.connectors": "oss",
        "spark.kubernetes.pyspark.pythonVersion": "3"
    }
}

The following describes the conf parameter in the code.

Parameter: conf

Required: No

Description: The configuration parameters that are required for the Spark job.

  • "spark.dla.connectors": "oss": specifies that the OSS connector is used.

  • "spark.kubernetes.pyspark.pythonVersion": "3": specifies that Python 3 is used to run the job.

Note

For more information about other parameters, see Configure a Spark job.

Use a custom virtual environment for PySpark jobs

If third-party dependencies are required, you can use a virtual environment to package your local debugging environment and upload it to the Spark cluster in the cloud. This way, a large number of complex system packages, such as Pandas, NumPy, and PyMySQL, can be installed in an isolated environment and moved to a cluster that runs the same operating system. You can use one of the following methods to generate a virtual environment package:

Note

For more information about virtual environments, see Virtual Environments and Packages.

Enable the system to automatically generate a virtual environment package

1. Prepare a Linux operating system

The operating system on which you package a virtual environment must be the same as the operating system of the serverless Spark engine on which the package is used. Therefore, a virtual environment package that you want to upload to the serverless Spark engine of DLA must be built on Linux. You can use one of the following methods to prepare a Linux operating system for packaging a virtual environment:

  • Prepare a computer that runs CentOS 7 to package the virtual environment.

  • Purchase an Elastic Compute Service (ECS) instance that runs CentOS 7 and select the pay-as-you-go billing method for the instance. After the virtual environment package is generated, release the ECS instance.

  • Use the Docker image of CentOS 7 to package the virtual environment.

2. Package a Python virtual environment in Linux

You can use a tool, such as virtualenv or conda, to package a virtual environment. Before you package a virtual environment, select a tool based on your business requirements and install the tool in Linux.

Important
  • The serverless Spark engine of DLA supports Python 3.7 and earlier Python versions.

  • The serverless Spark engine runs CentOS 7. We recommend that you package the virtual environment on CentOS 7, for example by using a CentOS 7 Docker image.

The following code demonstrates how to use virtualenv to generate a venv.zip file. The venv.zip file contains scikit-spark of a specific version.

# create a virtual environment named venv in the current directory with Python 3.7
# MUST ADD --copies !
virtualenv --copies --download --python python3.7 venv

# activate the environment
source venv/bin/activate

# install third-party modules
pip install scikit-spark==0.4.0

# check the result
pip list

# zip the environment
zip -r venv.zip venv
Note

For more information about how to use conda to generate a virtual environment package, see Managing environments.

3. Run the virtual environment in the serverless Spark engine

Before you submit a Spark job, use the following code to configure the job. In the code, the spark.pyspark.python parameter specifies the Python interpreter in the uploaded package that is used to run the job. For more information about the parameters, see Configure a Spark job.

{
    "name": "venv example",
    "archives": [
        "oss://test/venv.zip#PY3"
    ],
    "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.dla.connectors": "oss",
        "spark.executor.instances": 1,
        "spark.dla.job.log.oss.uri": "oss://test/spark-logs",
        "spark.pyspark.python": "./PY3/venv/bin/python3",
        "spark.executor.resourceSpec": "medium"
    },
    "file": "oss://test/example.py"
}
Note

As defined by the Apache Spark community, venv.zip#PY3 indicates that the package is decompressed to the PY3 folder in the working directory of a compute node for local data access. If you do not use the number sign (#) to specify a folder name, the file name of the package is used as the folder name.
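
To verify at run time that the job uses the interpreter from the uploaded archive rather than the default system Python, you can add a short check to your main program. This is an optional sketch, not part of the original example:

# optional check in example.py: print the interpreter that runs the driver
import sys

# expected to point to ./PY3/venv/bin/python3 when spark.pyspark.python is configured correctly
print("Python executable:", sys.executable)
print("Python version:", sys.version)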

Use an image tool to generate a virtual environment package

1. Run the following command to pull a Docker image from a registry:

sudo docker pull registry.cn-hangzhou.aliyuncs.com/dla_spark/dla-venv:0.1

2. Place the requirements.txt file in the /home/admin directory and mount the directory to the /tmp directory of the Docker container.

sudo docker run -ti -v /home/admin:/tmp dla-venv:0.1 -p python3 -f /tmp/requirements.txt
Note

The requirements.txt file describes the standard Python dependencies. For more information, see User Guide.

The packaging program is automatically executed. You can view the following log information during the execution process.

  adding: venv-20210611-095454/lib64/ (stored 0%)
  adding: venv-20210611-095454/lib64/python3.6/ (stored 0%)
  adding: venv-20210611-095454/lib64/python3.6/site-packages/ (stored 0%)
  adding: venv-20210611-095454/pyvenv.cfg (deflated 30%)
  venv-20210611-095454.zip

3. In the /home/admin directory, find the virtual environment package named venv-20210611-095454.zip. For more information about how to use the virtual environment package, see the Run the virtual environment in the serverless Spark engine section of this topic.

4. Optional. Run the following command to obtain more information about how to use the Docker image:

sudo docker run -i dla-venv:0.1

Used to create venv package for Aliyun DLA 
Docker with host machine volumes: https://docs.docker.com/storage/volumes/
Please copy requirements.txt to a folder and mount the folder to docker as /tmp path 
Usage example: docker run -it -v /home/admin:/tmp dla-venv:0.1 -p python3 -f /tmp/requirements.txt 
 -p python version, could be python2 or python3
 -f path to requirements.txt, default is /tmp/requirements.txt

FAQ

If automatic packaging fails when you use the image tool to generate a virtual environment package, you can run the following command to start a CentOS 7 container and perform the operations manually as the root user.

sudo docker run -ti --entrypoint bash dla-venv:0.1

For more information about subsequent operations, see the Enable the system to automatically generate a virtual environment package section of this topic.