After a Python-based Spark cluster is created, you can submit jobs to it. This topic uses a Python-based Spark job as an example to describe how to submit a job.
Note: When you create a Python-based Spark cluster, you must use the image of version spark_2_4_5_dla_0_0_2.
Python-based Spark is available in the China (Shenzhen) region and will be available in other regions.
1. Prepare the test data
Generate a CSV file named staff.csv and upload it to Object Storage Service (OSS). The CSV file lists the basic information and income of each employee.
name,age,gender,salary
Lucky,25,male,100
Lucy,23,female,150
Martin,30,male,180
Rose,31,female,200
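If you prefer to upload the file from code rather than from the OSS console, the following minimal sketch uses the oss2 Python SDK. The endpoint, AccessKey pair, and bucket name are placeholders for illustration and must be replaced with your own values.
# Minimal sketch: upload staff.csv to OSS with the oss2 SDK.
# The endpoint, AccessKey pair, and bucket name below are placeholders.
import oss2

auth = oss2.Auth('<your-access-key-id>', '<your-access-key-secret>')
bucket = oss2.Bucket(auth, 'https://oss-cn-shenzhen.aliyuncs.com', '<your-bucket>')

# Upload the local staff.csv file to oss://<your-bucket>/staff.csv
bucket.put_object_from_file('staff.csv', 'staff.csv')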
2. Develop a dependency method
To calculate the after-tax income of each employee, create a file named func.py, write a tax method to it, and register the method as a Spark user-defined function (UDF) to facilitate subsequent operations.
Sample code:
def tax(salary):
    """
    Convert the salary string to an integer,
    deduct a 15% tax, and return the
    after-tax salary as a float.
    :param salary: The salary of a staff member.
    :return: The after-tax salary.
    """
    return 0.85 * int(salary)
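Before packaging the method, you can run a quick local check from a Python shell in the same directory as func.py. The expected value assumes the 15% deduction shown above:
# Quick local sanity check of the helper
import func

print(func.tax("100"))   # expected output: 85.0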
To introduce the tax method by using pyFiles, store it in a ZIP-compressed package. Based on Python syntax, a module named tools is created, and the func.tax method is stored in the tools module. Compress the depend folder into depend.zip and upload the package to OSS. The directory structure of the package is sketched below.
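The following sketch shows one layout that satisfies the from tools import func statement used in the next step. The __init__.py file (which can be empty) is an assumption that marks tools as a regular Python package; the tools directory must sit at the root of depend.zip.
depend/
└── tools/
    ├── __init__.py    (can be empty; marks tools as a package)
    └── func.py        (contains the tax method)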
3. Develop the main program
Develop a Python-based Spark program that reads the data of the CSV file from OSS and registers the data as a DataFrame. Register the tax method from the depend.zip dependency package as a Spark UDF. Then, apply the UDF to the DataFrame to generate the results.
Replace {your bucket} with the name of your OSS bucket in the following sample code:
from __future__ import print_function

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# import the third-party module shipped in depend.zip
from tools import func

if __name__ == "__main__":
    # initialize the Spark session
    spark = SparkSession \
        .builder \
        .appName("Python Example") \
        .getOrCreate()

    # read the CSV file from OSS into a DataFrame
    df = spark.read.csv('oss://{your bucket}/staff.csv', mode="DROPMALFORMED",
                        inferSchema=True, header=True)

    # print the schema and the data to the console
    df.printSchema()
    df.show()

    # register the tax method as a UDF
    taxCut = udf(lambda salary: func.tax(salary), FloatType())

    # deduct the tax from each salary and show the result
    df.select("name", taxCut("salary").alias("final salary")).show()

    spark.stop()
Write the preceding code to the example.py file and upload the file to OSS.
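With the sample data above, the final select statement should print a table similar to the following. The exact column formatting depends on the Spark version, and the values assume the 15% deduction in func.tax:
+------+------------+
|  name|final salary|
+------+------------+
| Lucky|        85.0|
|  Lucy|       127.5|
|Martin|       153.0|
|  Rose|       170.0|
+------+------------+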
4. Submit the job
Create a cluster. When you create the cluster, select the latest Spark image version spark_2_4_5_dla_0_0_2.
In the left-side navigation pane of the DLA console, choose Serverless Spark > Submit job. On the page that appears, click Create Job. In the dialog box that appears, complete the settings and submit the following job information:
{
    "name": "Spark Python",
    "file": "oss://{your bucket name}/example.py",
    "pyFiles": ["oss://{your bucket name}/depend.zip"],
    "conf": {
        "spark.driver.resourceSpec": "small",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "small",
        "spark.dla.connectors": "oss",
        "spark.kubernetes.pyspark.pythonVersion": "3"
    }
}
Replace {your bucket name} with the name of your OSS bucket. In this example, Python 3 is used to run the job. As with the community version of Spark, you can set the spark.kubernetes.pyspark.pythonVersion parameter to specify the Python version to use. If you do not specify this parameter, Python 2.7 is used.
5. Parameters
Parameter | Description | Required
---|---|---
name | The name of the Spark job. | No
file | The main Python file of the Spark job. The value of this parameter must be an OSS path. | Yes
pyFiles | The ZIP packages of the third-party modules on which the Spark job depends. Separate multiple packages with commas (,). | No
conf | The configuration parameters used by the Spark job. The configuration "spark.dla.connectors": "oss" allows the job to access OSS. The configuration "spark.kubernetes.pyspark.pythonVersion": "3" specifies that the job runs with Python 3. | No