
Python-based Spark job

Last Updated: Aug 07, 2020

After a Python-based Spark cluster is created, you can submit jobs to it. This topic uses a Python-based Spark job as an example.

Note: When you create a Python-based Spark cluster, you must use the spark_2_4_5_dla_0_0_2 image version.

Python-based Spark is currently available in the China (Shenzhen) region and will become available in other regions later.

1. Prepare the test data

Generate a CSV file named staff.csv and upload it to Object Storage Service (OSS).

The CSV file lists the information and income of each employee.

name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
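
You can upload the file through the OSS console or the ossutil tool. The following is a minimal sketch that uses the oss2 Python SDK instead; the endpoint, AccessKey pair, and bucket name are placeholders that you need to replace.

import oss2

# Placeholders: replace with your own AccessKey pair, region endpoint, and bucket name.
auth = oss2.Auth('<AccessKeyId>', '<AccessKeySecret>')
bucket = oss2.Bucket(auth, 'https://oss-cn-shenzhen.aliyuncs.com', '<your bucket>')

# Upload the local staff.csv file to the root directory of the bucket.
bucket.put_object_from_file('staff.csv', 'staff.csv')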

2. Develop a dependency method

To calculate the after-tax income of each employee, create a file named func.py and define a tax method in it. The method is later registered as a Spark user-defined function (UDF) in the main program to facilitate subsequent operations.

Sample code:

def tax(salary):
    """
    Convert the salary string to an integer, deduct a 15% tax,
    and return the after-tax salary as a float.

    :param salary: the salary of a staff member, as a string
    :return: the after-tax salary as a float
    """
    return 0.85 * int(salary)
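
You can sanity-check the method locally before packaging it (an illustration; it assumes func.py sits in the current working directory):

import func

# 100 with a 15% tax deducted
print(func.tax("100"))   # expected output: 85.0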

To make the tax method available to the Spark job through the pyFiles parameter, package it in a ZIP archive.

Following Python package conventions, create a module named tools and place func.py, which contains the tax method, inside it. The internal structure of the compressed package is shown below.
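
The layout below is a reconstruction of the packaged directory; the __init__.py file is an assumption (an empty one is enough to make tools importable as a package):

depend
└── tools
    ├── __init__.py
    └── func.py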

Compress the depend folder as depend.zip and upload the package to OSS.
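
One way to build depend.zip locally is with the Python standard library. The sketch below archives the contents of the depend folder so that the tools package sits at the root of the archive, which is what the import from tools import func expects when the package is distributed through pyFiles:

import shutil

# Archive the contents of ./depend into depend.zip.
# root_dir="depend" makes tools/ the top-level entry inside the archive.
shutil.make_archive("depend", "zip", root_dir="depend")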

3. Develop the main program

Develop a Python-based Spark program that reads the CSV data from OSS into a DataFrame, registers the tax method from the depend.zip dependency package as a Spark UDF, and then applies the UDF to the DataFrame to produce the results.

Replace {your bucket} with the name of your OSS bucket in the following sample code:

from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# import the tax method from the third-party dependency package
from tools import func

if __name__ == "__main__":
    # initialize the Spark session
    spark = SparkSession\
        .builder\
        .appName("Python Example")\
        .getOrCreate()

    # read the CSV file from OSS into a DataFrame
    df = spark.read.csv('oss://{your bucket}/staff.csv', mode="DROPMALFORMED", inferSchema=True, header=True)
    # print schema and data to the console
    df.printSchema()
    df.show()

    # register the tax method as a UDF that returns a float
    taxCut = udf(lambda salary: func.tax(salary), FloatType())

    # apply the UDF to the salary column and show the after-tax result
    df.select("name", taxCut("salary").alias("final salary")).show()
    spark.stop()

Write the preceding code to the example.py file and upload the file to OSS.
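
For reference, with the sample data and a 15% tax deduction, the final show() call should print output close to the following (exact column widths may vary):

+------+------------+
|  name|final salary|
+------+------------+
| Lucky|        85.0|
|  Lucy|       127.5|
|Martin|       153.0|
|  Rose|       170.0|
+------+------------+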

4. Submit the job

Create a cluster and select the latest Spark image version, spark_2_4_5_dla_0_0_2, when you do so.

In the left-side navigation pane of the DLA console, choose Serverless Spark > Submit job. On the page that appears, click Create Job. In the dialog box that appears, complete the settings and submit the following job information:

{
    "name": "Spark Python",
    "file": "oss://{your bucket name}/example.py",
    "pyFiles": ["oss://{your bucket name}/depend.zip"],
    "conf": {
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "small",
        "spark.dla.connectors": "oss",
        "spark.kubernetes.pyspark.pythonVersion": "3"
    }
}

Replace {your bucket name} with the name of your OSS bucket. In this example, the job runs with Python 3. As with community editions of Spark, you can set the spark.kubernetes.pyspark.pythonVersion parameter to choose the Python version. If you do not specify this parameter, Python 2.7 is used.

5. Parameters

| Parameter | Description | Required |
|-----------|-------------|----------|
| name | The name of the Spark job. | No |
| file | The Python file that contains the main program of the Spark job. The value must be an OSS path. | Yes |
| pyFiles | The packages of third-party modules that the Spark job depends on. Separate multiple packages with commas (,). | No |
| conf | The configuration parameters used by the Spark job. The configuration "spark.dla.connectors": "oss" allows the Spark job to access OSS. The configuration "spark.kubernetes.pyspark.pythonVersion": "3" runs the job with Python 3. | No |