
PySpark

Last Updated: Mar 15, 2021

This topic describes how to submit a PySpark job.

1. Prepare test data

Create the staff.csv file and upload it to Object Storage Service (OSS).

This file contains the information and income of each employee.

name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
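If you prefer to upload the file programmatically instead of through the OSS console, the following minimal sketch uses the oss2 Python SDK. The endpoint, bucket name, and credentials are placeholders that you must replace with your own values.

# Minimal sketch: upload staff.csv to OSS with the oss2 SDK.
# The endpoint, bucket name, and credentials below are placeholders.
import oss2

auth = oss2.Auth('<your-access-key-id>', '<your-access-key-secret>')
bucket = oss2.Bucket(auth, 'https://oss-cn-hangzhou.aliyuncs.com', '<your bucket>')

# Upload the local staff.csv file to the root directory of the bucket.
bucket.put_object_from_file('staff.csv', 'staff.csv')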

2. Develop a Spark user-defined function (UDF)

To calculate the after-tax income of each employee, create the func.py file, implement the tax method in it, and later register this method as a Spark UDF for use in the main program.

Sample code in the func.py file:

def tax(salary):
    """
    Convert the salary string to an integer,
    deduct 15% tax, and return the after-tax
    salary as a float.

    :param salary: The salary of a staff member, as a string.
    :return: The after-tax salary.
    """
    return 0.85 * int(salary)
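Before you package the file, you can run a quick local check of the method; the values below are only for illustration and assume the 15% deduction shown above.

# Quick local check of the tax method (illustrative values only).
from func import tax

print(tax("100"))   # 85.0
print(tax("200"))   # 170.0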

To make the tax method in func.py available to the Spark job, package it into a ZIP file.

Following Python module conventions, create a folder named tools and store the func.py file in it, so that the tax method can be imported from the tools module.

Package the tools folder into a ZIP file named tools.zip, and upload the package to OSS.
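If you want to build the package in a way that is independent of the operating system, the following sketch uses Python's standard zipfile module. It assumes that the tools folder, which contains func.py, is in the current working directory.

# Minimal sketch: package the tools folder into tools.zip so that
# tools/ is the top-level directory inside the archive.
import os
import zipfile

with zipfile.ZipFile('tools.zip', 'w', zipfile.ZIP_DEFLATED) as zf:
    for root, _, files in os.walk('tools'):
        for name in files:
            path = os.path.join(root, name)
            zf.write(path, arcname=path)  # keep the tools/ prefix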

Notice

The ZIP compression tools for different operating systems are slightly different. You must make sure that the tools folder is in the top-level directory after the package is decompressed.
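You can list the contents of the archive to confirm the layout; the check below assumes that tools.zip is in the current directory.

# Sanity check: every entry should start with "tools/".
import zipfile

print(zipfile.ZipFile('tools.zip').namelist())  # for example: ['tools/func.py']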

3. Develop the main program

Develop a PySpark program that reads the staff.csv file from OSS into a DataFrame, registers the tax method from tools.zip as a Spark UDF, and then applies the UDF to the DataFrame to produce the results.

Replace {your bucket} with the name of your OSS bucket in the following sample code:

from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# import the third-party module packaged in tools.zip
from tools import func

if __name__ == "__main__":
    # initialize the Spark session
    spark = SparkSession\
        .builder\
        .appName("Python Example")\
        .getOrCreate()

    # read the CSV file from OSS into a DataFrame
    df = spark.read.csv('oss://{your bucket}/staff.csv', mode="DROPMALFORMED",
                        inferSchema=True, header=True)
    # print the schema and data to the console
    df.printSchema()
    df.show()

    # register the tax method as a Spark UDF
    taxCut = udf(lambda salary: func.tax(salary), FloatType())

    # apply the UDF to deduct tax from each salary and show the result
    df.select("name", taxCut("salary").alias("final salary")).show()
    spark.stop()

Write the preceding code to the example.py file and upload the file to OSS.

4. Submit a job

In the left-side navigation pane of the Data Lake Analytics (DLA) console, choose Serverless Spark > Submit job. On the page that appears, click Create Job. In the dialog box that appears, complete the settings and submit the following job information:

{
    "name": "Spark Python",
    "file": "oss://{your bucket name}/example.py",
    "pyFiles": ["oss://{your bucket name}/tools.zip"],
    "conf": {
        "spark.dla.roleArn": "acs:ram::11111111111:role/sparkramroletest",
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "small",
        "spark.dla.connectors": "oss",
        "spark.kubernetes.pyspark.pythonVersion": "3"
    }
}

Replace {your bucket name} with the name of your OSS bucket. In this example, Python 3 is used to run the job. As in open source Apache Spark, the serverless Spark engine of DLA allows you to use the spark.kubernetes.pyspark.pythonVersion parameter to select the Python version. If you do not specify this parameter, Python 2.7 is used by default.

Note

The spark.dla.roleArn parameter specifies the Resource Access Management (RAM) role that authorizes the serverless Spark engine to access your resources, such as the OSS bucket. Replace the sample value with the ARN of your own RAM role.
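If the job succeeds, the final df.show() call in example.py prints the after-tax salaries. For the sample data and the 15% deduction above, the output looks roughly like the following; the exact column formatting may differ.

+------+------------+
|  name|final salary|
+------+------------+
| Lucky|        85.0|
|  Lucy|       127.5|
|Martin|       153.0|
|  Rose|       170.0|
+------+------------+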

5. Parameters

Parameter: name
Description: The name of the Spark job.
Required: No

Parameter: file
Description: The Python file that contains the main program of the Spark job. The value must be an OSS path.
Required: Yes

Parameter: pyFiles
Description: The packages of third-party Python modules that the Spark job depends on. Separate multiple package names with commas (,).
Required: No

Parameter: conf
Description: The configuration parameters that the Spark job requires.
"spark.dla.connectors": "oss" specifies that the job needs to access OSS.
"spark.kubernetes.pyspark.pythonVersion": "3" specifies that the job runs on Python 3.
Required: No