This topic describes how to use the SDK for Python to submit a job that computes π, query the status and logs of the job, kill the job if it expires, and query the historical jobs of a virtual cluster (VC).
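
The sample code in this topic assumes that the Alibaba Cloud SDK core library and the DLA (openanalytics) SDK for Python are installed in your environment, typically the aliyun-python-sdk-core and aliyun-python-sdk-openanalytics-open packages; verify the exact package names and versions against the SDK documentation for your release. You also need an AccessKey pair and a Spark virtual cluster in the target region.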

"""
Use the SDK for Python to submit Spark jobs in DLA.

author aliyun
"""

def submit_spark_job(client: AcsClient, cluster_name, job_config):
    """
    Submit a Spark job and return the ID of the job.

    :param client:             The Alibaba Cloud client.
    :param cluster_name:       The name of the Spark VC where the job is executed.
    :param job_config:         The JSON string that is used to define the Spark job.
    :return:                   The ID of the Spark job.
    :rtype:                    str
    :exception                 ClientException
    """

    # Initialize the request content.
    request = SubmitSparkJobRequest.SubmitSparkJobRequest()
    request.set_VcName(cluster_name)
    request.set_ConfigJson(job_config)

    # Submit the job and obtain the result.
    response = client.do_action_with_exception(request)

    # Return the ID of the job.
    r = json.loads(str(response, encoding='utf-8'))
    return r['JobId']


def get_job_status(client: AcsClient, cluster_name, job_id):
    """
    Query the execution status of a Spark job.

    :param client:             The Alibaba Cloud client.
    :param cluster_name:       The name of the Spark VC where the job is executed.
    :param job_id:             The ID of the Spark job.
    :return:                   The status of the Spark job.
    :rtype:                    str
    :exception                 ClientException
    """

    # Initialize the request content.
    request = GetJobStatusRequest.GetJobStatusRequest()
    request.set_VcName(cluster_name)
    request.set_JobId(job_id)

    # Obtain the running status of the job.
    response = client.do_action_with_exception(request)
    r = json.loads(str(response, encoding='utf-8'))
    return r['Status']


def get_job_log(client: AcsClient, cluster_name, job_id):
    """
    Obtain the logs of a Spark job.

    :param client:             The Alibaba Cloud client.
    :param cluster_name:       The name of the Spark VC where the job is executed.
    :param job_id:             The ID of the Spark job.
    :return:                   The log of the Spark job.
    :rtype:                    str
    :exception                 ClientException
    """

    # Initialize the request content.
    request = GetJobLogRequest.GetJobLogRequest()
    request.set_VcName(cluster_name)
    request.set_JobId(job_id)

    # Obtain the logs of the job.
    response = client.do_action_with_exception(request)
    r = json.loads(str(response, encoding='utf-8'))
    return r['Data']


def kill_job(client: AcsClient, cluster_name, job_id):
    """
    Kill a running Spark job.

    :param client:             The Alibaba Cloud client.
    :param cluster_name:       The name of the Spark VC where the job is executed.
    :param job_id:             The ID of the Spark job.
    :return:                   None
    :exception                 ClientException
    """

    # Initialize the request content.
    request = KillSparkJobRequest.KillSparkJobRequest()
    request.set_VcName(cluster_name)
    request.set_JobId(job_id)

    # Kill the running job.
    client.do_action_with_exception(request)


def list_job(client: AcsClient, cluster_name: str, page_number: int,
             page_size: int):
    """
    Query historical jobs in a Spark VC and print their details.

    :param client:             The Alibaba Cloud client.
    :param cluster_name:       The name of the Spark VC where jobs are executed.
    :param page_number:        The page number of historical jobs. The page number starts from 1.
    :param page_size:          The number of jobs listed on each page.
    :return:                   None
    :exception                 ClientException
    """

    # Initialize the request body.
    request = ListSparkJobRequest.ListSparkJobRequest()
    request.set_VcName(cluster_name)
    request.set_PageNumber(page_number)
    request.set_PageSize(page_size)

    # Print the details of the jobs on the requested page.
    response = client.do_action_with_exception(request)
    r = json.loads(str(response, encoding='utf-8'))
    for job_item in r["DataResult"]["JobList"]:
        job_detail = 'JobId:{}, JobStatus:{}, ' \
                     'JobUI:{}, SubmitTime:{} \n'.format(job_item['JobId'], job_item['Status'],
                                                         job_item['SparkUI'],
                                                         job_item['SubmitTime'])
        print(job_detail)


if __name__ == '__main__':
    # Required parameters
    region = "cn-hangzhou"
    access_key_id = "xxx"
    access_key_secret = "yyy"
    cluster_name = "SparkCluster"

    # Submit a job for calculating π.
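    # The job is defined by a JSON document: "file" points to the main JAR
    # (here a local:// path inside the Spark image), "className" is the entry
    # class, and "args" are passed to it. The "conf" section sets DLA Spark
    # options such as the driver/executor resource specifications, the number
    # of executor instances, and the OSS directory used to store job logs.
    # See the DLA Serverless Spark configuration documentation for the full
    # list of supported parameters.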
    job_config = '''
     {
         "name": "SparkPi",
         "file": "local:///tmp/spark-examples.jar",
         "className": "org.apache.spark.examples.SparkPi",
         "args": [
            "1000000"
         ],
         "conf": {
             "spark.driver.resourceSpec": "small",
             "spark.executor.instances": 1,
             "spark.executor.resourceSpec": "small"
             "spark.dla.job.log.oss.uri": "oss://test/spark-logs"
         }
    }
    '''
    # Create a client.
    client = AcsClient(ak=access_key_id, secret=access_key_secret, region_id=region)

    # Submit the job and obtain its ID.
    job_id: str = submit_spark_job(client, cluster_name, job_config)

    # Poll the job status until the job terminates. If the job expires, kill it.
    submit_time = time.time()
    while True:
        job_status: str = get_job_status(client, cluster_name, job_id)
        # The job is terminated.
        if job_status in ["error", "success", "dead", "killed"]:
            break
        # Kill the expired job.
        elif int(time.time() - submit_time) > 100:
            kill_job(client, cluster_name, job_id)
            break
        # Wait for the next scan.
        else:
            time.sleep(5)

    # Print the logs of the job.
    print("log detail: {} \n".format(get_job_log(client, cluster_name, job_id)))

    # Query the latest 10 jobs.
    list_job(client, cluster_name, 1, 10)