This topic describes how to use the SDK for Python to submit a Spark job that computes π, query the status and logs of the job, terminate a job that exceeds its expected running time, and list the historical jobs of a virtual cluster (VC) in Data Lake Analytics (DLA).
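Before you run the sample code, make sure that the modules the sample imports are available. They are typically installed from PyPI with pip, for example pip install aliyun-python-sdk-core and pip install aliyun-python-sdk-openanalytics-open; package names follow the aliyun-python-sdk-&lt;product&gt; convention, so verify them against the SDK version that you use. The complete sample code is as follows: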

"""
Use the SDK for Python to submit Spark jobs in DLA.

author aliyun
"""
from aliyunsdkcore.client import AcsClient
from aliyunsdkopenanalytics_open.request.v20180619 import SubmitSparkJobRequest, GetJobStatusRequest, GetJobLogRequest, \
    KillSparkJobRequest, ListSparkJobRequest, SubmitSparkSQLRequest
import json
import time


def submit_spark_sql(client: AcsClient, cluster_name, sql):
    """
    Submit a Spark SQL job and return the job ID.

    :param client:             The Alibaba Cloud client.
    :param cluster_name:       The name of the Spark VC where the job is run.
    :param sql:                The SQL content.
    :return:                   The ID of the Spark job.
    :rtype:                    str
    :exception                 ClientException
    """

    # Initialize the request content.
    request = SubmitSparkSQLRequest.SubmitSparkSQLRequest()
    request.set_VcName(cluster_name)
    request.set_Sql(sql)

    # Submit the job and obtain the result.
    response = client.do_action_with_exception(request)

    # Return the job ID.
    r = json.loads(str(response, encoding='utf-8'))
    return r['JobId']


def submit_spark_job(client: AcsClient, cluster_name, job_config):
    """
    Submit a Spark job and return the ID of the job.

    :param client:             The Alibaba Cloud client.
    :param cluster_name:       The name of the Spark VC where the job is run.
    :param job_config:         The JSON string that defines the Spark job.
    :return:                   The ID of the Spark job.
    :rtype:                    str
    :exception                 ClientException
    """

    # Initialize the request content.
    request = SubmitSparkJobRequest.SubmitSparkJobRequest()
    request.set_VcName(cluster_name)
    request.set_ConfigJson(job_config)

    # Submit the job and obtain the result.
    response = client.do_action_with_exception(request)

    # Return the job ID.
    r = json.loads(str(response, encoding='utf-8'))
    return r['JobId']


def get_job_status(client: AcsClient, cluster_name, job_id):
    """
    Query the status of a Spark job.

    :param client:             The Alibaba Cloud client.
    :param cluster_name:       The name of the Spark VC where the job is run.
    :param job_id:             The ID of the Spark job.
    :return:                   The status of the Spark job.
    :rtype:                    str
    :exception                 ClientException
    """

    # Initialize the request content.
    request = GetJobStatusRequest.GetJobStatusRequest()
    request.set_VcName(cluster_name)
    request.set_JobId(job_id)

    # Obtain the status of the job.
    response = client.do_action_with_exception(request)
    r = json.loads(str(response, encoding='utf-8'))
    return r['Status']


def get_job_log(client: AcsClient, cluster_name, job_id):
    """
    Obtain the log of a Spark job.

    :param client:             The Alibaba Cloud client.
    :param cluster_name:       The name of the Spark VC where the job is run.
    :param job_id:             The ID of the Spark job.
    :return:                   The log of the Spark job.
    :rtype:                    str
    :exception                 ClientException
    """

    # Initialize the request content.
    request = GetJobLogRequest.GetJobLogRequest()
    request.set_VcName(cluster_name)
    request.set_JobId(job_id)

    # Obtain the log of the job.
    response = client.do_action_with_exception(request)
    r = json.loads(str(response, encoding='utf-8'))
    return r['Data']


def kill_job(client: AcsClient, cluster_name, job_id):
    """
    Terminate a running Spark job.

    :param client:             The Alibaba Cloud client.
    :param cluster_name:       The name of the Spark VC where the job is run.
    :param job_id:             The ID of the Spark job.
    :return:                   None
    :exception                 ClientException
    """

    # Initialize the request content.
    request = KillSparkJobRequest.KillSparkJobRequest()
    request.set_VcName(cluster_name)
    request.set_JobId(job_id)

    # Terminate the running job.
    client.do_action_with_exception(request)


def list_job(client: AcsClient, cluster_name: str, page_number: int,
             page_size: int):
    """
    Print the details of historical jobs in a Spark VC.

    :param client:             The Alibaba Cloud client.
    :param cluster_name:       The name of the Spark VC where the job is run.
    :param page_number:        The page number. Page numbers start from 1.
    :param page_size:          The number of entries to return per page.
    :return:                   None
    :exception                 ClientException
    """

    # Initialize the request body.
    request = ListSparkJobRequest.ListSparkJobRequest()
    request.set_VcName(cluster_name)
    request.set_PageNumber(page_number)
    request.set_PageSize(page_size)

    # Return the details of the jobs.
    response = client.do_action_with_exception(request)
    r = json.loads(str(response, encoding='utf-8'))
    for job_item in r["DataResult"]["JobList"]:
        job_detail = 'JobId:{}, JobStatus:{}, ' \
                     'JobUI:{}, SubmitTime:{} \n'.format(job_item['JobId'], job_item['Status'],
                                                         job_item['SparkUI'],
                                                         job_item['SubmitTime'])
        print(job_detail)


if __name__ == '__main__':
    # Required parameters
    region = "cn-hangzhou"
    access_key_id = "xxx"
    access_key_secret = "yyy"
    cluster_name = "SparkCluster"

    # Define the configuration of the job that computes π.
    job_config = '''
     {
         "name": "SparkPi",
         "file": "local:///tmp/spark-examples.jar",
         "className": "org.apache.spark.examples.SparkPi",
         "args": [
            "1000000"
         ],
         "conf": {
             "spark.driver.resourceSpec": "small",
             "spark.executor.instances": 1,
             "spark.executor.resourceSpec": "small"
             "spark.dla.job.log.oss.uri": "oss://test/spark-logs"
         }
    }
    '''

    # Define a SHOW DATABASES statement together with the Spark SQL configuration.
    sql = '''
    -- Spark configuration
    set spark.driver.resourceSpec=medium;
    set spark.executor.instances=5;
    set spark.executor.resourceSpec=medium;
    set spark.app.name=sparksqltest;
    set spark.sql.hive.metastore.version=dla;
    -- Your SQL statement goes below.
    -- To reference an external JAR, uncomment the following line:
    -- add jar oss://path/to/your/jar
    show databases;
    '''

    # Create a client.
    client = AcsClient(ak=access_key_id, secret=access_key_secret, region_id=region)

    # Alternatively, submit the SQL statement defined above. Uncomment the next line to use it.
    # job_id: str = submit_spark_sql(client, cluster_name, sql)

    # Submit the π job defined by job_config.
    job_id: str = submit_spark_job(client, cluster_name, job_config)

    # Poll the job status. If the job runs longer than the timeout, terminate it.
    submit_time = time.time()
    while True:
        job_status: str = get_job_status(client, cluster_name, job_id)
        # The job has reached a terminal state.
        if job_status in ["error", "success", "dead", "killed"]:
            break
        # Terminate the job if it has been running for more than 100 seconds.
        elif int(time.time() - submit_time) > 100:
            kill_job(client, cluster_name, job_id)
            break
        # Wait before polling again.
        else:
            time.sleep(5)

    # Print the log of the job.
    print("log detail: {} \n".format(get_job_log(client, cluster_name, job_id)))

    # List the 10 most recent jobs (page 1, 10 entries per page).
    list_job(client, cluster_name, 1, 10)
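
The SQL-based workflow follows the same submit-poll-fetch pattern as the job above. The following sketch wraps that pattern into a single helper by reusing only the functions defined in this sample; run_spark_sql is a name introduced here for illustration, and the timeout and poll interval are arbitrary values that you can adjust to the expected runtime of your statement.

def run_spark_sql(client: AcsClient, cluster_name: str, sql: str,
                  timeout_seconds: int = 300) -> str:
    """Submit a Spark SQL job, wait until it finishes or times out, and return its log."""
    job_id = submit_spark_sql(client, cluster_name, sql)
    submit_time = time.time()
    while True:
        status = get_job_status(client, cluster_name, job_id)
        # Stop polling once the job reaches a terminal state.
        if status in ["error", "success", "dead", "killed"]:
            break
        # Terminate the job if it exceeds the timeout.
        if time.time() - submit_time > timeout_seconds:
            kill_job(client, cluster_name, job_id)
            break
        time.sleep(5)
    return get_job_log(client, cluster_name, job_id)

For example, print(run_spark_sql(client, cluster_name, sql)) submits the SHOW DATABASES statement defined above and prints its log.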