すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:Java SDK を使用した Spark アプリケーションの開発

最終更新日:Mar 29, 2026

AnalyticDB for MySQL Data Lakehouse Edition (V3.0) では、Java SDK を使用して Spark ジョブをプログラムで管理できます。本ガイドでは、Spark ジョブの送信、ステータスのポーリング、ログおよび詳細情報の取得、過去のジョブ一覧の表示、実行中のジョブの終了について説明します。

前提条件

開始する前に、以下の条件を満たしていることを確認してください。

  • JDK 1.8 以降がインストール済みであること

  • AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスター。詳細については、「Data Lakehouse Edition クラスターを作成する」をご参照ください。

  • クラスター用のジョブ用リソースグループです。詳細については、「リソースグループを作成する」をご参照ください。

  • ログ保存パスが設定済みであること。設定方法は以下のいずれかを選択してください。

    • AnalyticDB for MySQL コンソールで、Spark JAR 開発 ページに移動し、右上隅の ログ設定 をクリックします。

    • spark.app.log.rootPath パラメーターを Object Storage Service (OSS) のパスに設定します。

Maven 依存関係の追加

次の依存関係を pom.xml ファイルに追加します。adb20211201 のバージョン 1.0.16 を使用することで、安定性を確保できます。

<dependencies>
    <dependency>
        <groupId>com.aliyun</groupId>
        <artifactId>adb20211201</artifactId>
        <version>1.0.16</version>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.30</version>
    </dependency>
</dependencies>

認証の構成

AccessKey の認証情報を環境変数に格納します。ソースコード内に認証情報をハードコーディングすると、機密情報が漏洩するリスクがあります。

export ALIBABA_CLOUD_ACCESS_KEY_ID=<your-access-key-id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<your-access-key-secret>

Linux、macOS、Windows における環境変数の設定手順については、「環境変数の構成」をご参照ください。

環境変数から認証情報を読み取ってクライアントを初期化します。

import com.aliyun.adb20211201.Client;
import com.aliyun.teaopenapi.models.Config;

Config config = new Config();
config.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
config.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// クラスターが配置されているリージョン ID に置き換えます
config.setRegionId("cn-hangzhou");
// 当該リージョンのエンドポイントに置き換えます
config.setEndpoint("adb.cn-hangzhou.aliyuncs.com");

Client client = new Client(config);

SDK 操作

以下の各セクションでは、個別の操作を独立した例として示します。すべての操作は、以下の共通パターンに従います:リクエストオブジェクトの構築 → クライアントメソッドの呼び出し → 応答ボディからの結果抽出。

Spark ジョブの送信

クラスター ID、リソースグループ名、ジョブ構成、ジョブタイプを指定して submitSparkApp() を呼び出します。このメソッドはジョブ ID (appId) を返すため、以降のすべての操作で使用します。

appType には以下の 2 つの値を指定できます。

説明
BatchSpark バッチジョブ
SQLSpark SQL ジョブ

Batch ジョブの場合、ジョブ構成を含む JSON 文字列を渡します。SQL ジョブの場合、SQL ステートメントを渡します。

import com.aliyun.adb20211201.models.SubmitSparkAppRequest;
import com.aliyun.adb20211201.models.SubmitSparkAppResponse;

@SneakyThrows
public static String submitSparkApp(String clusterId, String rgName, String data, String type, Client client) {
    SubmitSparkAppRequest request = new SubmitSparkAppRequest();
    request.setDBClusterId(clusterId);
    request.setResourceGroupName(rgName);
    request.setData(data);
    request.setAppType(type);

    SubmitSparkAppResponse response = client.submitSparkApp(request);
    // 返された appId を保存します — 以降のすべての操作で必要です
    return response.getBody().getData().getAppId();
}

以下の例では、SparkPi バッチジョブを送信します。

String clusterId = "amv-bp1mhnosdb38****";
String resourceGroupName = "test";

String data = "{\n" +
        "    \"comments\": [\"-- これは単なる SparkPi の例です。内容を変更して、ご自身の Spark プログラムを実行してください。\"],\n" +
        "    \"args\": [\"1000\"],\n" +
        "    \"file\": \"local:///tmp/spark-examples.jar\",\n" +
        "    \"name\": \"SparkPi\",\n" +
        "    \"className\": \"org.apache.spark.examples.SparkPi\",\n" +
        "    \"conf\": {\n" +
        "        \"spark.driver.resourceSpec\": \"medium\",\n" +
        "        \"spark.executor.instances\": 2,\n" +
        "        \"spark.executor.resourceSpec\": \"medium\"}\n" +
        "}\n";

String appId = submitSparkApp(clusterId, resourceGroupName, data, "Batch", client);
System.out.println("送信済みジョブ ID: " + appId);

ジョブステータスの照会

ジョブ ID を指定して getSparkAppState() を呼び出して、現在のステータスを取得します。

import com.aliyun.adb20211201.models.GetSparkAppStateRequest;
import com.aliyun.adb20211201.models.GetSparkAppStateResponse;

@SneakyThrows
public static String getAppState(String appId, Client client) {
    GetSparkAppStateRequest request = new GetSparkAppStateRequest();
    request.setAppId(appId);

    GetSparkAppStateResponse response = client.getSparkAppState(request);
    return response.getBody().getData().getState();
}

ジョブの終了状態

ジョブが完了すると、以下のいずれかの終了状態に到達します。

状態説明
COMPLETEDジョブが正常に完了しました
FAILEDジョブが失敗しました
FATALジョブで致命的なエラーが発生しました

完了を待機するには、ジョブが終了状態 (COMPLETEDFAILED、または FATAL) に到達するまでループします。

long maxRunningTimeMs = 60000;  // 60 秒
long pollIntervalMs = 2000;     // 2 秒

String state;
long startTime = System.currentTimeMillis();
do {
    state = getAppState(appId, client);
    if (System.currentTimeMillis() - startTime > maxRunningTimeMs) {
        System.out.println("ジョブ完了待ちがタイムアウトしました。");
        break;
    }
    System.out.println("現在の状態: " + state);
    Thread.sleep(pollIntervalMs);
} while (!"COMPLETED".equalsIgnoreCase(state)
        && !"FATAL".equalsIgnoreCase(state)
        && !"FAILED".equalsIgnoreCase(state));

ジョブ詳細の照会

getSparkAppInfo() を呼び出して、Spark UI のアドレスや開始/終了タイムスタンプなどのジョブメタデータを取得します。

import com.aliyun.adb20211201.models.GetSparkAppInfoRequest;
import com.aliyun.adb20211201.models.GetSparkAppInfoResponse;
import com.aliyun.adb20211201.models.SparkAppInfo;

@SneakyThrows
public static SparkAppInfo getAppInfo(String appId, Client client) {
    GetSparkAppInfoRequest request = new GetSparkAppInfoRequest();
    request.setAppId(appId);

    GetSparkAppInfoResponse response = client.getSparkAppInfo(request);
    return response.getBody().getData();
}

返された SparkAppInfo オブジェクトから特定のフィールドにアクセスします。

SparkAppInfo appInfo = getAppInfo(appId, client);
System.out.println("状態:          " + appInfo.getState());
System.out.println("Spark UI:       " + appInfo.getDetail().webUiAddress);
System.out.println("送信時刻:       " + appInfo.getDetail().submittedTimeInMillis);
System.out.println("終了時刻:       " + appInfo.getDetail().terminatedTimeInMillis);

ジョブログの取得

getSparkAppLog() を呼び出して、ジョブのドライバー ログを取得します。

import com.aliyun.adb20211201.models.GetSparkAppLogRequest;
import com.aliyun.adb20211201.models.GetSparkAppLogResponse;

@SneakyThrows
public static String getAppDriverLog(String appId, Client client) {
    GetSparkAppLogRequest request = new GetSparkAppLogRequest();
    request.setAppId(appId);

    GetSparkAppLogResponse response = client.getSparkAppLog(request);
    return response.getBody().getData().getLogContent();
}
String log = getAppDriverLog(appId, client);
System.out.println(log);

過去のジョブ一覧の表示

listSparkApps() を呼び出して、クラスターに関連付けられた過去の Spark ジョブを取得します。結果はページネーションされます — ページ番号は 1 から始まります。

import com.aliyun.adb20211201.models.ListSparkAppsRequest;
import com.aliyun.adb20211201.models.ListSparkAppsResponse;
import java.util.List;

@SneakyThrows
public static List<SparkAppInfo> listSparkApps(String clusterId, long pageNumber, long pageSize, Client client) {
    ListSparkAppsRequest request = new ListSparkAppsRequest();
    request.setDBClusterId(clusterId);
    request.setPageNumber(pageNumber);
    request.setPageSize(pageSize);

    ListSparkAppsResponse response = client.listSparkApps(request);
    return response.getBody().getData().getAppInfoList();
}
// クラスターの最初の 50 件のジョブを取得します
List<SparkAppInfo> jobs = listSparkApps(clusterId, 1, 50, client);
for (SparkAppInfo job : jobs) {
    System.out.printf("AppId: %s | 状態: %s | Spark UI: %s%n",
            job.getAppId(),
            job.getState(),
            job.getDetail().webUiAddress);
}

ジョブの終了

ジョブ ID を指定して killSparkApp() を呼び出して、実行中のジョブを停止します。

import com.aliyun.adb20211201.models.KillSparkAppRequest;

KillSparkAppRequest request = new KillSparkAppRequest();
request.setAppId(appId);
client.killSparkApp(request);
System.out.println("ジョブを終了しました: " + appId);

完全な例

以下のエンドツーエンドの例では、すべての操作を統合しています:SparkPi ジョブの送信、完了までの待機、詳細およびログの取得、過去のジョブ一覧の表示、およびジョブの終了です。

import com.aliyun.adb20211201.Client;
import com.aliyun.adb20211201.models.*;
import com.aliyun.teaopenapi.models.Config;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.SneakyThrows;

import java.util.List;

public class SparkExample {
    private static Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();

    @SneakyThrows
    public static String submitSparkApp(String clusterId, String rgName, String data, String type, Client client) {
        SubmitSparkAppRequest request = new SubmitSparkAppRequest();
        request.setDBClusterId(clusterId);
        request.setResourceGroupName(rgName);
        request.setData(data);
        request.setAppType(type);
        System.out.println("ジョブを送信中: " + gson.toJson(request));
        SubmitSparkAppResponse response = client.submitSparkApp(request);
        System.out.println("送信応答: " + gson.toJson(response));
        return response.getBody().getData().getAppId();
    }

    @SneakyThrows
    public static String getAppState(String appId, Client client) {
        GetSparkAppStateRequest request = new GetSparkAppStateRequest();
        request.setAppId(appId);
        GetSparkAppStateResponse response = client.getSparkAppState(request);
        return response.getBody().getData().getState();
    }

    @SneakyThrows
    public static SparkAppInfo getAppInfo(String appId, Client client) {
        GetSparkAppInfoRequest request = new GetSparkAppInfoRequest();
        request.setAppId(appId);
        GetSparkAppInfoResponse response = client.getSparkAppInfo(request);
        return response.getBody().getData();
    }

    @SneakyThrows
    public static String getAppDriverLog(String appId, Client client) {
        GetSparkAppLogRequest request = new GetSparkAppLogRequest();
        request.setAppId(appId);
        GetSparkAppLogResponse response = client.getSparkAppLog(request);
        return response.getBody().getData().getLogContent();
    }

    @SneakyThrows
    public static List<SparkAppInfo> listSparkApps(String clusterId, long pageNumber, long pageSize, Client client) {
        ListSparkAppsRequest request = new ListSparkAppsRequest();
        request.setDBClusterId(clusterId);
        request.setPageNumber(pageNumber);
        request.setPageSize(pageSize);
        ListSparkAppsResponse response = client.listSparkApps(request);
        return response.getBody().getData().getAppInfoList();
    }

    public static void main(String[] args) throws Exception {
        // 環境変数から認証情報を使用してクライアントを初期化します
        Config config = new Config();
        config.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
        config.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
        config.setRegionId("cn-hangzhou");
        config.setEndpoint("adb.cn-hangzhou.aliyuncs.com");
        Client client = new Client(config);

        String clusterId = "amv-bp1mhnosdb38****";
        String resourceGroupName = "test";
        String data = "{\n" +
                "    \"comments\": [\"-- これは単なる SparkPi の例です。内容を変更して、ご自身の Spark プログラムを実行してください。\"],\n" +
                "    \"args\": [\"1000\"],\n" +
                "    \"file\": \"local:///tmp/spark-examples.jar\",\n" +
                "    \"name\": \"SparkPi\",\n" +
                "    \"className\": \"org.apache.spark.examples.SparkPi\",\n" +
                "    \"conf\": {\n" +
                "        \"spark.driver.resourceSpec\": \"medium\",\n" +
                "        \"spark.executor.instances\": 2,\n" +
                "        \"spark.executor.resourceSpec\": \"medium\"}\n" +
                "}\n";

        // 手順 1:ジョブの送信
        String appId = submitSparkApp(clusterId, resourceGroupName, data, "Batch", client);

        // 手順 2:ジョブが終了状態に到達するまでポーリング
        long maxRunningTimeMs = 60000;
        long pollIntervalMs = 2000;
        String state;
        long startTime = System.currentTimeMillis();
        do {
            state = getAppState(appId, client);
            if (System.currentTimeMillis() - startTime > maxRunningTimeMs) {
                System.out.println("タイムアウトしました。");
                break;
            }
            System.out.println("現在の状態: " + state);
            Thread.sleep(pollIntervalMs);
        } while (!"COMPLETED".equalsIgnoreCase(state)
                && !"FATAL".equalsIgnoreCase(state)
                && !"FAILED".equalsIgnoreCase(state));

        // 手順 3:ジョブ詳細の取得
        SparkAppInfo appInfo = getAppInfo(appId, client);
        System.out.printf("状態: %s | Spark UI: %s | 送信時刻: %s | 終了時刻: %s%n",
                state,
                appInfo.getDetail().webUiAddress,
                appInfo.getDetail().submittedTimeInMillis,
                appInfo.getDetail().terminatedTimeInMillis);

        // 手順 4:ドライバーログの取得
        String log = getAppDriverLog(appId, client);
        System.out.println(log);

        // 手順 5:過去のジョブ一覧の表示
        List<SparkAppInfo> jobs = listSparkApps(clusterId, 1, 50, client);
        jobs.forEach(job -> System.out.printf("AppId: %s | 状態: %s | Spark UI: %s%n",
                job.getAppId(),
                job.getState(),
                job.getDetail().webUiAddress));

        // 手順 6:ジョブの終了
        KillSparkAppRequest killRequest = new KillSparkAppRequest();
        killRequest.setAppId(appId);
        client.killSparkApp(killRequest);
    }
}

次のステップ