Topik ini menjelaskan cara menggunakan AnalyticDB for MySQL SDK untuk Python untuk mengirimkan tugas Spark, memeriksa status dan log tugas Spark, menghentikan tugas Spark, serta memeriksa tugas Spark historis.
Prasyarat
Python 3.7 atau yang lebih baru telah diinstal.
Sebuah kluster Data Lakehouse Edition (V3.0) AnalyticDB for MySQL telah dibuat. Untuk informasi lebih lanjut, lihat Buat Kluster Data Lakehouse Edition.
Grup sumber daya tugas telah dibuat untuk kluster AnalyticDB for MySQL Data Lakehouse Edition (V3.0). Untuk informasi lebih lanjut, lihat Buat Grup Sumber Daya.
AnalyticDB for MySQL SDK untuk Python telah diinstal. Untuk informasi lebih lanjut, lihat AnalyticDB for MySQL SDK untuk Python.
Variabel lingkungan
ALIBABA_CLOUD_ACCESS_KEY_IDdanALIBABA_CLOUD_ACCESS_KEY_SECRETtelah dikonfigurasi. Untuk informasi lebih lanjut, lihat Konfigurasikan Variabel Lingkungan di Linux, macOS, dan Windows.Path untuk menyimpan log tugas Spark telah dikonfigurasi.
CatatanAnda dapat menggunakan salah satu metode berikut untuk mengonfigurasi path log:
Masuk ke konsol AnalyticDB for MySQL dan buka halaman Spark JAR Development. Di pojok kanan atas halaman, klik Log Settings untuk mengonfigurasi path log.
Gunakan parameter
spark.app.log.rootPathuntuk menentukan path Object Storage Service (OSS) guna menyimpan log tugas Spark.
Kode contoh
Berikut adalah kode contoh tentang cara menggunakan AnalyticDB for MySQL SDK untuk Python untuk mengirimkan tugas Spark, memeriksa status dan log tugas Spark, menghentikan tugas Spark, serta memeriksa tugas Spark historis:
from alibabacloud_adb20211201.models import SubmitSparkAppRequest, SubmitSparkAppResponse, GetSparkAppStateRequest, \
GetSparkAppStateResponse, GetSparkAppLogResponse, GetSparkAppLogRequest, KillSparkAppRequest, \
KillSparkAppResponse, ListSparkAppsRequest, ListSparkAppsResponse
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client
import os
def submit_spark_sql(client: Client, cluster_id, rg_name, sql):
"""
Kirim tugas Spark SQL
:param client: Klien Alibaba Cloud.
:param cluster_id: ID kluster AnalyticDB for MySQL Data Lakehouse Edition (V3.0).
:param rg_name: Nama grup sumber daya.
:param sql: SQL
:return: ID tugas Spark.
:rtype: basestring
:exception ClientException
"""
# Inisialisasi permintaan.
request = SubmitSparkAppRequest(
dbcluster_id=cluster_id,
resource_group_name=rg_name,
data=sql,
app_type="SQL",
agent_source="Python SDK",
agent_version="1.0.0"
)
# Kirim pernyataan SQL untuk mendapatkan hasilnya.
response: SubmitSparkAppResponse = client.submit_spark_app(request)
# Dapatkan ID tugas Spark.
print(response)
return response.body.data.app_id
def submit_spark_jar(client: Client, cluster_id: str, rg_name: str, json_conf: str):
"""
Kirim tugas Spark
:param client: Klien Alibaba Cloud.
:param cluster_id: ID kluster AnalyticDB for MySQL Data Lakehouse Edition (V3.0).
:param rg_name: Nama grup sumber daya.
:param json_conf: Konfigurasi JSON.
:return: ID tugas Spark.
:rtype: basestring
:exception ClientException
"""
# Inisialisasi permintaan.
request = SubmitSparkAppRequest(
dbcluster_id=cluster_id,
resource_group_name=rg_name,
data=json_conf,
app_type="BATCH",
agent_source="Python SDK",
agent_version="1.0.0"
)
# Kirim pernyataan SQL untuk mendapatkan hasilnya.
response: SubmitSparkAppResponse = client.submit_spark_app(request)
# Dapatkan ID tugas Spark.
print(response)
return response.body.data.app_id
def get_status(client: Client, app_id):
"""
Periksa status tugas Spark
:param client: Klien Alibaba Cloud.
:param app_id: ID tugas Spark.
:return: Status tugas Spark.
:rtype: basestring
:exception ClientException
"""
# Inisialisasi permintaan.
print(app_id)
request = GetSparkAppStateRequest(app_id=app_id)
# Dapatkan status tugas Spark.
response: GetSparkAppStateResponse = client.get_spark_app_state(request)
print(response)
return response.body.data.state
def get_log(client: Client, app_id):
"""
Periksa log tugas Spark
:param client: Klien Alibaba Cloud.
:param app_id: ID tugas Spark.
:return: Log tugas Spark.
:rtype: basestring
:exception ClientException
"""
# Inisialisasi permintaan.
request = GetSparkAppLogRequest(app_id=app_id)
# Dapatkan log tugas Spark.
response: GetSparkAppLogResponse = client.get_spark_app_log(request)
print(response)
return response.body.data.log_content
def kill_app(client: Client, app_id):
"""
Hentikan tugas Spark
:param client: Klien Alibaba Cloud.
:param app_id: ID tugas Spark.
:return: Status tugas Spark.
:exception ClientException
"""
# Inisialisasi permintaan.
request = KillSparkAppRequest(app_id=app_id)
# Dapatkan status tugas Spark.
response: KillSparkAppResponse = client.kill_spark_app(request)
print(response)
return response.body.data.state
def list_apps(client: Client, cluster_id: str, page_number: int, page_size: int):
"""
Periksa tugas Spark historis
:param client: Klien Alibaba Cloud.
:param cluster_id: ID kluster AnalyticDB for MySQL Data Lakehouse Edition (V3.0).
:param page_number: Nomor halaman. Halaman dimulai dari halaman 1. Nilai default: 1.
:param page_size Jumlah entri per halaman.
:return: Detail tugas Spark.
:exception ClientException
"""
# Inisialisasi permintaan.
request = ListSparkAppsRequest(
dbcluster_id=cluster_id,
page_number=page_number,
page_size=page_size
)
# Dapatkan detail tugas Spark.
response: ListSparkAppsResponse = client.list_spark_apps(request)
print("Total App Number:", response.body.data.page_number)
for app_info in response.body.data.app_info_list:
print(app_info.app_id)
print(app_info.state)
print(app_info.detail)
if __name__ == '__main__':
# konfigurasi klien
config = Config(
# Dapatkan AccessKey ID dari variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID.
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# Dapatkan Rahasia AccessKey dari variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET.
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
# Dapatkan Titik akhir. cn-hangzhou menunjukkan ID wilayah tempat kluster berada.
endpoint="adb.cn-hangzhou.aliyuncs.com"
)
# klien baru
adb_client = Client(config)
sql_str = """
-- Ini hanya contoh SparkSQL. Ubah konten dan jalankan program spark Anda.
set spark.driver.resourceSpec=medium;
set spark.executor.instances=2;
set spark.executor.resourceSpec=medium;
set spark.app.name=Spark SQL Test;
-- Berikut adalah pernyataan SQL Anda
show databases;
"""
json_str = """
{
"comments": [
"-- Ini hanya contoh SparkPi. Ubah konten dan jalankan program spark Anda."
],
"args": [
"1000"
],
"file": "local:///tmp/spark-examples.jar",
"name": "SparkPi",
"className": "org.apache.spark.examples.SparkPi",
"conf": {
"spark.driver.resourceSpec": "medium",
"spark.executor.instances": 2,
"spark.executor.resourceSpec": "medium"
}
}
"""
"""
Kirim tugas Spark SQL
cluster_id: ID kluster AnalyticDB for MySQL Data Lakehouse Edition (V3.0).
rg_name: Nama grup sumber daya.
"""
sql_app_id = submit_spark_sql(client=adb_client, cluster_id="amv-bp1wo70f0k3c****", rg_name="test", sql=sql_str)
print(sql_app_id)
"""
Kirim tugas Spark
cluster_id: ID kluster AnalyticDB for MySQL Data Lakehouse Edition (V3.0).
rg_name: Nama grup sumber daya.
"""
json_app_id = submit_spark_jar(client=adb_client, cluster_id="amv-bp1wo70f0k3c****",
rg_name="test", json_conf=json_str)
print(json_app_id)
# Periksa status tugas Spark.
get_status(client=adb_client, app_id=sql_app_id)
get_status(client=adb_client, app_id=json_app_id)
"""
Periksa tugas Spark historis
cluster_id: ID kluster AnalyticDB for MySQL Data Lakehouse Edition (V3.0).
page_number: Nomor halaman. Halaman dimulai dari halaman 1. Nilai default: 1.
page_size: Jumlah entri per halaman.
"""
list_apps(client=adb_client, cluster_id="amv-bp1wo70f0k3c****", page_size=10, page_number=1)
# Hentikan tugas Spark.
kill_app(client=adb_client, app_id=json_app_id)