
AnalyticDB: Develop Spark applications with the Java SDK

Last Updated: Mar 30, 2026

AnalyticDB for MySQL Data Lakehouse Edition (V3.0) lets you manage Spark jobs programmatically using the SDK for Java. This guide covers how to submit a Spark job, poll its status, retrieve logs and details, list historical jobs, and terminate a running job.

Prerequisites

Before you begin, make sure you have:

  • JDK 1.8 or later installed

  • An AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster. See Create a Data Lakehouse Edition cluster

  • A job resource group for the cluster. See Create a resource group

  • A configured log storage path, using one of the following methods:

    • In the AnalyticDB for MySQL console, go to the Spark JAR Development page and click Log Settings in the upper-right corner

    • Set the spark.app.log.rootPath parameter to an Object Storage Service (OSS) path
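If you take the spark.app.log.rootPath approach, one way to supply the parameter is in the conf block of the job configuration. The fragment below is a sketch; the OSS bucket name is a placeholder, and you should verify the exact placement of this parameter for your cluster:

```json
{
    "conf": {
        "spark.app.log.rootPath": "oss://<your-bucket>/spark-logs/"
    }
}
```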

Add Maven dependencies

Add the following dependencies to your pom.xml file. Use version 1.0.16 of adb20211201 for stability.

<dependencies>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>adb20211201</artifactId>
        <version>1.0.16</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.30</version>
    </dependency>
</dependencies>

Configure authentication

Store your AccessKey credentials in environment variables. Hardcoding credentials in source code risks exposing sensitive information.

export ALIBABA_CLOUD_ACCESS_KEY_ID=<your-access-key-id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<your-access-key-secret>

For instructions on setting environment variables on Linux, macOS, and Windows, see Configure environment variables.

Initialize the client by reading credentials from environment variables:

import com.aliyun.adb20211201.Client;
import com.aliyun.teaopenapi.models.Config;

Config config = new Config();
config.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
config.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// Replace with the region ID where your cluster resides
config.setRegionId("cn-hangzhou");
// Replace with the endpoint for that region
config.setEndpoint("adb.cn-hangzhou.aliyuncs.com");

Client client = new Client(config);

SDK operations

The following sections show each operation as a standalone example. All operations follow the same pattern: build a request object, call the client method, and extract the result from the response body.

Submit a Spark job

Call submitSparkApp() with your cluster ID, resource group name, job configuration, and job type. The method returns the job ID (appId), which you use in all subsequent operations.

appType accepts two values:

  • Batch: a Spark batch job

  • SQL: a Spark SQL job

For Batch jobs, pass a JSON string with the job configuration. For SQL jobs, pass an SQL statement.

import com.aliyun.adb20211201.models.SubmitSparkAppRequest;
import com.aliyun.adb20211201.models.SubmitSparkAppResponse;

@SneakyThrows
public static String submitSparkApp(String clusterId, String rgName, String data, String type, Client client) {
    SubmitSparkAppRequest request = new SubmitSparkAppRequest();
    request.setDBClusterId(clusterId);
    request.setResourceGroupName(rgName);
    request.setData(data);
    request.setAppType(type);

    SubmitSparkAppResponse response = client.submitSparkApp(request);
    // Save the returned appId — you need it for all subsequent operations
    return response.getBody().getData().getAppId();
}

The following example submits a SparkPi batch job:

String clusterId = "amv-bp1mhnosdb38****";
String resourceGroupName = "test";

String data = "{\n" +
        "    \"comments\": [\"-- Here is just an example of SparkPi. Modify the content and run your spark program.\"],\n" +
        "    \"args\": [\"1000\"],\n" +
        "    \"file\": \"local:///tmp/spark-examples.jar\",\n" +
        "    \"name\": \"SparkPi\",\n" +
        "    \"className\": \"org.apache.spark.examples.SparkPi\",\n" +
        "    \"conf\": {\n" +
        "        \"spark.driver.resourceSpec\": \"medium\",\n" +
        "        \"spark.executor.instances\": 2,\n" +
        "        \"spark.executor.resourceSpec\": \"medium\"}\n" +
        "}\n";

String appId = submitSparkApp(clusterId, resourceGroupName, data, "Batch", client);
System.out.println("Submitted job ID: " + appId);

Query job status

Call getSparkAppState() with the job ID to get the current status.

import com.aliyun.adb20211201.models.GetSparkAppStateRequest;
import com.aliyun.adb20211201.models.GetSparkAppStateResponse;

@SneakyThrows
public static String getAppState(String appId, Client client) {
    GetSparkAppStateRequest request = new GetSparkAppStateRequest();
    request.setAppId(appId);

    GetSparkAppStateResponse response = client.getSparkAppState(request);
    return response.getBody().getData().getState();
}

Job terminal states

The job reaches one of the following terminal states when it finishes:

  • COMPLETED: the job finished successfully

  • FAILED: the job failed

  • FATAL: the job encountered a fatal error

To poll for completion, loop until the job reaches a terminal state (COMPLETED, FAILED, or FATAL):

long maxRunningTimeMs = 60000;  // 60 seconds
long pollIntervalMs = 2000;     // 2 seconds

String state;
long startTime = System.currentTimeMillis();
do {
    state = getAppState(appId, client);
    if (System.currentTimeMillis() - startTime > maxRunningTimeMs) {
        System.out.println("Timed out waiting for job to complete.");
        break;
    }
    System.out.println("Current state: " + state);
    Thread.sleep(pollIntervalMs);
} while (!"COMPLETED".equalsIgnoreCase(state)
        && !"FATAL".equalsIgnoreCase(state)
        && !"FAILED".equalsIgnoreCase(state));

Query job details

Call getSparkAppInfo() to retrieve job metadata such as the Spark UI address and start/end timestamps.

import com.aliyun.adb20211201.models.GetSparkAppInfoRequest;
import com.aliyun.adb20211201.models.GetSparkAppInfoResponse;
import com.aliyun.adb20211201.models.SparkAppInfo;

@SneakyThrows
public static SparkAppInfo getAppInfo(String appId, Client client) {
    GetSparkAppInfoRequest request = new GetSparkAppInfoRequest();
    request.setAppId(appId);

    GetSparkAppInfoResponse response = client.getSparkAppInfo(request);
    return response.getBody().getData();
}

Access specific fields from the returned SparkAppInfo object:

SparkAppInfo appInfo = getAppInfo(appId, client);
System.out.println("State:          " + appInfo.getState());
System.out.println("Spark UI:       " + appInfo.getDetail().webUiAddress);
System.out.println("Submitted at:   " + appInfo.getDetail().submittedTimeInMillis);
System.out.println("Terminated at:  " + appInfo.getDetail().terminatedTimeInMillis);

Retrieve job logs

Call getSparkAppLog() to get the driver log for a job.

import com.aliyun.adb20211201.models.GetSparkAppLogRequest;
import com.aliyun.adb20211201.models.GetSparkAppLogResponse;

@SneakyThrows
public static String getAppDriverLog(String appId, Client client) {
    GetSparkAppLogRequest request = new GetSparkAppLogRequest();
    request.setAppId(appId);

    GetSparkAppLogResponse response = client.getSparkAppLog(request);
    return response.getBody().getData().getLogContent();
}

String log = getAppDriverLog(appId, client);
System.out.println(log);

List historical jobs

Call listSparkApps() to retrieve historical Spark jobs for a cluster. Results are paginated — page numbers start at 1.

import com.aliyun.adb20211201.models.ListSparkAppsRequest;
import com.aliyun.adb20211201.models.ListSparkAppsResponse;
import java.util.List;

@SneakyThrows
public static List<SparkAppInfo> listSparkApps(String clusterId, long pageNumber, long pageSize, Client client) {
    ListSparkAppsRequest request = new ListSparkAppsRequest();
    request.setDBClusterId(clusterId);
    request.setPageNumber(pageNumber);
    request.setPageSize(pageSize);

    ListSparkAppsResponse response = client.listSparkApps(request);
    return response.getBody().getData().getAppInfoList();
}

// Retrieve the first 50 jobs for the cluster
List<SparkAppInfo> jobs = listSparkApps(clusterId, 1, 50, client);
for (SparkAppInfo job : jobs) {
    System.out.printf("AppId: %s | State: %s | Spark UI: %s%n",
            job.getAppId(),
            job.getState(),
            job.getDetail().webUiAddress);
}
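To walk all pages rather than just the first, keep requesting successive pages until one comes back smaller than pageSize. The sketch below abstracts the page fetch as a function so the loop can be shown without a live cluster; in real use the fetcher would be page -> listSparkApps(clusterId, page, pageSize, client).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

public class PagingSketch {
    /**
     * Collects items from all pages. fetchPage takes a 1-based page number
     * and returns that page; iteration stops at the first short page.
     */
    public static <T> List<T> fetchAllPages(LongFunction<List<T>> fetchPage, long pageSize) {
        List<T> all = new ArrayList<>();
        for (long page = 1; ; page++) {
            List<T> batch = fetchPage.apply(page);
            all.addAll(batch);
            if (batch.size() < pageSize) {
                break;  // last page reached
            }
        }
        return all;
    }

    public static void main(String[] args) {
        // Simulated pages of job IDs standing in for listSparkApps(...)
        List<List<String>> pages = List.of(
                List.of("app-1", "app-2"),
                List.of("app-3"));
        List<String> all = fetchAllPages(p -> pages.get((int) p - 1), 2);
        System.out.println(all);  // [app-1, app-2, app-3]
    }
}
```

Note that when the total count is an exact multiple of pageSize, this pattern issues one extra request that returns an empty page; that is usually an acceptable trade-off for the simpler loop.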

Terminate a job

Call killSparkApp() with the job ID to stop a running job.

import com.aliyun.adb20211201.models.KillSparkAppRequest;

KillSparkAppRequest request = new KillSparkAppRequest();
request.setAppId(appId);
client.killSparkApp(request);
System.out.println("Job terminated: " + appId);

Complete example

The following end-to-end example ties all operations together: submit a SparkPi job, wait for it to complete, retrieve its details and logs, list historical jobs, and terminate the job.

import com.aliyun.adb20211201.Client;
import com.aliyun.adb20211201.models.*;
import com.aliyun.teaopenapi.models.Config;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.SneakyThrows;

import java.util.List;

public class SparkExample {
    private static Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();

    @SneakyThrows
    public static String submitSparkApp(String clusterId, String rgName, String data, String type, Client client) {
        SubmitSparkAppRequest request = new SubmitSparkAppRequest();
        request.setDBClusterId(clusterId);
        request.setResourceGroupName(rgName);
        request.setData(data);
        request.setAppType(type);
        System.out.println("Submitting job: " + gson.toJson(request));
        SubmitSparkAppResponse response = client.submitSparkApp(request);
        System.out.println("Submit response: " + gson.toJson(response));
        return response.getBody().getData().getAppId();
    }

    @SneakyThrows
    public static String getAppState(String appId, Client client) {
        GetSparkAppStateRequest request = new GetSparkAppStateRequest();
        request.setAppId(appId);
        GetSparkAppStateResponse response = client.getSparkAppState(request);
        return response.getBody().getData().getState();
    }

    @SneakyThrows
    public static SparkAppInfo getAppInfo(String appId, Client client) {
        GetSparkAppInfoRequest request = new GetSparkAppInfoRequest();
        request.setAppId(appId);
        GetSparkAppInfoResponse response = client.getSparkAppInfo(request);
        return response.getBody().getData();
    }

    @SneakyThrows
    public static String getAppDriverLog(String appId, Client client) {
        GetSparkAppLogRequest request = new GetSparkAppLogRequest();
        request.setAppId(appId);
        GetSparkAppLogResponse response = client.getSparkAppLog(request);
        return response.getBody().getData().getLogContent();
    }

    @SneakyThrows
    public static List<SparkAppInfo> listSparkApps(String clusterId, long pageNumber, long pageSize, Client client) {
        ListSparkAppsRequest request = new ListSparkAppsRequest();
        request.setDBClusterId(clusterId);
        request.setPageNumber(pageNumber);
        request.setPageSize(pageSize);
        ListSparkAppsResponse response = client.listSparkApps(request);
        return response.getBody().getData().getAppInfoList();
    }

    public static void main(String[] args) throws Exception {
        // Initialize the client using credentials from environment variables
        Config config = new Config();
        config.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        config.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        config.setRegionId("cn-hangzhou");
        config.setEndpoint("adb.cn-hangzhou.aliyuncs.com");
        Client client = new Client(config);

        String clusterId = "amv-bp1mhnosdb38****";
        String resourceGroupName = "test";
        String data = "{\n" +
                "    \"comments\": [\"-- Here is just an example of SparkPi. Modify the content and run your spark program.\"],\n" +
                "    \"args\": [\"1000\"],\n" +
                "    \"file\": \"local:///tmp/spark-examples.jar\",\n" +
                "    \"name\": \"SparkPi\",\n" +
                "    \"className\": \"org.apache.spark.examples.SparkPi\",\n" +
                "    \"conf\": {\n" +
                "        \"spark.driver.resourceSpec\": \"medium\",\n" +
                "        \"spark.executor.instances\": 2,\n" +
                "        \"spark.executor.resourceSpec\": \"medium\"}\n" +
                "}\n";

        // Step 1: Submit the job
        String appId = submitSparkApp(clusterId, resourceGroupName, data, "Batch", client);

        // Step 2: Poll until the job reaches a terminal state
        long maxRunningTimeMs = 60000;
        long pollIntervalMs = 2000;
        String state;
        long startTime = System.currentTimeMillis();
        do {
            state = getAppState(appId, client);
            if (System.currentTimeMillis() - startTime > maxRunningTimeMs) {
                System.out.println("Timed out.");
                break;
            }
            System.out.println("Current state: " + state);
            Thread.sleep(pollIntervalMs);
        } while (!"COMPLETED".equalsIgnoreCase(state)
                && !"FATAL".equalsIgnoreCase(state)
                && !"FAILED".equalsIgnoreCase(state));

        // Step 3: Retrieve job details
        SparkAppInfo appInfo = getAppInfo(appId, client);
        System.out.printf("State: %s | Spark UI: %s | Submitted: %s | Terminated: %s%n",
                state,
                appInfo.getDetail().webUiAddress,
                appInfo.getDetail().submittedTimeInMillis,
                appInfo.getDetail().terminatedTimeInMillis);

        // Step 4: Retrieve driver logs
        String log = getAppDriverLog(appId, client);
        System.out.println(log);

        // Step 5: List historical jobs
        List<SparkAppInfo> jobs = listSparkApps(clusterId, 1, 50, client);
        jobs.forEach(job -> System.out.printf("AppId: %s | State: %s | Spark UI: %s%n",
                job.getAppId(),
                job.getState(),
                job.getDetail().webUiAddress));

        // Step 6: Terminate the job
        KillSparkAppRequest killRequest = new KillSparkAppRequest();
        killRequest.setAppId(appId);
        client.killSparkApp(killRequest);
    }
}

What's next