This topic uses a complete program to describe how to submit a job for calculating π to DLA, monitor the job status, and query the execution results of historical jobs.
package com.aliyun.dla;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.openanalytics_open.model.v20180619.*;
import com.aliyuncs.profile.DefaultProfile;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
/**
 * Uses the SDK for Java to operate on Spark jobs in Data Lake Analytics (DLA):
 * submitting SQL and JAR jobs, polling job status, killing jobs, fetching logs,
 * and listing historical jobs.
 *
 * @author aliyun
 */
public class Demo {
    /** How long (in milliseconds) {@link #main} waits for the demo job before killing it. */
    private static final long JOB_TIMEOUT_MS = 100000L;
    /** Interval (in milliseconds) between job-status polls in {@link #main}. */
    private static final long POLL_INTERVAL_MS = 5000L;

    /**
     * Submits an SQL job to the serverless Spark engine of DLA.
     *
     * @param client             the initialized Alibaba Cloud client
     * @param virtualClusterName the name of the virtual cluster (VC) in DLA
     * @param sql                the SQL content
     * @return the ID of the submitted job, used to monitor the job status
     * @throws ClientException if the request fails, for example due to network errors
     */
    public static String submitSparkSQL(IAcsClient client,
                                        String virtualClusterName,
                                        String sql) throws ClientException {
        // Initialize the request and specify the VC name and job content.
        SubmitSparkSQLRequest request = new SubmitSparkSQLRequest();
        request.setVcName(virtualClusterName);
        request.setSql(sql);
        // Submit the Spark job and return the ID of the job.
        SubmitSparkSQLResponse response = client.getAcsResponse(request);
        return response.getJobId();
    }

    /**
     * Submits a job to the serverless Spark engine of DLA.
     *
     * @param client             the initialized Alibaba Cloud client
     * @param virtualClusterName the name of the VC in DLA
     * @param jobConfig          the JSON document that describes the Spark job to submit
     * @return the ID of the submitted job, used to monitor the job status
     * @throws ClientException if the request fails, for example due to network errors
     */
    public static String submitSparkJob(IAcsClient client,
                                        String virtualClusterName,
                                        String jobConfig) throws ClientException {
        // Initialize the request and specify the VC name and job content.
        SubmitSparkJobRequest request = new SubmitSparkJobRequest();
        request.setVcName(virtualClusterName);
        request.setConfigJson(jobConfig);
        // Submit the Spark job and return the ID of the job.
        SubmitSparkJobResponse response = client.getAcsResponse(request);
        return response.getJobId();
    }

    /**
     * Returns the status of a Spark job.
     *
     * @param client             the initialized Alibaba Cloud client
     * @param virtualClusterName the name of the VC in DLA
     * @param sparkJobId         the ID of the Spark job
     * @return the status of the Spark job, as a string
     * @throws ClientException if the request fails, for example due to network errors
     */
    public static String getSparkJobStatus(IAcsClient client,
                                           String virtualClusterName,
                                           String sparkJobId) throws ClientException {
        // Initialize the request and specify the ID of the Spark job.
        GetJobStatusRequest request = new GetJobStatusRequest();
        request.setJobId(sparkJobId);
        request.setVcName(virtualClusterName);
        // Send the request and return the status code of the Spark job.
        GetJobStatusResponse response = client.getAcsResponse(request);
        return response.getStatus();
    }

    /**
     * Stops a Spark job.
     *
     * @param client             the initialized Alibaba Cloud client
     * @param virtualClusterName the name of the VC in DLA
     * @param sparkJobId         the ID of the Spark job
     * @throws ClientException if the request fails, for example due to network errors
     */
    public static void killSparkJob(IAcsClient client,
                                    String virtualClusterName,
                                    String sparkJobId) throws ClientException {
        // Initialize the request and specify the ID of the Spark job.
        KillSparkJobRequest request = new KillSparkJobRequest();
        request.setVcName(virtualClusterName);
        request.setJobId(sparkJobId);
        // Send the kill request; the response carries no data the caller needs.
        client.getAcsResponse(request);
    }

    /**
     * Returns the logs of a Spark job.
     *
     * @param client             the initialized Alibaba Cloud client
     * @param virtualClusterName the name of the VC in DLA
     * @param sparkJobId         the ID of the Spark job
     * @return the log content of the Spark job, as a string
     * @throws ClientException if the request fails, for example due to network errors
     */
    public static String getSparkJobLog(IAcsClient client,
                                        String virtualClusterName,
                                        String sparkJobId) throws ClientException {
        // Initialize the request and specify the ID of the Spark job.
        GetJobLogRequest request = new GetJobLogRequest();
        request.setJobId(sparkJobId);
        request.setVcName(virtualClusterName);
        // Send the request and return the logs of the Spark job.
        GetJobLogResponse response = client.getAcsResponse(request);
        return response.getData();
    }

    /**
     * Queries the Spark jobs that are submitted to a VC and prints one summary
     * line per job. Historical jobs are returned page by page.
     *
     * @param client             the initialized Alibaba Cloud client
     * @param virtualClusterName the name of the VC in DLA
     * @param pageNumber         the number of the page to return; pages start from page 1
     * @param pageSize           the number of entries to return on each page
     * @throws ClientException if the request fails, for example due to network errors
     */
    public static void listSparkJob(IAcsClient client,
                                    String virtualClusterName,
                                    int pageNumber,
                                    int pageSize) throws ClientException {
        // Initialize the request and specify the VC name and paging parameters.
        ListSparkJobRequest request = new ListSparkJobRequest();
        request.setVcName(virtualClusterName);
        request.setPageNumber(pageNumber); // Pages start from page 1.
        request.setPageSize(pageSize);
        // Send the request and obtain the requested page of jobs.
        ListSparkJobResponse response = client.getAcsResponse(request);
        // Obtain the job list.
        List<ListSparkJobResponse.DataResult.Data> sparkJobList = response.getDataResult().getJobList();
        for (ListSparkJobResponse.DataResult.Data job : sparkJobList) {
            // BUGFIX: the arguments were previously passed as (name, status, sparkUI, detail),
            // so "JobUI" printed the status and "JobStatus" printed the Spark UI URL.
            System.out.println(String.format("JobName: %s, JobUI: %s, JobStatus: %s, JobConfig: %s",
                    job.getJobName(),
                    job.getSparkUI(),
                    job.getStatus(),
                    job.getDetail()));
        }
    }

    /**
     * End-to-end demo: submits a SparkPi job, polls its status until it reaches a
     * final state (or kills it after {@link #JOB_TIMEOUT_MS}), prints its logs,
     * and lists the details of the last 10 jobs.
     */
    public static void main(String[] args) throws IOException, ClientException, InterruptedException {
        // Parameters that are required for submitting the Spark job.
        String region = "cn-hangzhou";
        String accessKeyId = "xxx";
        String accessKeySecret = "yyy";
        String virtualClusterName = "MyCluster";
        // A valid JSON string describing the SparkPi job.
        String jobConfig =
                "{\n" +
                "    \"name\": \"SparkPi\",\n" +
                "    \"file\": \"local:///tmp/spark-examples.jar\",\n" +
                "    \"className\": \"org.apache.spark.examples.SparkPi\",\n" +
                "    \"args\": [\n" +
                "        \"100\"\n" +
                "    ],\n" +
                "    \"conf\": {\n" +
                "        \"spark.driver.resourceSpec\": \"medium\",\n" +
                "        \"spark.executor.instances\": 5,\n" +
                "        \"spark.executor.resourceSpec\": \"medium\"\n" +
                "    }\n" +
                "}";
        String sql = "-- here is the spark conf\n"
                + "set spark.driver.resourceSpec=medium;\n"
                + "set spark.executor.instances=5;\n"
                + "set spark.executor.resourceSpec=medium;\n"
                + "set spark.app.name=sparksqltest;\n"
                + "set spark.sql.hive.metastore.version=dla;\n"
                + "-- here is your sql statement\n"
                + "-- add your jar\n"
                + "-- add jar oss://path/to/your/jar\n"
                + "show databases;";
        // Initialize the AcsClient.
        DefaultProfile profile = DefaultProfile.getProfile(region, accessKeyId, accessKeySecret);
        IAcsClient client = new DefaultAcsClient(profile);
        // Submit a Spark job.
        // You can also choose to submit an SQL job instead:
        //String sparkJobId = submitSparkSQL(client, virtualClusterName, sql);
        String sparkJobId = submitSparkJob(client, virtualClusterName, jobConfig);
        // Poll the job status. If the job runs longer than JOB_TIMEOUT_MS, kill it.
        long startTime = System.currentTimeMillis();
        List<String> finalStatusList = Arrays.asList("error", "success", "dead", "killed");
        while (true) {
            String status = getSparkJobStatus(client, virtualClusterName, sparkJobId);
            if (finalStatusList.contains(status)) {
                System.out.println("Job went to final status");
                break;
            } else if ((System.currentTimeMillis() - startTime) > JOB_TIMEOUT_MS) {
                // Kill the expired job.
                System.out.println("Kill expire time job");
                killSparkJob(client, virtualClusterName, sparkJobId);
                break;
            }
            // Report the job status, then wait before the next poll.
            System.out.println(String.format("Job %s status is %s", sparkJobId, status));
            Thread.sleep(POLL_INTERVAL_MS);
        }
        // Print the log of the job.
        String logDetail = getSparkJobLog(client, virtualClusterName, sparkJobId);
        System.out.println(logDetail);
        // Print the details of the last 10 jobs.
        listSparkJob(client, virtualClusterName, 1, 10);
    }
}