AnalyticDB for MySQL Data Lakehouse Edition (V3.0) lets you manage Spark jobs programmatically using the SDK for Java. This guide covers how to submit a Spark job, poll its status, retrieve logs and details, list historical jobs, and terminate a running job.
Prerequisites
Before you begin, make sure you have:
- JDK 1.8 or later installed
- An AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster. See Create a Data Lakehouse Edition cluster
- A job resource group for the cluster. See Create a resource group
- A configured log storage path, using one of the following methods:
  - In the AnalyticDB for MySQL console, go to the Spark JAR Development page and click Log Settings in the upper-right corner
  - Set the spark.app.log.rootPath parameter to an Object Storage Service (OSS) path
Add Maven dependencies
Add the following dependencies to your pom.xml file. Use version 1.0.16 of adb20211201 for stability.
<dependencies>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>adb20211201</artifactId>
        <version>1.0.16</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.30</version>
    </dependency>
</dependencies>
Configure authentication
Store your AccessKey credentials in environment variables. Hardcoding credentials in source code risks exposing sensitive information.
export ALIBABA_CLOUD_ACCESS_KEY_ID=<your-access-key-id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<your-access-key-secret>
For instructions on setting environment variables on Linux, macOS, and Windows, see Configure environment variables.
Initialize the client by reading credentials from environment variables:
import com.aliyun.adb20211201.Client;
import com.aliyun.teaopenapi.models.Config;
Config config = new Config();
config.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
config.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// Replace with the region ID where your cluster resides
config.setRegionId("cn-hangzhou");
// Replace with the endpoint for that region
config.setEndpoint("adb.cn-hangzhou.aliyuncs.com");
Client client = new Client(config);
SDK operations
The following sections show each operation as a standalone example. All operations follow the same pattern: build a request object, call the client method, and extract the result from the response body.
Submit a Spark job
Call submitSparkApp() with your cluster ID, resource group name, job configuration, and job type. The method returns the job ID (appId), which you use in all subsequent operations.
appType accepts two values:
| Value | Description |
|---|---|
| Batch | Spark batch job |
| SQL | Spark SQL job |
For Batch jobs, pass a JSON string with the job configuration. For SQL jobs, pass an SQL statement.
import com.aliyun.adb20211201.models.SubmitSparkAppRequest;
import com.aliyun.adb20211201.models.SubmitSparkAppResponse;
@SneakyThrows
public static String submitSparkApp(String clusterId, String rgName, String data, String type, Client client) {
    SubmitSparkAppRequest request = new SubmitSparkAppRequest();
    request.setDBClusterId(clusterId);
    request.setResourceGroupName(rgName);
    request.setData(data);
    request.setAppType(type);
    SubmitSparkAppResponse response = client.submitSparkApp(request);
    // Save the returned appId — you need it for all subsequent operations
    return response.getBody().getData().getAppId();
}
The following example submits a SparkPi batch job:
String clusterId = "amv-bp1mhnosdb38****";
String resourceGroupName = "test";
String data = "{\n" +
" \"comments\": [\"-- Here is just an example of SparkPi. Modify the content and run your spark program.\"],\n" +
" \"args\": [\"1000\"],\n" +
" \"file\": \"local:///tmp/spark-examples.jar\",\n" +
" \"name\": \"SparkPi\",\n" +
" \"className\": \"org.apache.spark.examples.SparkPi\",\n" +
" \"conf\": {\n" +
" \"spark.driver.resourceSpec\": \"medium\",\n" +
" \"spark.executor.instances\": 2,\n" +
" \"spark.executor.resourceSpec\": \"medium\"}\n" +
"}\n";
String appId = submitSparkApp(clusterId, resourceGroupName, data, "Batch", client);
System.out.println("Submitted job ID: " + appId);
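Escaping the configuration JSON by hand is error-prone. Since Gson is already declared as a dependency, you can assemble the same payload from a map and let Gson handle the serialization. The sketch below reproduces the SparkPi configuration above; buildSparkPiConfig is an illustrative helper name, not part of the SDK:

```java
import com.google.gson.Gson;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

// Assemble the SparkPi job description as a map and let Gson handle the escaping.
public static String buildSparkPiConfig() {
    Map<String, Object> conf = new LinkedHashMap<>();
    conf.put("spark.driver.resourceSpec", "medium");
    conf.put("spark.executor.instances", 2);
    conf.put("spark.executor.resourceSpec", "medium");

    Map<String, Object> job = new LinkedHashMap<>();
    job.put("args", Arrays.asList("1000"));
    job.put("file", "local:///tmp/spark-examples.jar");
    job.put("name", "SparkPi");
    job.put("className", "org.apache.spark.examples.SparkPi");
    job.put("conf", conf);
    return new Gson().toJson(job);
}
```

The result can be passed to submitSparkApp() in place of the hand-written string.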
Query job status
Call getSparkAppState() with the job ID to get the current status.
import com.aliyun.adb20211201.models.GetSparkAppStateRequest;
import com.aliyun.adb20211201.models.GetSparkAppStateResponse;
@SneakyThrows
public static String getAppState(String appId, Client client) {
    GetSparkAppStateRequest request = new GetSparkAppStateRequest();
    request.setAppId(appId);
    GetSparkAppStateResponse response = client.getSparkAppState(request);
    return response.getBody().getData().getState();
}
Job terminal states
The job reaches one of the following terminal states when it finishes:
| State | Description |
|---|---|
| COMPLETED | The job finished successfully |
| FAILED | The job failed |
| FATAL | The job encountered a fatal error |
To poll for completion, loop until the job reaches a terminal state (COMPLETED, FAILED, or FATAL):
long maxRunningTimeMs = 60000; // 60 seconds
long pollIntervalMs = 2000;    // 2 seconds
long startTime = System.currentTimeMillis();
String state = getAppState(appId, client);
while (!"COMPLETED".equalsIgnoreCase(state)
        && !"FATAL".equalsIgnoreCase(state)
        && !"FAILED".equalsIgnoreCase(state)) {
    if (System.currentTimeMillis() - startTime > maxRunningTimeMs) {
        System.out.println("Timed out waiting for job to complete.");
        break;
    }
    System.out.println("Current state: " + state);
    Thread.sleep(pollIntervalMs);
    state = getAppState(appId, client);
}
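The three-way state comparison can be factored into a small helper so the polling condition stays readable. isTerminal is a hypothetical name, not part of the SDK:

```java
// Hypothetical helper (not part of the SDK): true once the job has finished.
public static boolean isTerminal(String state) {
    return "COMPLETED".equalsIgnoreCase(state)
            || "FAILED".equalsIgnoreCase(state)
            || "FATAL".equalsIgnoreCase(state);
}
```

With this helper, the polling condition reduces to `!isTerminal(state)`.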
Query job details
Call getSparkAppInfo() to retrieve job metadata such as the Spark UI address and start/end timestamps.
import com.aliyun.adb20211201.models.GetSparkAppInfoRequest;
import com.aliyun.adb20211201.models.GetSparkAppInfoResponse;
import com.aliyun.adb20211201.models.SparkAppInfo;
@SneakyThrows
public static SparkAppInfo getAppInfo(String appId, Client client) {
    GetSparkAppInfoRequest request = new GetSparkAppInfoRequest();
    request.setAppId(appId);
    GetSparkAppInfoResponse response = client.getSparkAppInfo(request);
    return response.getBody().getData();
}
Access specific fields from the returned SparkAppInfo object:
SparkAppInfo appInfo = getAppInfo(appId, client);
System.out.println("State: " + appInfo.getState());
System.out.println("Spark UI: " + appInfo.getDetail().webUiAddress);
System.out.println("Submitted at: " + appInfo.getDetail().submittedTimeInMillis);
System.out.println("Terminated at: " + appInfo.getDetail().terminatedTimeInMillis);
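The *InMillis suffix suggests these timestamp fields are epoch milliseconds. Assuming that, you can render them as readable timestamps with java.time (available since JDK 1.8); formatMillis is an illustrative helper name:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Format an epoch-millisecond timestamp (e.g. appInfo.getDetail().submittedTimeInMillis)
// as a human-readable UTC time.
public static String formatMillis(long epochMillis) {
    return DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
            .withZone(ZoneOffset.UTC)
            .format(Instant.ofEpochMilli(epochMillis));
}
```

Substitute your own time zone via ZoneId if UTC is not what you want to display.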
Retrieve job logs
Call getSparkAppLog() to get the driver log for a job.
import com.aliyun.adb20211201.models.GetSparkAppLogRequest;
import com.aliyun.adb20211201.models.GetSparkAppLogResponse;
@SneakyThrows
public static String getAppDriverLog(String appId, Client client) {
    GetSparkAppLogRequest request = new GetSparkAppLogRequest();
    request.setAppId(appId);
    GetSparkAppLogResponse response = client.getSparkAppLog(request);
    return response.getBody().getData().getLogContent();
}

String log = getAppDriverLog(appId, client);
System.out.println(log);
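Driver logs can be long; one quick way to surface failures is to filter the returned text for error lines. This is only a heuristic sketch, since the log format itself is free-form; errorLines is an illustrative helper name:

```java
import java.util.ArrayList;
import java.util.List;

// Collect lines that look like errors. The keyword list is a heuristic;
// adjust it to match the frameworks your job actually uses.
public static List<String> errorLines(String log) {
    List<String> matches = new ArrayList<>();
    for (String line : log.split("\n")) {
        if (line.contains("ERROR") || line.contains("Exception")) {
            matches.add(line);
        }
    }
    return matches;
}
```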
List historical jobs
Call listSparkApps() to retrieve historical Spark jobs for a cluster. Results are paginated — page numbers start at 1.
import com.aliyun.adb20211201.models.ListSparkAppsRequest;
import com.aliyun.adb20211201.models.ListSparkAppsResponse;
import java.util.List;
@SneakyThrows
public static List<SparkAppInfo> listSparkApps(String clusterId, long pageNumber, long pageSize, Client client) {
    ListSparkAppsRequest request = new ListSparkAppsRequest();
    request.setDBClusterId(clusterId);
    request.setPageNumber(pageNumber);
    request.setPageSize(pageSize);
    ListSparkAppsResponse response = client.listSparkApps(request);
    return response.getBody().getData().getAppInfoList();
}

// Retrieve the first 50 jobs for the cluster
List<SparkAppInfo> jobs = listSparkApps(clusterId, 1, 50, client);
for (SparkAppInfo job : jobs) {
    System.out.printf("AppId: %s | State: %s | Spark UI: %s%n",
            job.getAppId(),
            job.getState(),
            job.getDetail().webUiAddress);
}
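If a cluster has more jobs than fit on one page, you can drain all pages in a loop. The generic sketch below assumes that a page shorter than pageSize marks the end of the results; fetchAllPages and fetchPage are illustrative names, with fetchPage standing in for a call like listSparkApps(clusterId, page, pageSize, client):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Drain a page-numbered API. fetchPage(page, size) returns one page of results;
// a page shorter than pageSize signals the last page.
public static <T> List<T> fetchAllPages(BiFunction<Long, Long, List<T>> fetchPage, long pageSize) {
    List<T> all = new ArrayList<>();
    long page = 1; // page numbers start at 1
    List<T> batch;
    do {
        batch = fetchPage.apply(page, pageSize);
        all.addAll(batch);
        page++;
    } while (batch.size() == (int) pageSize);
    return all;
}
```

For example, `fetchAllPages((page, size) -> listSparkApps(clusterId, page, size, client), 50L)` would collect every job for the cluster.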
Terminate a job
Call killSparkApp() with the job ID to stop a running job.
import com.aliyun.adb20211201.models.KillSparkAppRequest;
KillSparkAppRequest request = new KillSparkAppRequest();
request.setAppId(appId);
client.killSparkApp(request);
System.out.println("Job terminated: " + appId);
Complete example
The following end-to-end example ties all operations together: submit a SparkPi job, wait for it to complete, retrieve its details and logs, list historical jobs, and terminate the job.
import com.aliyun.adb20211201.Client;
import com.aliyun.adb20211201.models.*;
import com.aliyun.teaopenapi.models.Config;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.SneakyThrows;
import java.util.List;
public class SparkExample {
    private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();

    @SneakyThrows
    public static String submitSparkApp(String clusterId, String rgName, String data, String type, Client client) {
        SubmitSparkAppRequest request = new SubmitSparkAppRequest();
        request.setDBClusterId(clusterId);
        request.setResourceGroupName(rgName);
        request.setData(data);
        request.setAppType(type);
        System.out.println("Submitting job: " + gson.toJson(request));
        SubmitSparkAppResponse response = client.submitSparkApp(request);
        System.out.println("Submit response: " + gson.toJson(response));
        return response.getBody().getData().getAppId();
    }

    @SneakyThrows
    public static String getAppState(String appId, Client client) {
        GetSparkAppStateRequest request = new GetSparkAppStateRequest();
        request.setAppId(appId);
        GetSparkAppStateResponse response = client.getSparkAppState(request);
        return response.getBody().getData().getState();
    }

    @SneakyThrows
    public static SparkAppInfo getAppInfo(String appId, Client client) {
        GetSparkAppInfoRequest request = new GetSparkAppInfoRequest();
        request.setAppId(appId);
        GetSparkAppInfoResponse response = client.getSparkAppInfo(request);
        return response.getBody().getData();
    }

    @SneakyThrows
    public static String getAppDriverLog(String appId, Client client) {
        GetSparkAppLogRequest request = new GetSparkAppLogRequest();
        request.setAppId(appId);
        GetSparkAppLogResponse response = client.getSparkAppLog(request);
        return response.getBody().getData().getLogContent();
    }

    @SneakyThrows
    public static List<SparkAppInfo> listSparkApps(String clusterId, long pageNumber, long pageSize, Client client) {
        ListSparkAppsRequest request = new ListSparkAppsRequest();
        request.setDBClusterId(clusterId);
        request.setPageNumber(pageNumber);
        request.setPageSize(pageSize);
        ListSparkAppsResponse response = client.listSparkApps(request);
        return response.getBody().getData().getAppInfoList();
    }

    public static void main(String[] args) throws Exception {
        // Initialize the client using credentials from environment variables
        Config config = new Config();
        config.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        config.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        config.setRegionId("cn-hangzhou");
        config.setEndpoint("adb.cn-hangzhou.aliyuncs.com");
        Client client = new Client(config);

        String clusterId = "amv-bp1mhnosdb38****";
        String resourceGroupName = "test";
        String data = "{\n" +
                "    \"comments\": [\"-- Here is just an example of SparkPi. Modify the content and run your spark program.\"],\n" +
                "    \"args\": [\"1000\"],\n" +
                "    \"file\": \"local:///tmp/spark-examples.jar\",\n" +
                "    \"name\": \"SparkPi\",\n" +
                "    \"className\": \"org.apache.spark.examples.SparkPi\",\n" +
                "    \"conf\": {\n" +
                "        \"spark.driver.resourceSpec\": \"medium\",\n" +
                "        \"spark.executor.instances\": 2,\n" +
                "        \"spark.executor.resourceSpec\": \"medium\"}\n" +
                "}\n";

        // Step 1: Submit the job
        String appId = submitSparkApp(clusterId, resourceGroupName, data, "Batch", client);

        // Step 2: Poll until the job reaches a terminal state
        long maxRunningTimeMs = 60000;
        long pollIntervalMs = 2000;
        long startTime = System.currentTimeMillis();
        String state = getAppState(appId, client);
        while (!"COMPLETED".equalsIgnoreCase(state)
                && !"FATAL".equalsIgnoreCase(state)
                && !"FAILED".equalsIgnoreCase(state)) {
            if (System.currentTimeMillis() - startTime > maxRunningTimeMs) {
                System.out.println("Timed out waiting for job to complete.");
                break;
            }
            System.out.println("Current state: " + state);
            Thread.sleep(pollIntervalMs);
            state = getAppState(appId, client);
        }

        // Step 3: Retrieve job details
        SparkAppInfo appInfo = getAppInfo(appId, client);
        System.out.printf("State: %s | Spark UI: %s | Submitted: %s | Terminated: %s%n",
                state,
                appInfo.getDetail().webUiAddress,
                appInfo.getDetail().submittedTimeInMillis,
                appInfo.getDetail().terminatedTimeInMillis);

        // Step 4: Retrieve driver logs
        String log = getAppDriverLog(appId, client);
        System.out.println(log);

        // Step 5: List historical jobs
        List<SparkAppInfo> jobs = listSparkApps(clusterId, 1, 50, client);
        jobs.forEach(job -> System.out.printf("AppId: %s | State: %s | Spark UI: %s%n",
                job.getAppId(),
                job.getState(),
                job.getDetail().webUiAddress));

        // Step 6: Terminate the job
        KillSparkAppRequest killRequest = new KillSparkAppRequest();
        killRequest.setAppId(appId);
        client.killSparkApp(killRequest);
    }
}