AnalyticDB for MySQL Data Lakehouse Edition (V3.0) memungkinkan Anda menggunakan SDK for Java untuk mengembangkan aplikasi Spark dan tugas Spark SQL. Artikel ini menjelaskan cara menggunakan AnalyticDB for MySQL SDK for Java untuk mengirimkan tugas Spark, memeriksa status dan log tugas Spark, menghentikan tugas Spark, serta melihat tugas Spark historis.
Prasyarat
JDK 1.8 atau versi lebih baru telah terinstal.
Kluster Data Lakehouse Edition (V3.0) AnalyticDB for MySQL telah dibuat. Untuk informasi lebih lanjut, lihat Buat kluster Data Lakehouse Edition.
Grup sumber daya tugas telah dibuat untuk kluster AnalyticDB for MySQL Data Lakehouse Edition (V3.0). Untuk informasi lebih lanjut, lihat Buat grup sumber daya.
Jalur penyimpanan log tugas Spark telah dikonfigurasi.
CatatanAnda dapat menggunakan salah satu metode berikut untuk mengonfigurasi jalur log:
Masuk ke konsol AnalyticDB for MySQL dan buka halaman Spark JAR Development. Di pojok kanan atas halaman, klik Log Settings untuk mengonfigurasi jalur log.
Gunakan parameter
spark.app.log.rootPathuntuk menentukan jalur Object Storage Service (OSS) guna menyimpan log tugas Spark.
Prosedur
Tambahkan dependensi Maven ke file pom.xml. Contoh kode:
<dependencies> <dependency> <groupId>com.aliyun</groupId> <artifactId>adb20211201</artifactId> <version>1.0.16</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.10.1</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.30</version> </dependency> </dependencies>CatatanDisarankan untuk menetapkan nomor versi AnalyticDB for MySQL SDK for Java menjadi 1.0.16.
Konfigurasikan variabel lingkungan
ALIBABA_CLOUD_ACCESS_KEY_IDdanALIBABA_CLOUD_ACCESS_KEY_SECRET. Untuk informasi lebih lanjut, lihat Konfigurasikan variabel lingkungan di Linux, macOS, dan Windows.Jalankan contoh kode berikut untuk mengirimkan tugas Spark, memeriksa status dan log tugas Spark, menghentikan tugas Spark, serta melihat tugas Spark historis:
import com.aliyun.adb20211201.Client; import com.aliyun.adb20211201.models.*; import com.aliyun.teaopenapi.models.Config; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import lombok.SneakyThrows; import java.util.List; public class SparkExample { private static Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create(); /** Kirim tugas Spark. @param client: Klien Alibaba Cloud. * @param clusterId: ID kluster. * @param rgName: Nama grup sumber daya. * @param type: Jenis tugas Spark. Nilai valid: Batch dan SQL. * @param data: Jika Anda menetapkan jenis tugas ke Batch, masukkan data JSON tentang tugas Spark. Jika Anda menetapkan jenis tugas ke SQL, masukkan pernyataan SQL. * @return: ID tugas Spark. **/ @SneakyThrows public static String submitSparkApp(String clusterId, String rgName, String data, String type, Client client) { SubmitSparkAppRequest request = new SubmitSparkAppRequest(); request.setDBClusterId(clusterId); request.setResourceGroupName(rgName); request.setData(data); request.setAppType(type); System.out.println("Mulai membuat permintaan Pengiriman " + gson.toJson(request)); SubmitSparkAppResponse submitSparkAppResponse = client.submitSparkApp(request); System.out.println("Respons pengiriman aplikasi: " + gson.toJson(submitSparkAppResponse)); return submitSparkAppResponse.getBody().getData().getAppId(); } /** * Periksa status tugas Spark. * * @param appId: ID tugas Spark. * @param client: Klien Alibaba Cloud. * @return: Status tugas Spark. */ @SneakyThrows public static String getAppState(String appId, Client client) { GetSparkAppStateRequest request = new GetSparkAppStateRequest(); request.setAppId(appId); System.out.println("Mulai mendapatkan permintaan status aplikasi " + gson.toJson(request)); GetSparkAppStateResponse sparkAppState = client.getSparkAppState(request); System.out.println("Respons status aplikasi: " + gson.toJson(sparkAppState)); return sparkAppState.getBody().getData().getState(); } /** * Periksa detail tentang tugas Spark. * * @param appId: ID tugas Spark. * @param client: Klien Alibaba Cloud. * @return: Detail tentang tugas Spark. */ @SneakyThrows public static SparkAppInfo getAppInfo(String appId, Client client) { GetSparkAppInfoRequest request = new GetSparkAppInfoRequest(); request.setAppId(appId); System.out.println("Mulai mendapatkan permintaan info aplikasi " + gson.toJson(request)); GetSparkAppInfoResponse sparkAppInfo = client.getSparkAppInfo(request); System.out.println("Respons info aplikasi: " + gson.toJson(sparkAppInfo)); return sparkAppInfo.getBody().getData(); } /** * Periksa log tugas Spark. * * @param appId: ID tugas Spark. * @param client: Klien Alibaba Cloud. * @return: Log tugas Spark. */ @SneakyThrows public static String getAppDriverLog(String appId, Client client) { GetSparkAppLogRequest request = new GetSparkAppLogRequest(); request.setAppId(appId); System.out.println("Mulai mendapatkan permintaan log aplikasi " + gson.toJson(request)); GetSparkAppLogResponse sparkAppLog = client.getSparkAppLog(request); System.out.println("Respons log aplikasi: " + gson.toJson(sparkAppLog)); return sparkAppLog.getBody().getData().getLogContent(); } /** * Periksa tugas Spark historis. * @param dbClusterId: ID kluster. * @param pageNumber: Nomor halaman. Halaman dimulai dari halaman 1. Nilai default: 1. * @param pageSize: Jumlah entri per halaman. * @param client: Klien Alibaba Cloud. * @return: Detail tentang tugas Spark. */ @SneakyThrows public static List<SparkAppInfo> listSparkApps(String dbClusterId, long pageNumber, long pageSize, Client client) { ListSparkAppsRequest request = new ListSparkAppsRequest(); request.setDBClusterId(dbClusterId); request.setPageNumber(pageNumber); request.setPageSize(pageSize); System.out.println("Mulai mendaftar permintaan aplikasi spark " + gson.toJson(request)); ListSparkAppsResponse listSparkAppsResponse = client.listSparkApps(request); System.out.println("Respons daftar aplikasi spark: " + gson.toJson(listSparkAppsResponse)); return listSparkAppsResponse.getBody().getData().getAppInfoList(); } /** * Contoh untuk mengirimkan aplikasi spark * * @param args Access Key ID, Access Key Secret, ADB Cluster ID, ADB Resource Group Name, Data Pengiriman, Tipe Pengiriman * @throws Exception */ public static void main(String[] args) throws Exception { // Klien Alibaba Cloud. Config config = new Config(); // Dapatkan AccessKey ID dari variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID. config.setAccessKeyId(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID")); // Dapatkan Rahasia AccessKey dari variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET. config.setAccessKeySecret(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")); // ID wilayah tempat kluster berada. config.setRegionId("cn-hangzhou"); // Titik akhir. cn-hangzhou menunjukkan ID wilayah tempat kluster berada. config.setEndpoint("adb.cn-hangzhou.aliyuncs.com"); Client client = new Client(config); // ID kluster. String dbClusterId = "amv-bp1mhnosdb38****"; // Nama grup sumber daya. String resourceGroupName = "test"; // Konten tugas Spark. String data = "{\n" + " \"comments\": [\"-- Ini hanya contoh SparkPi. Ubah kontennya dan jalankan program spark Anda.\"],\n" + " \"args\": [\"1000\"],\n" + " \"file\": \"local:///tmp/spark-examples.jar\",\n" + " \"name\": \"SparkPi\",\n" + " \"className\": \"org.apache.spark.examples.SparkPi\",\n" + " \"conf\": {\n" + " \"spark.driver.resourceSpec\": \"medium\",\n" + " \"spark.executor.instances\": 2,\n" + " \"spark.executor.resourceSpec\": \"medium\"}\n" + "}\n"; // Jenis tugas Spark. String type = "Batch"; // Waktu maksimum tugas Spark berjalan. long sparkAppMaxRunningTimeInMilli = 60000; // Interval pemindaian. long getAppStateinterval = 2000; // Kirim tugas Spark. String appId = submitSparkApp(dbClusterId, resourceGroupName, data, type, client); // Periksa status tugas Spark. String state; long startTimeInMillis = System.currentTimeMillis(); do { state = getAppState(appId, client); if (System.currentTimeMillis() - startTimeInMillis > sparkAppMaxRunningTimeInMilli) { System.out.println("Waktu habis"); break; } else { System.out.println("Status saat ini: " + state); Thread.sleep(getAppStateinterval); } } while (!"COMPLETED".equalsIgnoreCase(state) && !"FATAL".equalsIgnoreCase(state) && !"FAILED".equalsIgnoreCase(state)); // Periksa detail tentang tugas Spark. SparkAppInfo appInfo = getAppInfo(appId, client); String x = String.format("Status: %s\n WebUI: %s\n waktu pengiriman: %s\n, waktu terminasi: %s\n", state, appInfo.getDetail().webUiAddress, appInfo.getDetail().submittedTimeInMillis, appInfo.getDetail().terminatedTimeInMillis); System.out.println(x); // Periksa log tugas Spark. String log = getAppDriverLog(appId, client); System.out.println(log); // Periksa tugas Spark historis. List<SparkAppInfo> sparkAppInfos = listSparkApps(dbClusterId, 1, 50, client); sparkAppInfos.forEach(sparkAppInfo -> { String y = String.format("AppId: %s\n Status: %s\n WebUI: %s\n waktu pengiriman: %s\n, waktu terminasi: %s\n", appInfo.getAppId(), appInfo.getState(), appInfo.getDetail().webUiAddress, appInfo.getDetail().submittedTimeInMillis, appInfo.getDetail().terminatedTimeInMillis); System.out.println(y); }); // Hentikan tugas Spark. KillSparkAppRequest request = new KillSparkAppRequest(); request.setAppId(appId); client.killSparkApp(request); } }