All Products
Search
Document Center

AnalyticDB:Mengembangkan\ aplikasi\ Spark\ dengan\ Java\ SDK

Last Updated:Mar 29, 2026

AnalyticDB for MySQL Data Lakehouse Edition (V3.0) memungkinkan Anda mengelola pekerjaan Spark secara terprogram menggunakan SDK untuk Java. Panduan ini mencakup cara mengirim pekerjaan Spark, memantau statusnya, mengambil log dan detailnya, menampilkan daftar pekerjaan historis, serta menghentikan pekerjaan yang sedang berjalan.

Prasyarat

Sebelum memulai, pastikan Anda telah:

  • Menginstal JDK 1.8 atau versi yang lebih baru.

  • Memiliki kluster AnalyticDB for MySQL Data Lakehouse Edition (V3.0). Lihat Buat kluster Data Lakehouse Edition.

  • Memiliki kelompok sumber daya pekerjaan untuk kluster tersebut. Lihat Buat kelompok sumber daya.

  • Menyiapkan jalur penyimpanan log yang telah dikonfigurasi, menggunakan salah satu metode berikut:

    • Di Konsol AnalyticDB for MySQL, buka halaman Spark JAR Development, lalu klik Log Settings di pojok kanan atas.

    • Atur parameter spark.app.log.rootPath ke jalur Object Storage Service (OSS).

Tambahkan dependensi Maven

Tambahkan dependensi berikut ke file pom.xml Anda. Gunakan versi 1.0.16 dari adb20211201 untuk stabilitas.

<dependencies>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>adb20211201</artifactId>
        <version>1.0.16</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.30</version>
    </dependency>
</dependencies>

Konfigurasikan otentikasi

Simpan kredensial AccessKey Anda dalam variabel lingkungan. Menyematkan kredensial langsung di kode sumber berisiko membocorkan informasi sensitif.

export ALIBABA_CLOUD_ACCESS_KEY_ID=<your-access-key-id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<your-access-key-secret>

Untuk petunjuk pengaturan variabel lingkungan di Linux, macOS, dan Windows, lihat Konfigurasikan variabel lingkungan.

Inisialisasi klien dengan membaca kredensial dari variabel lingkungan:

import com.aliyun.adb20211201.Client;
import com.aliyun.teaopenapi.models.Config;

Config config = new Config();
config.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
config.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// Ganti dengan ID wilayah tempat kluster Anda berada
config.setRegionId("cn-hangzhou");
// Ganti dengan titik akhir untuk wilayah tersebut
config.setEndpoint("adb.cn-hangzhou.aliyuncs.com");

Client client = new Client(config);

Operasi SDK

Bagian-bagian berikut menunjukkan setiap operasi sebagai contoh mandiri. Semua operasi mengikuti pola yang sama: membuat objek permintaan, memanggil metode klien, dan mengekstrak hasil dari badan respons.

Kirim pekerjaan Spark

Panggil submitSparkApp() dengan ID kluster, nama kelompok sumber daya, konfigurasi pekerjaan, dan jenis pekerjaan. Metode ini mengembalikan ID pekerjaan (appId), yang digunakan dalam semua operasi selanjutnya.

appType menerima dua nilai:

NilaiDeskripsi
BatchPekerjaan Spark batch
SQLPekerjaan Spark SQL

Untuk pekerjaan Batch, berikan string JSON berisi konfigurasi pekerjaan. Untuk pekerjaan SQL, berikan pernyataan SQL.

import com.aliyun.adb20211201.models.SubmitSparkAppRequest;
import com.aliyun.adb20211201.models.SubmitSparkAppResponse;

@SneakyThrows
public static String submitSparkApp(String clusterId, String rgName, String data, String type, Client client) {
    SubmitSparkAppRequest request = new SubmitSparkAppRequest();
    request.setDBClusterId(clusterId);
    request.setResourceGroupName(rgName);
    request.setData(data);
    request.setAppType(type);

    SubmitSparkAppResponse response = client.submitSparkApp(request);
    // Simpan appId yang dikembalikan — Anda membutuhkannya untuk semua operasi selanjutnya
    return response.getBody().getData().getAppId();
}

Contoh berikut mengirim pekerjaan SparkPi batch:

String clusterId = "amv-bp1mhnosdb38****";
String resourceGroupName = "test";

String data = "{\n" +
        "    \"comments\": [\"-- Here is just an example of SparkPi. Modify the content and run your spark program.\"],\n" +
        "    \"args\": [\"1000\"],\n" +
        "    \"file\": \"local:///tmp/spark-examples.jar\",\n" +
        "    \"name\": \"SparkPi\",\n" +
        "    \"className\": \"org.apache.spark.examples.SparkPi\",\n" +
        "    \"conf\": {\n" +
        "        \"spark.driver.resourceSpec\": \"medium\",\n" +
        "        \"spark.executor.instances\": 2,\n" +
        "        \"spark.executor.resourceSpec\": \"medium\"}\n" +
        "}\n";

String appId = submitSparkApp(clusterId, resourceGroupName, data, "Batch", client);
System.out.println("ID pekerjaan yang dikirim: " + appId);

Kueri status pekerjaan

Panggil getSparkAppState() dengan ID pekerjaan untuk mendapatkan status terkini.

import com.aliyun.adb20211201.models.GetSparkAppStateRequest;
import com.aliyun.adb20211201.models.GetSparkAppStateResponse;

@SneakyThrows
public static String getAppState(String appId, Client client) {
    GetSparkAppStateRequest request = new GetSparkAppStateRequest();
    request.setAppId(appId);

    GetSparkAppStateResponse response = client.getSparkAppState(request);
    return response.getBody().getData().getState();
}

Status terminal pekerjaan

Pekerjaan mencapai salah satu status terminal berikut saat selesai:

StatusDeskripsi
COMPLETEDPekerjaan berhasil selesai
FAILEDPekerjaan gagal
FATALPekerjaan mengalami kesalahan fatal

Untuk memantau hingga selesai, lakukan loop sampai pekerjaan mencapai status terminal (COMPLETED, FAILED, atau FATAL):

long maxRunningTimeMs = 60000;  // 60 detik
long pollIntervalMs = 2000;     // 2 detik

String state;
long startTime = System.currentTimeMillis();
do {
    state = getAppState(appId, client);
    if (System.currentTimeMillis() - startTime > maxRunningTimeMs) {
        System.out.println("Waktu tunggu habis saat menunggu pekerjaan selesai.");
        break;
    }
    System.out.println("Status saat ini: " + state);
    Thread.sleep(pollIntervalMs);
} while (!"COMPLETED".equalsIgnoreCase(state)
        && !"FATAL".equalsIgnoreCase(state)
        && !"FAILED".equalsIgnoreCase(state));

Kueri detail pekerjaan

Panggil getSparkAppInfo() untuk mengambil metadata pekerjaan seperti alamat Spark UI dan cap waktu mulai/selesai.

import com.aliyun.adb20211201.models.GetSparkAppInfoRequest;
import com.aliyun.adb20211201.models.GetSparkAppInfoResponse;
import com.aliyun.adb20211201.models.SparkAppInfo;

@SneakyThrows
public static SparkAppInfo getAppInfo(String appId, Client client) {
    GetSparkAppInfoRequest request = new GetSparkAppInfoRequest();
    request.setAppId(appId);

    GetSparkAppInfoResponse response = client.getSparkAppInfo(request);
    return response.getBody().getData();
}

Akses bidang spesifik dari objek SparkAppInfo yang dikembalikan:

SparkAppInfo appInfo = getAppInfo(appId, client);
System.out.println("Status:         " + appInfo.getState());
System.out.println("Spark UI:       " + appInfo.getDetail().webUiAddress);
System.out.println("Dikirim pada:   " + appInfo.getDetail().submittedTimeInMillis);
System.out.println("Dihentikan pada:" + appInfo.getDetail().terminatedTimeInMillis);

Ambil log pekerjaan

Panggil getSparkAppLog() untuk mendapatkan log driver suatu pekerjaan.

import com.aliyun.adb20211201.models.GetSparkAppLogRequest;
import com.aliyun.adb20211201.models.GetSparkAppLogResponse;

@SneakyThrows
public static String getAppDriverLog(String appId, Client client) {
    GetSparkAppLogRequest request = new GetSparkAppLogRequest();
    request.setAppId(appId);

    GetSparkAppLogResponse response = client.getSparkAppLog(request);
    return response.getBody().getData().getLogContent();
}
String log = getAppDriverLog(appId, client);
System.out.println(log);

Daftar pekerjaan historis

Panggil listSparkApps() untuk mengambil daftar pekerjaan Spark historis untuk suatu kluster. Hasilnya dipaginasi — nomor halaman dimulai dari 1.

import com.aliyun.adb20211201.models.ListSparkAppsRequest;
import com.aliyun.adb20211201.models.ListSparkAppsResponse;
import java.util.List;

@SneakyThrows
public static List<SparkAppInfo> listSparkApps(String clusterId, long pageNumber, long pageSize, Client client) {
    ListSparkAppsRequest request = new ListSparkAppsRequest();
    request.setDBClusterId(clusterId);
    request.setPageNumber(pageNumber);
    request.setPageSize(pageSize);

    ListSparkAppsResponse response = client.listSparkApps(request);
    return response.getBody().getData().getAppInfoList();
}
// Ambil 50 pekerjaan pertama untuk kluster tersebut
List<SparkAppInfo> jobs = listSparkApps(clusterId, 1, 50, client);
for (SparkAppInfo job : jobs) {
    System.out.printf("AppId: %s | Status: %s | Spark UI: %s%n",
            job.getAppId(),
            job.getState(),
            job.getDetail().webUiAddress);
}

Hentikan pekerjaan

Panggil killSparkApp() dengan ID pekerjaan untuk menghentikan pekerjaan yang sedang berjalan.

import com.aliyun.adb20211201.models.KillSparkAppRequest;

KillSparkAppRequest request = new KillSparkAppRequest();
request.setAppId(appId);
client.killSparkApp(request);
System.out.println("Pekerjaan dihentikan: " + appId);

Contoh lengkap

Contoh end-to-end berikut menggabungkan semua operasi: mengirim pekerjaan SparkPi, menunggu hingga selesai, mengambil detail dan log-nya, menampilkan daftar pekerjaan historis, serta menghentikan pekerjaan tersebut.

import com.aliyun.adb20211201.Client;
import com.aliyun.adb20211201.models.*;
import com.aliyun.teaopenapi.models.Config;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.SneakyThrows;

import java.util.List;

public class SparkExample {
    private static Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();

    @SneakyThrows
    public static String submitSparkApp(String clusterId, String rgName, String data, String type, Client client) {
        SubmitSparkAppRequest request = new SubmitSparkAppRequest();
        request.setDBClusterId(clusterId);
        request.setResourceGroupName(rgName);
        request.setData(data);
        request.setAppType(type);
        System.out.println("Mengirim pekerjaan: " + gson.toJson(request));
        SubmitSparkAppResponse response = client.submitSparkApp(request);
        System.out.println("Respons pengiriman: " + gson.toJson(response));
        return response.getBody().getData().getAppId();
    }

    @SneakyThrows
    public static String getAppState(String appId, Client client) {
        GetSparkAppStateRequest request = new GetSparkAppStateRequest();
        request.setAppId(appId);
        GetSparkAppStateResponse response = client.getSparkAppState(request);
        return response.getBody().getData().getState();
    }

    @SneakyThrows
    public static SparkAppInfo getAppInfo(String appId, Client client) {
        GetSparkAppInfoRequest request = new GetSparkAppInfoRequest();
        request.setAppId(appId);
        GetSparkAppInfoResponse response = client.getSparkAppInfo(request);
        return response.getBody().getData();
    }

    @SneakyThrows
    public static String getAppDriverLog(String appId, Client client) {
        GetSparkAppLogRequest request = new GetSparkAppLogRequest();
        request.setAppId(appId);
        GetSparkAppLogResponse response = client.getSparkAppLog(request);
        return response.getBody().getData().getLogContent();
    }

    @SneakyThrows
    public static List<SparkAppInfo> listSparkApps(String clusterId, long pageNumber, long pageSize, Client client) {
        ListSparkAppsRequest request = new ListSparkAppsRequest();
        request.setDBClusterId(clusterId);
        request.setPageNumber(pageNumber);
        request.setPageSize(pageSize);
        ListSparkAppsResponse response = client.listSparkApps(request);
        return response.getBody().getData().getAppInfoList();
    }

    public static void main(String[] args) throws Exception {
        // Inisialisasi klien menggunakan kredensial dari variabel lingkungan
        Config config = new Config();
        config.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        config.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        config.setRegionId("cn-hangzhou");
        config.setEndpoint("adb.cn-hangzhou.aliyuncs.com");
        Client client = new Client(config);

        String clusterId = "amv-bp1mhnosdb38****";
        String resourceGroupName = "test";
        String data = "{\n" +
                "    \"comments\": [\"-- Here is just an example of SparkPi. Modify the content and run your spark program.\"],\n" +
                "    \"args\": [\"1000\"],\n" +
                "    \"file\": \"local:///tmp/spark-examples.jar\",\n" +
                "    \"name\": \"SparkPi\",\n" +
                "    \"className\": \"org.apache.spark.examples.SparkPi\",\n" +
                "    \"conf\": {\n" +
                "        \"spark.driver.resourceSpec\": \"medium\",\n" +
                "        \"spark.executor.instances\": 2,\n" +
                "        \"spark.executor.resourceSpec\": \"medium\"}\n" +
                "}\n";

        // Langkah 1: Kirim pekerjaan
        String appId = submitSparkApp(clusterId, resourceGroupName, data, "Batch", client);

        // Langkah 2: Pantau hingga pekerjaan mencapai status terminal
        long maxRunningTimeMs = 60000;
        long pollIntervalMs = 2000;
        String state;
        long startTime = System.currentTimeMillis();
        do {
            state = getAppState(appId, client);
            if (System.currentTimeMillis() - startTime > maxRunningTimeMs) {
                System.out.println("Waktu tunggu habis.");
                break;
            }
            System.out.println("Status saat ini: " + state);
            Thread.sleep(pollIntervalMs);
        } while (!"COMPLETED".equalsIgnoreCase(state)
                && !"FATAL".equalsIgnoreCase(state)
                && !"FAILED".equalsIgnoreCase(state));

        // Langkah 3: Ambil detail pekerjaan
        SparkAppInfo appInfo = getAppInfo(appId, client);
        System.out.printf("Status: %s | Spark UI: %s | Dikirim: %s | Dihentikan: %s%n",
                state,
                appInfo.getDetail().webUiAddress,
                appInfo.getDetail().submittedTimeInMillis,
                appInfo.getDetail().terminatedTimeInMillis);

        // Langkah 4: Ambil log driver
        String log = getAppDriverLog(appId, client);
        System.out.println(log);

        // Langkah 5: Daftar pekerjaan historis
        List<SparkAppInfo> jobs = listSparkApps(clusterId, 1, 50, client);
        jobs.forEach(job -> System.out.printf("AppId: %s | Status: %s | Spark UI: %s%n",
                job.getAppId(),
                job.getState(),
                job.getDetail().webUiAddress));

        // Langkah 6: Hentikan pekerjaan
        KillSparkAppRequest killRequest = new KillSparkAppRequest();
        killRequest.setAppId(appId);
        client.killSparkApp(killRequest);
    }
}

Langkah selanjutnya