AnalyticDB for MySQL SDK untuk Python memungkinkan Anda mengirim dan mengelola pekerjaan Spark secara terprogram tanpa menggunakan Konsol. Dengan SDK ini, Anda dapat mengirim pekerjaan Spark SQL dan JAR, memantau status pekerjaan, mengambil log, menghentikan tugas yang sedang berjalan, serta menampilkan riwayat pekerjaan.
Memulai dengan cepat
Contoh berikut mengirim pekerjaan Spark SQL, memeriksa statusnya, dan mengambil log-nya.
import os
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client
from alibabacloud_adb20211201.models import (
SubmitSparkAppRequest,
GetSparkAppStateRequest,
GetSparkAppLogRequest,
)
# Inisialisasi klien.
config = Config(
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
endpoint="adb.cn-hangzhou.aliyuncs.com"
)
client = Client(config)
# Kirim pekerjaan Spark SQL.
sql = """
set spark.driver.resourceSpec=medium;
set spark.executor.instances=2;
set spark.executor.resourceSpec=medium;
set spark.app.name=Spark SQL Test;
show databases;
"""
request = SubmitSparkAppRequest(
dbcluster_id="amv-bp1wo70f0k3c****",
resource_group_name="test",
data=sql,
app_type="SQL",
agent_source="Python SDK",
agent_version="1.0.0"
)
response = client.submit_spark_app(request)
app_id = response.body.data.app_id
print("App ID:", app_id)
# Tanyakan status pekerjaan.
state_response = client.get_spark_app_state(GetSparkAppStateRequest(app_id=app_id))
print("State:", state_response.body.data.state)
# Ambil log pekerjaan.
log_response = client.get_spark_app_log(GetSparkAppLogRequest(app_id=app_id))
print("Logs:", log_response.body.data.log_content)Ganti amv-bp1wo70f0k3c**** dengan ID kluster Anda dan cn-hangzhou dengan wilayah tempat kluster Anda berada.
Prasyarat
Sebelum memulai, pastikan Anda telah memiliki:
Python 3.7 atau versi lebih baru
kelompok sumber daya pekerjaan untuk kluster Anda
AnalyticDB for MySQL SDK untuk Python yang telah diinstal
ALIBABA_CLOUD_ACCESS_KEY_IDdanALIBABA_CLOUD_ACCESS_KEY_SECRETVariabel lingkungan telah dikonfigurasiJalur yang dikonfigurasi untuk menyimpan log pekerjaan Spark
CatatanKonfigurasikan jalur log menggunakan salah satu metode berikut:
Di Konsol AnalyticDB for MySQL, buka halaman Spark JAR Development. Di pojok kanan atas, klik Log Settings untuk mengatur jalurnya.
Atur parameter
spark.app.log.rootPathke jalur Object Storage Service (OSS).
Inisialisasi klien
Buat instans Client untuk melakukan autentikasi dengan Alibaba Cloud. Semua operasi selanjutnya menggunakan klien ini.
import os
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client
config = Config(
# Dapatkan ID AccessKey dari variabel lingkungan.
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# Dapatkan Rahasia AccessKey dari variabel lingkungan.
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
# Atur titik akhir. Ganti ID wilayah dengan milik Anda sendiri.
endpoint="adb.<region-id>.aliyuncs.com"
)
client = Client(config)Ganti <region-id> dengan ID wilayah tempat kluster Anda berada, misalnya cn-hangzhou.
Kirim pekerjaan Spark SQL
Atur app_type ke "SQL" dan masukkan pernyataan SQL dalam parameter data.
from alibabacloud_adb20211201.models import SubmitSparkAppRequest
sql = """
set spark.driver.resourceSpec=medium;
set spark.executor.instances=2;
set spark.executor.resourceSpec=medium;
set spark.app.name=Spark SQL Test;
-- Tambahkan pernyataan SQL Anda di bawah ini.
show databases;
"""
request = SubmitSparkAppRequest(
dbcluster_id="<cluster-id>",
resource_group_name="<resource-group-name>",
data=sql,
app_type="SQL",
agent_source="Python SDK",
agent_version="1.0.0"
)
response = client.submit_spark_app(request)
app_id = response.body.data.app_id
print("Pekerjaan Spark SQL dikirim. App ID:", app_id)Kirim pekerjaan Spark JAR
Atur app_type ke "BATCH" dan masukkan konfigurasi pekerjaan sebagai string JSON dalam parameter data.
from alibabacloud_adb20211201.models import SubmitSparkAppRequest
import json
conf = {
"comments": [
"-- Ubah contoh ini untuk menjalankan program Spark Anda sendiri."
],
"args": ["1000"],
"file": "local:///tmp/spark-examples.jar",
"name": "SparkPi",
"className": "org.apache.spark.examples.SparkPi",
"conf": {
"spark.driver.resourceSpec": "medium",
"spark.executor.instances": 2,
"spark.executor.resourceSpec": "medium"
}
}
request = SubmitSparkAppRequest(
dbcluster_id="<cluster-id>",
resource_group_name="<resource-group-name>",
data=json.dumps(conf),
app_type="BATCH",
agent_source="Python SDK",
agent_version="1.0.0"
)
response = client.submit_spark_app(request)
app_id = response.body.data.app_id
print("Pekerjaan Spark JAR dikirim. App ID:", app_id)Tanyakan status pekerjaan
Masukkan app_id yang dikembalikan saat Anda mengirim pekerjaan.
from alibabacloud_adb20211201.models import GetSparkAppStateRequest
request = GetSparkAppStateRequest(app_id="<app-id>")
response = client.get_spark_app_state(request)
state = response.body.data.state
print("Status pekerjaan:", state)Kueri log pekerjaan
from alibabacloud_adb20211201.models import GetSparkAppLogRequest
request = GetSparkAppLogRequest(app_id="<app-id>")
response = client.get_spark_app_log(request)
log_content = response.body.data.log_content
print("Log pekerjaan:", log_content)Hentikan pekerjaan
from alibabacloud_adb20211201.models import KillSparkAppRequest
request = KillSparkAppRequest(app_id="<app-id>")
response = client.kill_spark_app(request)
state = response.body.data.state
print("Status pekerjaan setelah dihentikan:", state)Tampilkan riwayat pekerjaan
from alibabacloud_adb20211201.models import ListSparkAppsRequest
request = ListSparkAppsRequest(
dbcluster_id="<cluster-id>",
page_number=1,
page_size=10
)
response = client.list_spark_apps(request)
print("Total pekerjaan:", response.body.data.page_number)
for app_info in response.body.data.app_info_list:
print("App ID:", app_info.app_id)
print("State:", app_info.state)
print("Detail:", app_info.detail)Contoh lengkap
Skrip berikut menggabungkan semua operasi menjadi satu program yang dapat dijalankan.
import os
import json
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client
from alibabacloud_adb20211201.models import (
SubmitSparkAppRequest,
GetSparkAppStateRequest,
GetSparkAppLogRequest,
KillSparkAppRequest,
ListSparkAppsRequest,
)
# Inisialisasi klien.
config = Config(
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
endpoint="adb.<region-id>.aliyuncs.com"
)
client = Client(config)
cluster_id = "<cluster-id>"
rg_name = "<resource-group-name>"
# Kirim pekerjaan Spark SQL.
sql = """
set spark.driver.resourceSpec=medium;
set spark.executor.instances=2;
set spark.executor.resourceSpec=medium;
set spark.app.name=Spark SQL Test;
show databases;
"""
sql_request = SubmitSparkAppRequest(
dbcluster_id=cluster_id,
resource_group_name=rg_name,
data=sql,
app_type="SQL",
agent_source="Python SDK",
agent_version="1.0.0"
)
sql_response = client.submit_spark_app(sql_request)
sql_app_id = sql_response.body.data.app_id
print("App ID pekerjaan SQL:", sql_app_id)
# Kirim pekerjaan Spark JAR.
jar_conf = {
"comments": [
"-- Ubah contoh ini untuk menjalankan program Spark Anda sendiri."
],
"args": ["1000"],
"file": "local:///tmp/spark-examples.jar",
"name": "SparkPi",
"className": "org.apache.spark.examples.SparkPi",
"conf": {
"spark.driver.resourceSpec": "medium",
"spark.executor.instances": 2,
"spark.executor.resourceSpec": "medium"
}
}
jar_request = SubmitSparkAppRequest(
dbcluster_id=cluster_id,
resource_group_name=rg_name,
data=json.dumps(jar_conf),
app_type="BATCH",
agent_source="Python SDK",
agent_version="1.0.0"
)
jar_response = client.submit_spark_app(jar_request)
jar_app_id = jar_response.body.data.app_id
print("App ID pekerjaan JAR:", jar_app_id)
# Tanyakan status pekerjaan.
state_request = GetSparkAppStateRequest(app_id=sql_app_id)
state_response = client.get_spark_app_state(state_request)
print("Status pekerjaan SQL:", state_response.body.data.state)
# Tanyakan log pekerjaan.
log_request = GetSparkAppLogRequest(app_id=sql_app_id)
log_response = client.get_spark_app_log(log_request)
print("Log pekerjaan SQL:", log_response.body.data.log_content)
# Tampilkan riwayat pekerjaan.
list_request = ListSparkAppsRequest(
dbcluster_id=cluster_id,
page_number=1,
page_size=10
)
list_response = client.list_spark_apps(list_request)
print("Total pekerjaan:", list_response.body.data.page_number)
for app_info in list_response.body.data.app_info_list:
print(app_info.app_id, app_info.state, app_info.detail)
# Hentikan pekerjaan JAR.
kill_request = KillSparkAppRequest(app_id=jar_app_id)
kill_response = client.kill_spark_app(kill_request)
print("Status pekerjaan JAR setelah dihentikan:", kill_response.body.data.state)Referensi API
Parameter SubmitSparkAppRequest
| Parameter | Tipe | Deskripsi |
|---|---|---|
dbcluster_id | String | ID kluster AnalyticDB for MySQL Data Lakehouse Edition (V3.0). |
resource_group_name | String | Nama kelompok sumber daya pekerjaan. |
data | String | Pernyataan SQL (untuk pekerjaan SQL) atau string konfigurasi JSON (untuk pekerjaan JAR). |
app_type | String | Tipe pekerjaan. Nilai yang valid: "SQL" (pekerjaan Spark SQL), "BATCH" (pekerjaan Spark JAR). |
agent_source | String | Sumber pengiriman. Atur ke "Python SDK". |
agent_version | String | Versi SDK. Atur ke "1.0.0". |
Parameter ListSparkAppsRequest
| Parameter | Tipe | Deskripsi |
|---|---|---|
dbcluster_id | String | ID kluster AnalyticDB for MySQL Data Lakehouse Edition (V3.0). |
page_number | Integer | Nomor halaman. Halaman dimulai dari 1. Nilai default: 1. |
page_size | Integer | Jumlah entri per halaman. |
Kelas dan metode SDK
| Operasi | Kelas permintaan | Metode klien | Bidang respons |
|---|---|---|---|
| Kirim pekerjaan | SubmitSparkAppRequest | client.submit_spark_app() | response.body.data.app_id |
| Tanyakan status pekerjaan | GetSparkAppStateRequest | client.get_spark_app_state() | response.body.data.state |
| Tanyakan log pekerjaan | GetSparkAppLogRequest | client.get_spark_app_log() | response.body.data.log_content |
| Hentikan pekerjaan | KillSparkAppRequest | client.kill_spark_app() | response.body.data.state |
| Tampilkan riwayat pekerjaan | ListSparkAppsRequest | client.list_spark_apps() | response.body.data.app_info_list |