AnalyticDB for MySQL Data Lakehouse Edition (V3.0) では、Java SDK を使用して Spark ジョブをプログラムで管理できます。本ガイドでは、Spark ジョブの送信、ステータスのポーリング、ログおよび詳細情報の取得、過去のジョブ一覧の表示、実行中のジョブの終了について説明します。
前提条件
開始する前に、以下の条件を満たしていることを確認してください。
JDK 1.8 以降がインストール済みであること
AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスター。詳細については、「Data Lakehouse Edition クラスターを作成する」をご参照ください。
クラスター用のジョブ用リソースグループです。詳細については、「リソースグループを作成する」をご参照ください。
ログ保存パスが設定済みであること。設定方法は以下のいずれかを選択してください。
AnalyticDB for MySQL コンソールで、Spark JAR 開発 ページに移動し、右上隅の ログ設定 をクリックします。
spark.app.log.rootPathパラメーターを Object Storage Service (OSS) のパスに設定します。
Maven 依存関係の追加
次の依存関係を pom.xml ファイルに追加します。adb20211201 のバージョン 1.0.16 を使用することで、安定性を確保できます。
<dependencies>
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>adb20211201</artifactId>
<version>1.0.16</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.30</version>
</dependency>
</dependencies>認証の構成
AccessKey の認証情報を環境変数に格納します。ソースコード内に認証情報をハードコーディングすると、機密情報が漏洩するリスクがあります。
export ALIBABA_CLOUD_ACCESS_KEY_ID=<your-access-key-id>
export ALIBABA_CLOUD_ACCESS_KEY_SECRET=<your-access-key-secret>Linux、macOS、Windows における環境変数の設定手順については、「環境変数の構成」をご参照ください。
環境変数から認証情報を読み取ってクライアントを初期化します。
import com.aliyun.adb20211201.Client;
import com.aliyun.teaopenapi.models.Config;
Config config = new Config();
config.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
config.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
// クラスターが配置されているリージョン ID に置き換えます
config.setRegionId("cn-hangzhou");
// 当該リージョンのエンドポイントに置き換えます
config.setEndpoint("adb.cn-hangzhou.aliyuncs.com");
Client client = new Client(config);SDK 操作
以下の各セクションでは、個別の操作を独立した例として示します。すべての操作は、以下の共通パターンに従います:リクエストオブジェクトの構築 → クライアントメソッドの呼び出し → 応答ボディからの結果抽出。
Spark ジョブの送信
クラスター ID、リソースグループ名、ジョブ構成、ジョブタイプを指定して submitSparkApp() を呼び出します。このメソッドはジョブ ID (appId) を返すため、以降のすべての操作で使用します。
appType には以下の 2 つの値を指定できます。
| 値 | 説明 |
|---|---|
Batch | Spark バッチジョブ |
SQL | Spark SQL ジョブ |
Batch ジョブの場合、ジョブ構成を含む JSON 文字列を渡します。SQL ジョブの場合、SQL ステートメントを渡します。
import com.aliyun.adb20211201.models.SubmitSparkAppRequest;
import com.aliyun.adb20211201.models.SubmitSparkAppResponse;
@SneakyThrows
public static String submitSparkApp(String clusterId, String rgName, String data, String type, Client client) {
SubmitSparkAppRequest request = new SubmitSparkAppRequest();
request.setDBClusterId(clusterId);
request.setResourceGroupName(rgName);
request.setData(data);
request.setAppType(type);
SubmitSparkAppResponse response = client.submitSparkApp(request);
// 返された appId を保存します — 以降のすべての操作で必要です
return response.getBody().getData().getAppId();
}以下の例では、SparkPi バッチジョブを送信します。
String clusterId = "amv-bp1mhnosdb38****";
String resourceGroupName = "test";
String data = "{\n" +
" \"comments\": [\"-- これは単なる SparkPi の例です。内容を変更して、ご自身の Spark プログラムを実行してください。\"],\n" +
" \"args\": [\"1000\"],\n" +
" \"file\": \"local:///tmp/spark-examples.jar\",\n" +
" \"name\": \"SparkPi\",\n" +
" \"className\": \"org.apache.spark.examples.SparkPi\",\n" +
" \"conf\": {\n" +
" \"spark.driver.resourceSpec\": \"medium\",\n" +
" \"spark.executor.instances\": 2,\n" +
" \"spark.executor.resourceSpec\": \"medium\"}\n" +
"}\n";
String appId = submitSparkApp(clusterId, resourceGroupName, data, "Batch", client);
System.out.println("送信済みジョブ ID: " + appId);ジョブステータスの照会
ジョブ ID を指定して getSparkAppState() を呼び出して、現在のステータスを取得します。
import com.aliyun.adb20211201.models.GetSparkAppStateRequest;
import com.aliyun.adb20211201.models.GetSparkAppStateResponse;
@SneakyThrows
public static String getAppState(String appId, Client client) {
GetSparkAppStateRequest request = new GetSparkAppStateRequest();
request.setAppId(appId);
GetSparkAppStateResponse response = client.getSparkAppState(request);
return response.getBody().getData().getState();
}ジョブの終了状態
ジョブが完了すると、以下のいずれかの終了状態に到達します。
| 状態 | 説明 |
|---|---|
COMPLETED | ジョブが正常に完了しました |
FAILED | ジョブが失敗しました |
FATAL | ジョブで致命的なエラーが発生しました |
完了を待機するには、ジョブが終了状態 (COMPLETED、FAILED、または FATAL) に到達するまでループします。
long maxRunningTimeMs = 60000; // 60 秒
long pollIntervalMs = 2000; // 2 秒
String state;
long startTime = System.currentTimeMillis();
do {
state = getAppState(appId, client);
if (System.currentTimeMillis() - startTime > maxRunningTimeMs) {
System.out.println("ジョブ完了待ちがタイムアウトしました。");
break;
}
System.out.println("現在の状態: " + state);
Thread.sleep(pollIntervalMs);
} while (!"COMPLETED".equalsIgnoreCase(state)
&& !"FATAL".equalsIgnoreCase(state)
&& !"FAILED".equalsIgnoreCase(state));ジョブ詳細の照会
getSparkAppInfo() を呼び出して、Spark UI のアドレスや開始/終了タイムスタンプなどのジョブメタデータを取得します。
import com.aliyun.adb20211201.models.GetSparkAppInfoRequest;
import com.aliyun.adb20211201.models.GetSparkAppInfoResponse;
import com.aliyun.adb20211201.models.SparkAppInfo;
@SneakyThrows
public static SparkAppInfo getAppInfo(String appId, Client client) {
GetSparkAppInfoRequest request = new GetSparkAppInfoRequest();
request.setAppId(appId);
GetSparkAppInfoResponse response = client.getSparkAppInfo(request);
return response.getBody().getData();
}返された SparkAppInfo オブジェクトから特定のフィールドにアクセスします。
SparkAppInfo appInfo = getAppInfo(appId, client);
System.out.println("状態: " + appInfo.getState());
System.out.println("Spark UI: " + appInfo.getDetail().webUiAddress);
System.out.println("送信時刻: " + appInfo.getDetail().submittedTimeInMillis);
System.out.println("終了時刻: " + appInfo.getDetail().terminatedTimeInMillis);ジョブログの取得
getSparkAppLog() を呼び出して、ジョブのドライバー ログを取得します。
import com.aliyun.adb20211201.models.GetSparkAppLogRequest;
import com.aliyun.adb20211201.models.GetSparkAppLogResponse;
@SneakyThrows
public static String getAppDriverLog(String appId, Client client) {
GetSparkAppLogRequest request = new GetSparkAppLogRequest();
request.setAppId(appId);
GetSparkAppLogResponse response = client.getSparkAppLog(request);
return response.getBody().getData().getLogContent();
}String log = getAppDriverLog(appId, client);
System.out.println(log);過去のジョブ一覧の表示
listSparkApps() を呼び出して、クラスターに関連付けられた過去の Spark ジョブを取得します。結果はページネーションされます — ページ番号は 1 から始まります。
import com.aliyun.adb20211201.models.ListSparkAppsRequest;
import com.aliyun.adb20211201.models.ListSparkAppsResponse;
import java.util.List;
@SneakyThrows
public static List<SparkAppInfo> listSparkApps(String clusterId, long pageNumber, long pageSize, Client client) {
ListSparkAppsRequest request = new ListSparkAppsRequest();
request.setDBClusterId(clusterId);
request.setPageNumber(pageNumber);
request.setPageSize(pageSize);
ListSparkAppsResponse response = client.listSparkApps(request);
return response.getBody().getData().getAppInfoList();
}// クラスターの最初の 50 件のジョブを取得します
List<SparkAppInfo> jobs = listSparkApps(clusterId, 1, 50, client);
for (SparkAppInfo job : jobs) {
System.out.printf("AppId: %s | 状態: %s | Spark UI: %s%n",
job.getAppId(),
job.getState(),
job.getDetail().webUiAddress);
}ジョブの終了
ジョブ ID を指定して killSparkApp() を呼び出して、実行中のジョブを停止します。
import com.aliyun.adb20211201.models.KillSparkAppRequest;
KillSparkAppRequest request = new KillSparkAppRequest();
request.setAppId(appId);
client.killSparkApp(request);
System.out.println("ジョブを終了しました: " + appId);完全な例
以下のエンドツーエンドの例では、すべての操作を統合しています:SparkPi ジョブの送信、完了までの待機、詳細およびログの取得、過去のジョブ一覧の表示、およびジョブの終了です。
import com.aliyun.adb20211201.Client;
import com.aliyun.adb20211201.models.*;
import com.aliyun.teaopenapi.models.Config;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import lombok.SneakyThrows;
import java.util.List;
public class SparkExample {
private static Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();
@SneakyThrows
public static String submitSparkApp(String clusterId, String rgName, String data, String type, Client client) {
SubmitSparkAppRequest request = new SubmitSparkAppRequest();
request.setDBClusterId(clusterId);
request.setResourceGroupName(rgName);
request.setData(data);
request.setAppType(type);
System.out.println("ジョブを送信中: " + gson.toJson(request));
SubmitSparkAppResponse response = client.submitSparkApp(request);
System.out.println("送信応答: " + gson.toJson(response));
return response.getBody().getData().getAppId();
}
@SneakyThrows
public static String getAppState(String appId, Client client) {
GetSparkAppStateRequest request = new GetSparkAppStateRequest();
request.setAppId(appId);
GetSparkAppStateResponse response = client.getSparkAppState(request);
return response.getBody().getData().getState();
}
@SneakyThrows
public static SparkAppInfo getAppInfo(String appId, Client client) {
GetSparkAppInfoRequest request = new GetSparkAppInfoRequest();
request.setAppId(appId);
GetSparkAppInfoResponse response = client.getSparkAppInfo(request);
return response.getBody().getData();
}
@SneakyThrows
public static String getAppDriverLog(String appId, Client client) {
GetSparkAppLogRequest request = new GetSparkAppLogRequest();
request.setAppId(appId);
GetSparkAppLogResponse response = client.getSparkAppLog(request);
return response.getBody().getData().getLogContent();
}
@SneakyThrows
public static List<SparkAppInfo> listSparkApps(String clusterId, long pageNumber, long pageSize, Client client) {
ListSparkAppsRequest request = new ListSparkAppsRequest();
request.setDBClusterId(clusterId);
request.setPageNumber(pageNumber);
request.setPageSize(pageSize);
ListSparkAppsResponse response = client.listSparkApps(request);
return response.getBody().getData().getAppInfoList();
}
public static void main(String[] args) throws Exception {
// 環境変数から認証情報を使用してクライアントを初期化します
Config config = new Config();
config.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"));
config.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"));
config.setRegionId("cn-hangzhou");
config.setEndpoint("adb.cn-hangzhou.aliyuncs.com");
Client client = new Client(config);
String clusterId = "amv-bp1mhnosdb38****";
String resourceGroupName = "test";
String data = "{\n" +
" \"comments\": [\"-- これは単なる SparkPi の例です。内容を変更して、ご自身の Spark プログラムを実行してください。\"],\n" +
" \"args\": [\"1000\"],\n" +
" \"file\": \"local:///tmp/spark-examples.jar\",\n" +
" \"name\": \"SparkPi\",\n" +
" \"className\": \"org.apache.spark.examples.SparkPi\",\n" +
" \"conf\": {\n" +
" \"spark.driver.resourceSpec\": \"medium\",\n" +
" \"spark.executor.instances\": 2,\n" +
" \"spark.executor.resourceSpec\": \"medium\"}\n" +
"}\n";
// 手順 1:ジョブの送信
String appId = submitSparkApp(clusterId, resourceGroupName, data, "Batch", client);
// 手順 2:ジョブが終了状態に到達するまでポーリング
long maxRunningTimeMs = 60000;
long pollIntervalMs = 2000;
String state;
long startTime = System.currentTimeMillis();
do {
state = getAppState(appId, client);
if (System.currentTimeMillis() - startTime > maxRunningTimeMs) {
System.out.println("タイムアウトしました。");
break;
}
System.out.println("現在の状態: " + state);
Thread.sleep(pollIntervalMs);
} while (!"COMPLETED".equalsIgnoreCase(state)
&& !"FATAL".equalsIgnoreCase(state)
&& !"FAILED".equalsIgnoreCase(state));
// 手順 3:ジョブ詳細の取得
SparkAppInfo appInfo = getAppInfo(appId, client);
System.out.printf("状態: %s | Spark UI: %s | 送信時刻: %s | 終了時刻: %s%n",
state,
appInfo.getDetail().webUiAddress,
appInfo.getDetail().submittedTimeInMillis,
appInfo.getDetail().terminatedTimeInMillis);
// 手順 4:ドライバーログの取得
String log = getAppDriverLog(appId, client);
System.out.println(log);
// 手順 5:過去のジョブ一覧の表示
List<SparkAppInfo> jobs = listSparkApps(clusterId, 1, 50, client);
jobs.forEach(job -> System.out.printf("AppId: %s | 状態: %s | Spark UI: %s%n",
job.getAppId(),
job.getState(),
job.getDetail().webUiAddress));
// 手順 6:ジョブの終了
KillSparkAppRequest killRequest = new KillSparkAppRequest();
killRequest.setAppId(appId);
client.killSparkApp(killRequest);
}
}