Lindorm Distributed Processing System (LDPS) provides a RESTful API that allows you to submit Python-based Spark jobs for stream processing, batch processing, machine learning, and graph computing tasks. This topic describes how to create a Python-based Spark job and submit it to LDPS for execution.

Prerequisites

LDPS is activated for your Lindorm instance. For more information, see Activate LDPS and modify the configurations.

Procedure

  1. Define a Python-based Spark job.
  2. Package the Python-based Spark job.
  3. Upload the files of the Python-based Spark job.
  4. Submit the Python-based Spark job.

Step 1: Define a Python-based Spark job

  1. Click Sample Spark job to download the demos provided by LDPS.
  2. Extract the files from the downloaded package. The folder that contains the sample jobs is named lindorm-spark-examples. Go to the lindorm-spark-examples/python directory and view the files in the python folder.
  3. In this demo, your_project is the root directory of the project. Perform the following steps to create a Python-based Spark job:
    1. Create an empty file named __init__.py in the your_project directory.
    2. Modify the main.py file.
      1. Open the lindorm-spark-examples/python/your_project/main.py file and modify the code to add the path of the your_project directory to sys.path. For more information, see the Notice1 section in the main.py file.
        import os
        import sys

        # Notice1: You need to do the following step to complete the code modification:
        # Step1: Please add a "__init__.py" to your project directory, your project will act as a module of launcher.py
        # Step2: Please add current dir to sys.path, you should add the following code to your main file
        current_dir = os.path.abspath(os.path.dirname(__file__))
        sys.path.append(current_dir)
        print("current dir in your_project: %s" % current_dir)
        print("sys.path: %s \n" % str(sys.path))
      2. Encapsulate the entry logic into the main(argv) method in the main.py file. For more information, see the Notice2 section in the lindorm-spark-examples/python/your_project/main.py file.
        # Notice2: Move the code in `if __name__ == "__main__":` branch to a new defined main(argv) function,
        # so that launcher.py in the parent directory can simply call main(sys.argv)
        import os
        import sys

        def main(argv):
            print("Receive arguments: %s \n" % str(argv))

            print("current dir in main: %s \n" % os.path.abspath(os.path.dirname(__file__)))
            # Write your code here


        if __name__ == "__main__":
            main(sys.argv)
    3. Create an entry file that is used to call the main(argv) method to start the Python-based Spark job. In the demo, the launcher.py file in the lindorm-spark-examples/python directory is the entry file. You can use the code in the launcher.py file to start the Python-based Spark job.
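The launcher pattern described above boils down to two lines: import the entry function of the project package and forward the command-line arguments to it. The following self-contained sketch demonstrates the convention by generating a throwaway your_project package first; in a real job, the package ships in the .zip file that you upload in Step 2, and mainResource points to launcher.py.

```python
import os
import sys
import tempfile

# Build a throwaway your_project package so the launcher pattern can be
# demonstrated end to end. In a real job, this package is uploaded as a
# .zip file and does not need to be generated at run time.
pkg_root = tempfile.mkdtemp()
os.makedirs(os.path.join(pkg_root, "your_project"))
open(os.path.join(pkg_root, "your_project", "__init__.py"), "w").close()
with open(os.path.join(pkg_root, "your_project", "main.py"), "w") as f:
    f.write(
        "def main(argv):\n"
        "    print('Receive arguments: %s' % str(argv))\n"
        "    return argv\n"
    )
sys.path.insert(0, pkg_root)

# launcher.py itself only needs these two lines: import the entry
# function of the project and forward the command-line arguments to it.
from your_project.main import main
result = main(sys.argv)
```

Because launcher.py lives in the parent directory of your_project, the sys.path modification inside main.py ensures that the package's own modules remain importable at run time.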

Step 2: Package the Python-based Spark job

  1. Package the Python runtime and the third-party libraries on which the project depends. We recommend that you use Conda or Virtualenv to package the dependencies into a .tar.gz package. For more information, see Python Package Management.
    Notice You must perform this operation in Linux. This ensures that LDPS can recognize the binary files in the Python package.
  2. Package the project files. Package the your_project directory into a .zip file or an .egg file.
    • You can run the following command to package the your_project directory into a .zip file:
      zip -r project.zip your_project
    • For information about how to package the your_project directory into an .egg file, see Building Eggs.

Step 3: Upload the files of the Python-based Spark job

Upload the following files to an Object Storage Service (OSS) bucket. For more information, see Simple upload.
  • The .tar.gz package that is built in Step 2. The package contains the Python runtime and third-party libraries.
  • The your_project.zip file or your_project.egg file that is built in Step 2.
  • The launcher.py file that is created in Step 1.

Step 4: Submit the Python-based Spark job

Call the RESTful API to submit the Python-based Spark job. The following list describes information about the RESTful API:
  • Request path: /api/v1/jars/submitJar
  • Request method: POST
The following list describes the types of request parameters that are required in a request:
  • Basic parameters.
    • token (String, required). Example: ****. The token that is used for computing resource authentication when you submit a Python-based job. For information about how to obtain the token, see View endpoints.
    • appName (String, required). Example: pythonKmeans. The name of the job. The name can be up to 46 characters in length.
    • mainResource (String, required). Example: oss://testBucketName/kmeans.py. The OSS path in which the entry file of the job is stored.
    • mainClass (String, required). Example: MAIN_CLASS. The main entry point of the job. For Python-based jobs, you must still specify a non-empty string as the value of this parameter.
    • args (Array, optional). Example: ["arg1", "arg2.2"]. The arguments that are passed to the entry function of the job.
    • configs (JSON, optional). Example: { "spark.hadoop.fs.oss.impl":"org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem"}. Other configurations for the job.
  • Parameters that are used to specify the runtime environment of the Python-based job. Example:
    {"spark.archives":"oss://testBucketName/pyspark_conda_env.tar.gz#environment", "spark.kubernetes.driverEnv.PYSPARK_PYTHON":"./environment/bin/python","spark.submit.pyFiles":"oss://testBucketName/your_project.zip"}
    • To submit the files of the project, configure the spark.submit.pyFiles parameter in the value of the configs parameter. You can specify the OSS path of the .zip or .egg file that contains the files of the project, or the launcher.py file of the project.
    • To submit the .tar file that contains the Python runtime files and third-party class libraries, configure the spark.archives parameter and spark.kubernetes.driverEnv.PYSPARK_PYTHON parameter in the value of the configs parameter.
      • When you configure the spark.archives parameter, append a number sign (#) followed by a directory name (the targetDir) to the package path. The package is extracted to this directory in the working directory of the job.
      • The spark.kubernetes.driverEnv.PYSPARK_PYTHON parameter specifies the path of the Python executable in the extracted runtime environment.
  • Parameters that are required to access the file in OSS. You must include these parameters in the value of the configs parameter.
    Table 1. Parameters
    • spark.hadoop.fs.oss.endpoint. Example: oss-cn-beijing-internal.aliyuncs.com. The endpoint of the OSS bucket in which you want to store the file.
    • spark.hadoop.fs.oss.accessKeyId and spark.hadoop.fs.oss.accessKeySecret. Examples: testAccessKey ID and testAccessKey Secret. The AccessKey ID and AccessKey secret that are used for identity authentication. You can create or obtain AccessKey pairs in the Alibaba Cloud Management Console. For more information, see Obtain an AccessKey pair.
    • spark.hadoop.fs.oss.impl. Set the value to org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem. The class that is used to access OSS.
    Note For more information about the parameters, see Parameters.
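Taken together, the runtime-environment parameters and the OSS access parameters merge into a single configs object. The following is a minimal sketch of that object in Python, assuming placeholder bucket, endpoint, and credential values:

```python
import json

# The value of the configs parameter combines the runtime-environment
# settings with the OSS access settings. All values are placeholders;
# replace them with your own bucket, endpoint, and AccessKey pair.
configs = {
    # Runtime environment of the Python-based job: extract the conda
    # package to ./environment and use the interpreter inside it.
    "spark.archives": "oss://testBucketName/pyspark_conda_env.tar.gz#environment",
    "spark.kubernetes.driverEnv.PYSPARK_PYTHON": "./environment/bin/python",
    "spark.submit.pyFiles": "oss://testBucketName/your_project.zip",
    # OSS access parameters.
    "spark.hadoop.fs.oss.endpoint": "oss-cn-beijing-internal.aliyuncs.com",
    "spark.hadoop.fs.oss.accessKeyId": "testAccessKey ID",
    "spark.hadoop.fs.oss.accessKeySecret": "testAccessKey Secret",
    "spark.hadoop.fs.oss.impl": "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem",
}

# Serialize the object so it can be embedded in the request body.
configs_json = json.dumps(configs)
```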

Example

  1. Click Sample Spark job to download the package that contains the demos, and then extract the files from the downloaded package.
  2. Open the your_project/main.py file and modify the entry point of the Python project.
    1. Add the path of the your_project file to sys.path.
      current_dir = os.path.abspath(os.path.dirname(__file__))
      sys.path.append(current_dir)
      print("current dir in your_project: %s" % current_dir)
      print("sys.path: %s \n" % str(sys.path))
    2. Specify the entry logic in the main.py file. You can use the following sample code to initialize a SparkSession:
      from pyspark.sql import SparkSession
      spark = SparkSession \
          .builder \
          .appName("PythonImportTest") \
          .getOrCreate()
      print(spark.conf)
      spark.stop()
  3. Package the your_project directory in the python directory into a .zip file.
    zip -r your_project.zip your_project
  4. In Linux, use Conda to package the files of the Python runtime environment.
    conda create -y -n pyspark_conda_env -c conda-forge numpy conda-pack
    conda activate pyspark_conda_env
    conda pack -f -o pyspark_conda_env.tar.gz
  5. Upload the your_project.zip file, pyspark_conda_env.tar.gz file, and launcher.py file to OSS.
  6. Call the RESTful API to submit the job. The API request includes the parameters that specify information about the job and parameters that specify information for accessing the OSS bucket in which the Python files of the job are stored.
    curl --location --request POST 'http://LINDORM_SPARK_ADDR/api/v1/jars/submitJar' \
    --header 'Content-Type: application/json' \
    --data '{
      "token": "TOKEN",
      "appName": "pythonExample",
      "mainClass": "MAIN_CLASS",
      "mainResource": "oss://testBucketName/launcher.py",
      "args": ["arg1"],
      "configs": {
        "spark.kubernetes.driverEnv.PYSPARK_PYTHON": "./environment/bin/python",
        "spark.submit.pyFiles": "oss://testBucketName/your_project.zip",
        "spark.archives": "oss://testBucketName/pyspark_conda_env.tar.gz#environment",
        "spark.hadoop.fs.oss.endpoint": "OSS_ENDPOINT",
        "spark.hadoop.fs.oss.impl": "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem",
        "spark.hadoop.fs.oss.accessKeyId": "testAccessKey ID",
        "spark.hadoop.fs.oss.accessKeySecret": "testAccessKey Secret"
      }
    }'
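If you prefer Python over curl, the same POST request can be built with the standard library. The sketch below only constructs the request; calling urllib.request.urlopen(req) would actually submit the job. LINDORM_SPARK_ADDR, TOKEN, and the bucket name are placeholders.

```python
import json
import urllib.request

# Placeholders: replace with your LDPS endpoint, token, and OSS paths.
url = "http://LINDORM_SPARK_ADDR/api/v1/jars/submitJar"
payload = {
    "token": "TOKEN",
    "appName": "pythonExample",
    "mainClass": "MAIN_CLASS",
    "mainResource": "oss://testBucketName/launcher.py",
    "args": ["arg1"],
    "configs": {
        "spark.kubernetes.driverEnv.PYSPARK_PYTHON": "./environment/bin/python",
        "spark.submit.pyFiles": "oss://testBucketName/your_project.zip",
        "spark.archives": "oss://testBucketName/pyspark_conda_env.tar.gz#environment",
    },
}

# Build the POST request without sending it. To submit the job, call
# urllib.request.urlopen(req) and inspect the response.
req = urllib.request.Request(
    url,
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
```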

Job diagnostics

After the job is submitted, you can view the status of the job and the address of the Spark web user interface (UI) on the Jobs page. For more information, see View Spark jobs.