Lindorm Distributed Processing System (LDPS) provides a RESTful API that you can use to submit Python-based Spark jobs for stream processing, batch processing, machine learning, and graph computing tasks. This topic describes how to create a Python-based Spark job and submit it to LDPS for execution.

Prerequisites

LDPS is activated for your Lindorm instance. For more information, see Activate LDPS and modify the configurations.

Procedure

  1. Define a Python-based Spark job.
  2. Package the Python-based Spark job.
  3. Upload the files of the Python-based Spark job.
  4. Submit the Python-based Spark job.

Step 1: Define a Python-based Spark job

  1. Click Sample Spark job to download the demos provided by LDPS.
  2. Extract the downloaded package. The extracted folder that contains the sample jobs is named lindorm-spark-examples. Go to the lindorm-spark-examples/python directory and view the files in the python folder.
  3. The root directory of the project is your_project. Perform the following steps to create a Python-based Spark job:
    1. Create an empty file named __init__.py in the your_project directory.
    2. Modify the main.py file.
      1. Open the lindorm-spark-examples/python/your_project/main.py file and modify the code to add the path of the your_project directory to sys.path. For more information, see the Notice1 section in the main.py file.
        # Notice1: You need to do the following steps to complete the code modification:
        # Step1: Add an empty "__init__.py" file to your project directory so that your project acts as a module of launcher.py.
        # Step2: Add the current directory to sys.path by adding the following code to your main file.
        import os
        import sys

        current_dir = os.path.abspath(os.path.dirname(__file__))
        sys.path.append(current_dir)
        print("current dir in your_project: %s" % current_dir)
        print("sys.path: %s \n" % str(sys.path))
      2. Encapsulate the entry logic in a main(argv) function in the main.py file. For more information, see the Notice2 section in the lindorm-spark-examples/python/your_project/main.py file.
        # Notice2: Move the code in the `if __name__ == "__main__":` branch into a newly defined main(argv) function,
        # so that launcher.py in the parent directory can simply call main(sys.argv).
        def main(argv):
            print("Receive arguments: %s \n" % str(argv))
        
            print("current dir in main: %s \n" % os.path.abspath(os.path.dirname(__file__)))
            # Write your code here
        
        
        if __name__ == "__main__":
            main(sys.argv)
    3. Create an entry file that calls the main(argv) function to start the Python-based Spark job. In the demo, the launcher.py file in the lindorm-spark-examples/python directory is the entry file. You can reuse the code in the launcher.py file to start the Python-based Spark job. A minimal sketch of such an entry file is shown below.
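      The following is a minimal sketch of an entry file; the launcher.py file in the demo may contain additional logic, and the module name your_project is assumed to match your project directory.
        # launcher.py: a minimal sketch of an entry file.
        # Assumption: the project is packaged as a module named your_project
        # whose main.py defines a main(argv) function, as described above.
        import sys

        from your_project.main import main

        if __name__ == "__main__":
            # Forward the command-line arguments to the job's entry function.
            main(sys.argv)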

Step 2: Package the Python-based Spark job

  1. Package the Python runtime environment and the third-party libraries on which the project depends. We recommend that you use Conda or Virtualenv to package the dependencies into a tar.gz package. For more information, see Python Package Management.
    Important You must perform this operation in Linux. This ensures that LDPS can recognize the binary files in the packaged Python environment.
  2. Package the files of the project. You can package the files in the your_project directory into a ZIP or EGG file.
    • To package the files into a ZIP file, run the following command:
      zip -r project.zip your_project
    • For information about how to package the files into an EGG file, see Building Eggs. A minimal packaging sketch is shown below.
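      The following setup.py file is a minimal sketch for building an EGG file with setuptools. The project name and version are placeholder values.
      # setup.py: a minimal sketch for building an EGG file with setuptools.
      # Assumption: the package directory is named your_project and setuptools is installed.
      from setuptools import setup, find_packages

      setup(
          name="your_project",       # placeholder project name
          version="0.1.0",           # placeholder version
          packages=find_packages(),  # include the your_project package
      )
      Run python setup.py bdist_egg in the directory that contains setup.py and your_project. The generated EGG file is stored in the dist directory.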

Step 3: Upload the files of the Python-based Spark job

Upload the following files to an Object Storage Service (OSS) bucket. For more information, see Simple upload. A command-line upload is also sketched after the list.
  • The tar.gz package that is built in Step 2. This package contains the Python runtime environment and third-party libraries.
  • The your_project.zip file or your_project.egg file that is built in Step 2.
  • The launcher.py entry file that is created in Step 1.
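The following commands are a minimal sketch of uploading the files by using the ossutil command-line tool. This assumes that ossutil is installed and configured, that testBucketName is a placeholder for your own bucket name, and that the file names match those used in the Examples section. You can also upload the files in the OSS console.
  # Upload the runtime environment package, the project package, and the entry file to OSS.
  ossutil cp pyspark_conda_env.tar.gz oss://testBucketName/
  ossutil cp your_project.zip oss://testBucketName/
  ossutil cp launcher.py oss://testBucketName/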

Step 4: Submit the Python-based Spark job

LDPS lets you submit and manage Spark jobs in multiple ways. Regardless of the method that you use, you must configure the following two types of request parameters:
  • Parameters that are used to specify the runtime environment of the Python-based job. Example:
    {"spark.archives":"oss://testBucketName/pyspark_conda_env.tar.gz#environment", "spark.kubernetes.driverEnv.PYSPARK_PYTHON":"./environment/bin/python","spark.submit.pyFiles":"oss://testBucketName/your_project.zip"}
    • To submit the files of the project, configure the spark.submit.pyFiles parameter in the value of the configs parameter. You can specify the OSS path of the ZIP or EGG file that contains the files of the project, or the launcher.py file of the project.
    • To submit the tar.gz package that contains the Python runtime environment and third-party libraries, configure the spark.archives parameter and the spark.kubernetes.driverEnv.PYSPARK_PYTHON parameter in the value of the configs parameter.
      • When you configure the spark.archives parameter, append a number sign (#) and the name of the target directory to the OSS path of the package. In the preceding example, the package is decompressed into a directory named environment.
      • The spark.kubernetes.driverEnv.PYSPARK_PYTHON parameter specifies the path of the Python executable in the decompressed runtime environment. In the preceding example, the value is ./environment/bin/python.
  • Parameters that are required to access the files that you uploaded to OSS. Include these parameters in the value of the configs parameter.
    Table 1. Parameters
    • spark.hadoop.fs.oss.endpoint
      Example: oss-cn-beijing-internal.aliyuncs.com
      Description: The endpoint of the OSS bucket in which the uploaded files are stored.
    • spark.hadoop.fs.oss.accessKeyId and spark.hadoop.fs.oss.accessKeySecret
      Examples: testAccessKeyId and testAccessKeySecret
      Description: The AccessKey ID and AccessKey secret that are used for identity authentication. You can create or obtain AccessKey pairs in the Alibaba Cloud Management Console. For more information, see Obtain an AccessKey pair.
    • spark.hadoop.fs.oss.impl
      Example: Set the value to org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem.
      Description: The class that is used to access OSS.
    Note For more information about the parameters, see Parameters.
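For reference, a complete configs value that combines the runtime environment parameters with the OSS access parameters can look like the following example. The bucket name, endpoint, and AccessKey values are placeholders that you must replace with your own values.
  {
    "spark.archives": "oss://testBucketName/pyspark_conda_env.tar.gz#environment",
    "spark.kubernetes.driverEnv.PYSPARK_PYTHON": "./environment/bin/python",
    "spark.submit.pyFiles": "oss://testBucketName/your_project.zip",
    "spark.hadoop.fs.oss.endpoint": "oss-cn-beijing-internal.aliyuncs.com",
    "spark.hadoop.fs.oss.accessKeyId": "testAccessKeyId",
    "spark.hadoop.fs.oss.accessKeySecret": "testAccessKeySecret",
    "spark.hadoop.fs.oss.impl": "org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem"
  }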

Examples

  1. Click Sample Spark job to download the package that contains the demos, and then extract the downloaded package.
  2. Open the your_project/main.py file and modify the entry point of the Python project.
    1. Add the path of the your_project directory to sys.path.
      import os
      import sys

      current_dir = os.path.abspath(os.path.dirname(__file__))
      sys.path.append(current_dir)
      print("current dir in your_project: %s" % current_dir)
      print("sys.path: %s \n" % str(sys.path))
    2. Define the entry logic in the main.py file. You can use the following sample code to initialize a SparkSession:
      from pyspark.sql import SparkSession
      spark = SparkSession \
          .builder \
          .appName("PythonImportTest") \
          .getOrCreate()
      print(spark.conf)
      spark.stop()
  3. Package the your_project folder in the python directory into a .zip file.
    zip -r your_project.zip your_project
  4. In Linux, use Conda to package the files of the Python runtime environment.
    conda create -y -n pyspark_conda_env -c conda-forge numpy conda-pack
    conda activate pyspark_conda_env
    conda pack -f -o pyspark_conda_env.tar.gz
  5. Upload the your_project.zip file, pyspark_conda_env.tar.gz file, and launcher.py file to OSS.
  6. Submit the job. Configure the request parameters that are described in Step 4: Submit the Python-based Spark job, and then submit the job to LDPS.

Job diagnostics

After the job is submitted, you can view the status of the job and the address of the Spark web user interface (UI) on the Jobs page. For more information, see View the details of a Spark job. If you have any problems when you submit the job, submit a ticket and provide the job ID and the address of the Spark web UI.