全部产品
Search
文档中心

AnalyticDB:Kembangkan aplikasi Spark dengan AnalyticDB for MySQL SDK untuk Python

更新时间:Mar 01, 2026

AnalyticDB for MySQL SDK untuk Python memungkinkan Anda mengirim dan mengelola pekerjaan Spark secara terprogram tanpa menggunakan Konsol. Dengan SDK ini, Anda dapat mengirim pekerjaan Spark SQL dan JAR, memantau status pekerjaan, mengambil log, menghentikan tugas yang sedang berjalan, serta menampilkan riwayat pekerjaan.

Memulai dengan cepat

Contoh berikut mengirim pekerjaan Spark SQL, memeriksa statusnya, dan mengambil log-nya.

import os
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client
from alibabacloud_adb20211201.models import (
    SubmitSparkAppRequest,
    GetSparkAppStateRequest,
    GetSparkAppLogRequest,
)

# Inisialisasi klien.
config = Config(
    access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
    endpoint="adb.cn-hangzhou.aliyuncs.com"
)
client = Client(config)

# Kirim pekerjaan Spark SQL.
sql = """
    set spark.driver.resourceSpec=medium;
    set spark.executor.instances=2;
    set spark.executor.resourceSpec=medium;
    set spark.app.name=Spark SQL Test;
    show databases;
"""

request = SubmitSparkAppRequest(
    dbcluster_id="amv-bp1wo70f0k3c****",
    resource_group_name="test",
    data=sql,
    app_type="SQL",
    agent_source="Python SDK",
    agent_version="1.0.0"
)
response = client.submit_spark_app(request)
app_id = response.body.data.app_id
print("App ID:", app_id)

# Tanyakan status pekerjaan.
state_response = client.get_spark_app_state(GetSparkAppStateRequest(app_id=app_id))
print("State:", state_response.body.data.state)

# Ambil log pekerjaan.
log_response = client.get_spark_app_log(GetSparkAppLogRequest(app_id=app_id))
print("Logs:", log_response.body.data.log_content)

Ganti amv-bp1wo70f0k3c**** dengan ID kluster Anda dan cn-hangzhou dengan wilayah tempat kluster Anda berada.

Prasyarat

Sebelum memulai, pastikan Anda telah memiliki:

Inisialisasi klien

Buat instans Client untuk melakukan autentikasi dengan Alibaba Cloud. Semua operasi selanjutnya menggunakan klien ini.

import os
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client

config = Config(
    # Dapatkan ID AccessKey dari variabel lingkungan.
    access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    # Dapatkan Rahasia AccessKey dari variabel lingkungan.
    access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
    # Atur titik akhir. Ganti ID wilayah dengan milik Anda sendiri.
    endpoint="adb.<region-id>.aliyuncs.com"
)

client = Client(config)

Ganti <region-id> dengan ID wilayah tempat kluster Anda berada, misalnya cn-hangzhou.

Kirim pekerjaan Spark SQL

Atur app_type ke "SQL" dan masukkan pernyataan SQL dalam parameter data.

from alibabacloud_adb20211201.models import SubmitSparkAppRequest

sql = """
    set spark.driver.resourceSpec=medium;
    set spark.executor.instances=2;
    set spark.executor.resourceSpec=medium;
    set spark.app.name=Spark SQL Test;
    -- Tambahkan pernyataan SQL Anda di bawah ini.
    show databases;
"""

request = SubmitSparkAppRequest(
    dbcluster_id="<cluster-id>",
    resource_group_name="<resource-group-name>",
    data=sql,
    app_type="SQL",
    agent_source="Python SDK",
    agent_version="1.0.0"
)

response = client.submit_spark_app(request)
app_id = response.body.data.app_id
print("Pekerjaan Spark SQL dikirim. App ID:", app_id)

Kirim pekerjaan Spark JAR

Atur app_type ke "BATCH" dan masukkan konfigurasi pekerjaan sebagai string JSON dalam parameter data.

from alibabacloud_adb20211201.models import SubmitSparkAppRequest
import json

conf = {
    "comments": [
        "-- Ubah contoh ini untuk menjalankan program Spark Anda sendiri."
    ],
    "args": ["1000"],
    "file": "local:///tmp/spark-examples.jar",
    "name": "SparkPi",
    "className": "org.apache.spark.examples.SparkPi",
    "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "medium"
    }
}

request = SubmitSparkAppRequest(
    dbcluster_id="<cluster-id>",
    resource_group_name="<resource-group-name>",
    data=json.dumps(conf),
    app_type="BATCH",
    agent_source="Python SDK",
    agent_version="1.0.0"
)

response = client.submit_spark_app(request)
app_id = response.body.data.app_id
print("Pekerjaan Spark JAR dikirim. App ID:", app_id)

Tanyakan status pekerjaan

Masukkan app_id yang dikembalikan saat Anda mengirim pekerjaan.

from alibabacloud_adb20211201.models import GetSparkAppStateRequest

request = GetSparkAppStateRequest(app_id="<app-id>")
response = client.get_spark_app_state(request)
state = response.body.data.state
print("Status pekerjaan:", state)

Kueri log pekerjaan

from alibabacloud_adb20211201.models import GetSparkAppLogRequest

request = GetSparkAppLogRequest(app_id="<app-id>")
response = client.get_spark_app_log(request)
log_content = response.body.data.log_content
print("Log pekerjaan:", log_content)

Hentikan pekerjaan

from alibabacloud_adb20211201.models import KillSparkAppRequest

request = KillSparkAppRequest(app_id="<app-id>")
response = client.kill_spark_app(request)
state = response.body.data.state
print("Status pekerjaan setelah dihentikan:", state)

Tampilkan riwayat pekerjaan

from alibabacloud_adb20211201.models import ListSparkAppsRequest

request = ListSparkAppsRequest(
    dbcluster_id="<cluster-id>",
    page_number=1,
    page_size=10
)

response = client.list_spark_apps(request)
print("Total pekerjaan:", response.body.data.page_number)

for app_info in response.body.data.app_info_list:
    print("App ID:", app_info.app_id)
    print("State:", app_info.state)
    print("Detail:", app_info.detail)

Contoh lengkap

Skrip berikut menggabungkan semua operasi menjadi satu program yang dapat dijalankan.

import os
import json
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client
from alibabacloud_adb20211201.models import (
    SubmitSparkAppRequest,
    GetSparkAppStateRequest,
    GetSparkAppLogRequest,
    KillSparkAppRequest,
    ListSparkAppsRequest,
)

# Inisialisasi klien.
config = Config(
    access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
    endpoint="adb.<region-id>.aliyuncs.com"
)
client = Client(config)

cluster_id = "<cluster-id>"
rg_name = "<resource-group-name>"

# Kirim pekerjaan Spark SQL.
sql = """
    set spark.driver.resourceSpec=medium;
    set spark.executor.instances=2;
    set spark.executor.resourceSpec=medium;
    set spark.app.name=Spark SQL Test;
    show databases;
"""

sql_request = SubmitSparkAppRequest(
    dbcluster_id=cluster_id,
    resource_group_name=rg_name,
    data=sql,
    app_type="SQL",
    agent_source="Python SDK",
    agent_version="1.0.0"
)
sql_response = client.submit_spark_app(sql_request)
sql_app_id = sql_response.body.data.app_id
print("App ID pekerjaan SQL:", sql_app_id)

# Kirim pekerjaan Spark JAR.
jar_conf = {
    "comments": [
        "-- Ubah contoh ini untuk menjalankan program Spark Anda sendiri."
    ],
    "args": ["1000"],
    "file": "local:///tmp/spark-examples.jar",
    "name": "SparkPi",
    "className": "org.apache.spark.examples.SparkPi",
    "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "medium"
    }
}

jar_request = SubmitSparkAppRequest(
    dbcluster_id=cluster_id,
    resource_group_name=rg_name,
    data=json.dumps(jar_conf),
    app_type="BATCH",
    agent_source="Python SDK",
    agent_version="1.0.0"
)
jar_response = client.submit_spark_app(jar_request)
jar_app_id = jar_response.body.data.app_id
print("App ID pekerjaan JAR:", jar_app_id)

# Tanyakan status pekerjaan.
state_request = GetSparkAppStateRequest(app_id=sql_app_id)
state_response = client.get_spark_app_state(state_request)
print("Status pekerjaan SQL:", state_response.body.data.state)

# Tanyakan log pekerjaan.
log_request = GetSparkAppLogRequest(app_id=sql_app_id)
log_response = client.get_spark_app_log(log_request)
print("Log pekerjaan SQL:", log_response.body.data.log_content)

# Tampilkan riwayat pekerjaan.
list_request = ListSparkAppsRequest(
    dbcluster_id=cluster_id,
    page_number=1,
    page_size=10
)
list_response = client.list_spark_apps(list_request)
print("Total pekerjaan:", list_response.body.data.page_number)
for app_info in list_response.body.data.app_info_list:
    print(app_info.app_id, app_info.state, app_info.detail)

# Hentikan pekerjaan JAR.
kill_request = KillSparkAppRequest(app_id=jar_app_id)
kill_response = client.kill_spark_app(kill_request)
print("Status pekerjaan JAR setelah dihentikan:", kill_response.body.data.state)

Referensi API

Parameter SubmitSparkAppRequest

ParameterTipeDeskripsi
dbcluster_idStringID kluster AnalyticDB for MySQL Data Lakehouse Edition (V3.0).
resource_group_nameStringNama kelompok sumber daya pekerjaan.
dataStringPernyataan SQL (untuk pekerjaan SQL) atau string konfigurasi JSON (untuk pekerjaan JAR).
app_typeStringTipe pekerjaan. Nilai yang valid: "SQL" (pekerjaan Spark SQL), "BATCH" (pekerjaan Spark JAR).
agent_sourceStringSumber pengiriman. Atur ke "Python SDK".
agent_versionStringVersi SDK. Atur ke "1.0.0".

Parameter ListSparkAppsRequest

ParameterTipeDeskripsi
dbcluster_idStringID kluster AnalyticDB for MySQL Data Lakehouse Edition (V3.0).
page_numberIntegerNomor halaman. Halaman dimulai dari 1. Nilai default: 1.
page_sizeIntegerJumlah entri per halaman.

Kelas dan metode SDK

OperasiKelas permintaanMetode klienBidang respons
Kirim pekerjaanSubmitSparkAppRequestclient.submit_spark_app()response.body.data.app_id
Tanyakan status pekerjaanGetSparkAppStateRequestclient.get_spark_app_state()response.body.data.state
Tanyakan log pekerjaanGetSparkAppLogRequestclient.get_spark_app_log()response.body.data.log_content
Hentikan pekerjaanKillSparkAppRequestclient.kill_spark_app()response.body.data.state
Tampilkan riwayat pekerjaanListSparkAppsRequestclient.list_spark_apps()response.body.data.app_info_list