This topic describes how to use the SDK for Python to submit a Spark job that computes pi, query the status and logs of the job, terminate a job that has run longer than expected, and list the historical jobs of a virtual cluster (VC) in Data Lake Analytics (DLA).
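The sample in this topic is written for Python 3 and depends on the Alibaba Cloud SDK core package and the OpenAnalytics (DLA) SDK package that provide the modules imported at the top of the sample. Based on those import paths, the packages can presumably be installed with the following command; the exact package names may vary with your SDK version:
pip install aliyun-python-sdk-core aliyun-python-sdk-openanalytics-open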
"""
Use the SDK for Python to submit Spark jobs in DLA.
author aliyun
"""
from aliyunsdkcore.client import AcsClient
from aliyunsdkopenanalytics_open.request.v20180619 import SubmitSparkJobRequest, GetJobStatusRequest, GetJobLogRequest, \
    KillSparkJobRequest, ListSparkJobRequest, SubmitSparkSQLRequest
import json
import time
def submit_spark_sql(client: AcsClient, cluster_name, sql):
    """
    Submit a Spark SQL job and return the ID of the job.
    :param client: The Alibaba Cloud client.
    :param cluster_name: The name of the Spark VC where the job is run.
    :param sql: The SQL content.
    :return: The ID of the Spark job.
    :rtype: str
    :exception ClientException
    """
    # Initialize the request content.
    request = SubmitSparkSQLRequest.SubmitSparkSQLRequest()
    request.set_VcName(cluster_name)
    request.set_Sql(sql)
    # Submit the job and obtain the result.
    response = client.do_action_with_exception(request)
    # Return the job ID.
    r = json.loads(str(response, encoding='utf-8'))
    return r['JobId']
def submit_spark_job(client: AcsClient, cluster_name, job_config):
    """
    Submit a Spark job and return the ID of the job.
    :param client: The Alibaba Cloud client.
    :param cluster_name: The name of the Spark VC where the job is run.
    :param job_config: The JSON string that defines the Spark job.
    :return: The ID of the Spark job.
    :rtype: str
    :exception ClientException
    """
    # Initialize the request content.
    request = SubmitSparkJobRequest.SubmitSparkJobRequest()
    request.set_VcName(cluster_name)
    request.set_ConfigJson(job_config)
    # Submit the job and obtain the result.
    response = client.do_action_with_exception(request)
    # Return the job ID.
    r = json.loads(str(response, encoding='utf-8'))
    return r['JobId']
def get_job_status(client: AcsClient, cluster_name, job_id):
    """
    Query the status of a Spark job.
    :param client: The Alibaba Cloud client.
    :param cluster_name: The name of the Spark VC where the job is run.
    :param job_id: The ID of the Spark job.
    :return: The status of the Spark job.
    :rtype: str
    :exception ClientException
    """
    # Initialize the request content.
    request = GetJobStatusRequest.GetJobStatusRequest()
    request.set_VcName(cluster_name)
    request.set_JobId(job_id)
    # Obtain the status of the job.
    response = client.do_action_with_exception(request)
    r = json.loads(str(response, encoding='utf-8'))
    return r['Status']
def get_job_log(client: AcsClient, cluster_name, job_id):
    """
    Obtain the log of a Spark job.
    :param client: The Alibaba Cloud client.
    :param cluster_name: The name of the Spark VC where the job is run.
    :param job_id: The ID of the Spark job.
    :return: The log of the Spark job.
    :rtype: str
    :exception ClientException
    """
    # Initialize the request content.
    request = GetJobLogRequest.GetJobLogRequest()
    request.set_VcName(cluster_name)
    request.set_JobId(job_id)
    # Obtain the log of the job.
    response = client.do_action_with_exception(request)
    r = json.loads(str(response, encoding='utf-8'))
    return r['Data']
def kill_job(client: AcsClient, cluster_name, job_id):
    """
    Terminate a running Spark job.
    :param client: The Alibaba Cloud client.
    :param cluster_name: The name of the Spark VC where the job is run.
    :param job_id: The ID of the Spark job.
    :return: None
    :exception ClientException
    """
    # Initialize the request content.
    request = KillSparkJobRequest.KillSparkJobRequest()
    request.set_VcName(cluster_name)
    request.set_JobId(job_id)
    # Terminate the running job.
    client.do_action_with_exception(request)
def list_job(client: AcsClient, cluster_name: str, page_number: int,
             page_size: int):
    """
    Print the details of historical jobs in a Spark VC.
    :param client: The Alibaba Cloud client.
    :param cluster_name: The name of the Spark VC where the jobs were run.
    :param page_number: The number of the page to return. Page numbers start from 1.
    :param page_size: The number of entries returned per page.
    :return: None
    :exception ClientException
    """
    # Initialize the request content.
    request = ListSparkJobRequest.ListSparkJobRequest()
    request.set_VcName(cluster_name)
    request.set_PageNumber(page_number)
    request.set_PageSize(page_size)
    # Obtain and print the details of the jobs.
    response = client.do_action_with_exception(request)
    r = json.loads(str(response, encoding='utf-8'))
    for job_item in r["DataResult"]["JobList"]:
        job_detail = 'JobId:{}, JobStatus:{}, ' \
                     'JobUI:{}, SubmitTime:{} \n'.format(job_item['JobId'], job_item['Status'],
                                                         job_item['SparkUI'],
                                                         job_item['SubmitTime'])
        print(job_detail)
if __name__ == '__main__':
    # Required parameters
    region = "cn-hangzhou"
    access_key_id = "xxx"
    access_key_secret = "yyy"
    cluster_name = "SparkCluster"
    # Define the configuration of a Spark job that computes pi.
    job_config = '''
    {
        "name": "SparkPi",
        "file": "local:///tmp/spark-examples.jar",
        "className": "org.apache.spark.examples.SparkPi",
        "args": [
            "1000000"
        ],
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small",
            "spark.dla.job.log.oss.uri": "oss://test/spark-logs"
        }
    }
    '''
    # Define a Spark SQL job that runs a SHOW DATABASES statement.
    sql = '''
    -- here is the spark conf
    set spark.driver.resourceSpec=medium;
    set spark.executor.instances=5;
    set spark.executor.resourceSpec=medium;
    set spark.app.name=sparksqltest;
    set spark.sql.hive.metastore.version=dla;
    -- here is your sql statement
    -- add your jar
    -- add jar oss://path/to/your/jar
    show databases;
    '''
    # Create a client.
    client = AcsClient(ak=access_key_id, secret=access_key_secret, region_id=region)
    # Submit a Spark SQL job (uncomment the following line to use it instead).
    # job_id: str = submit_spark_sql(client, cluster_name, sql)
    # Submit the Spark job.
    job_id: str = submit_spark_job(client, cluster_name, job_config)
    # Poll the status of the job. If the job runs longer than expected, terminate it.
    submit_time = time.time()
    while True:
        job_status: str = get_job_status(client, cluster_name, job_id)
        # The job has reached a terminal state.
        if job_status in ["error", "success", "dead", "killed"]:
            break
        # Terminate the job if it has been running for more than 100 seconds.
        elif int(time.time() - submit_time) > 100:
            kill_job(client, cluster_name, job_id)
            break
        # Wait before the next poll.
        else:
            time.sleep(5)
    # Print the log of the job.
    print("log detail: {} \n".format(get_job_log(client, cluster_name, job_id)))
    # Query the last 10 jobs.
    list_job(client, cluster_name, 1, 10)