全部产品
Search
文档中心

AnalyticDB:Gunakan AnalyticDB for MySQL SDK untuk Python untuk mengembangkan aplikasi Spark

更新时间:Jul 02, 2025

Topik ini menjelaskan cara menggunakan AnalyticDB for MySQL SDK untuk Python untuk mengirimkan tugas Spark, memeriksa status dan log tugas Spark, menghentikan tugas Spark, serta memeriksa tugas Spark historis.

Prasyarat

  • Python 3.7 atau yang lebih baru telah diinstal.

  • Sebuah kluster Data Lakehouse Edition (V3.0) AnalyticDB for MySQL telah dibuat. Untuk informasi lebih lanjut, lihat Buat Kluster Data Lakehouse Edition.

  • Grup sumber daya tugas telah dibuat untuk kluster AnalyticDB for MySQL Data Lakehouse Edition (V3.0). Untuk informasi lebih lanjut, lihat Buat Grup Sumber Daya.

  • AnalyticDB for MySQL SDK untuk Python telah diinstal. Untuk informasi lebih lanjut, lihat AnalyticDB for MySQL SDK untuk Python.

  • Variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID dan ALIBABA_CLOUD_ACCESS_KEY_SECRET telah dikonfigurasi. Untuk informasi lebih lanjut, lihat Konfigurasikan Variabel Lingkungan di Linux, macOS, dan Windows.

  • Path untuk menyimpan log tugas Spark telah dikonfigurasi.

    Catatan

    Anda dapat menggunakan salah satu metode berikut untuk mengonfigurasi path log:

    • Masuk ke konsol AnalyticDB for MySQL dan buka halaman Spark JAR Development. Di pojok kanan atas halaman, klik Log Settings untuk mengonfigurasi path log.

    • Gunakan parameter spark.app.log.rootPath untuk menentukan path Object Storage Service (OSS) guna menyimpan log tugas Spark.

Kode contoh

Berikut adalah kode contoh tentang cara menggunakan AnalyticDB for MySQL SDK untuk Python untuk mengirimkan tugas Spark, memeriksa status dan log tugas Spark, menghentikan tugas Spark, serta memeriksa tugas Spark historis:

from alibabacloud_adb20211201.models import SubmitSparkAppRequest, SubmitSparkAppResponse, GetSparkAppStateRequest, \
    GetSparkAppStateResponse, GetSparkAppLogResponse, GetSparkAppLogRequest, KillSparkAppRequest, \
    KillSparkAppResponse, ListSparkAppsRequest, ListSparkAppsResponse
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client

import os


def submit_spark_sql(client: Client, cluster_id, rg_name, sql):
    """
    Kirim tugas Spark SQL

    :param client:             Klien Alibaba Cloud.
    :param cluster_id:         ID kluster AnalyticDB for MySQL Data Lakehouse Edition (V3.0).
    :param rg_name:            Nama grup sumber daya.
    :param sql:                SQL
    :return:                   ID tugas Spark.
    :rtype:                    basestring
    :exception                 ClientException
    """

    # Inisialisasi permintaan.
    request = SubmitSparkAppRequest(
        dbcluster_id=cluster_id,
        resource_group_name=rg_name,
        data=sql,
        app_type="SQL",
        agent_source="Python SDK",
        agent_version="1.0.0"
    )

    # Kirim pernyataan SQL untuk mendapatkan hasilnya.
    response: SubmitSparkAppResponse = client.submit_spark_app(request)
    # Dapatkan ID tugas Spark.
    print(response)
    return response.body.data.app_id


def submit_spark_jar(client: Client, cluster_id: str, rg_name: str, json_conf: str):
    """
    Kirim tugas Spark

    :param client:             Klien Alibaba Cloud.
    :param cluster_id:         ID kluster AnalyticDB for MySQL Data Lakehouse Edition (V3.0).
    :param rg_name:            Nama grup sumber daya.
    :param json_conf:          Konfigurasi JSON.
    :return:                   ID tugas Spark.
    :rtype:                    basestring
    :exception                 ClientException
    """

    # Inisialisasi permintaan.
    request = SubmitSparkAppRequest(
        dbcluster_id=cluster_id,
        resource_group_name=rg_name,
        data=json_conf,
        app_type="BATCH",
        agent_source="Python SDK",
        agent_version="1.0.0"
    )

    # Kirim pernyataan SQL untuk mendapatkan hasilnya.
    response: SubmitSparkAppResponse = client.submit_spark_app(request)
    # Dapatkan ID tugas Spark.
    print(response)
    return response.body.data.app_id


def get_status(client: Client, app_id):
    """
    Periksa status tugas Spark

    :param client:             Klien Alibaba Cloud.
    :param app_id:             ID tugas Spark.
    :return:                   Status tugas Spark.
    :rtype:                    basestring
    :exception                 ClientException
    """

    # Inisialisasi permintaan.
    print(app_id)
    request = GetSparkAppStateRequest(app_id=app_id)
    # Dapatkan status tugas Spark.
    response: GetSparkAppStateResponse = client.get_spark_app_state(request)
    print(response)
    return response.body.data.state


def get_log(client: Client, app_id):
    """
    Periksa log tugas Spark

    :param client:             Klien Alibaba Cloud.
    :param app_id:             ID tugas Spark.
    :return:                   Log tugas Spark.
    :rtype:                    basestring
    :exception                 ClientException
    """

    # Inisialisasi permintaan.
    request = GetSparkAppLogRequest(app_id=app_id)

    # Dapatkan log tugas Spark.
    response: GetSparkAppLogResponse = client.get_spark_app_log(request)
    print(response)
    return response.body.data.log_content


def kill_app(client: Client, app_id):
    """
    Hentikan tugas Spark

    :param client:             Klien Alibaba Cloud.
    :param app_id:             ID tugas Spark.
    :return:                   Status tugas Spark.
    :exception                 ClientException
    """

    # Inisialisasi permintaan.
    request = KillSparkAppRequest(app_id=app_id)

    # Dapatkan status tugas Spark.
    response: KillSparkAppResponse = client.kill_spark_app(request)
    print(response)
    return response.body.data.state


def list_apps(client: Client, cluster_id: str, page_number: int, page_size: int):
    """
    Periksa tugas Spark historis

    :param client:             Klien Alibaba Cloud.
    :param cluster_id:         ID kluster AnalyticDB for MySQL Data Lakehouse Edition (V3.0).
    :param page_number:        Nomor halaman. Halaman dimulai dari halaman 1. Nilai default: 1.
    :param page_size           Jumlah entri per halaman.
    :return:                   Detail tugas Spark.
    :exception                 ClientException
    """

    # Inisialisasi permintaan.
    request = ListSparkAppsRequest(
        dbcluster_id=cluster_id,
        page_number=page_number,
        page_size=page_size
    )

    # Dapatkan detail tugas Spark.
    response: ListSparkAppsResponse = client.list_spark_apps(request)
    print("Total App Number:", response.body.data.page_number)
    for app_info in response.body.data.app_info_list:
        print(app_info.app_id)
        print(app_info.state)
        print(app_info.detail)


if __name__ == '__main__':
    # konfigurasi klien
    config = Config(
        # Dapatkan AccessKey ID dari variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_ID.
        access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
        # Dapatkan Rahasia AccessKey dari variabel lingkungan ALIBABA_CLOUD_ACCESS_KEY_SECRET.
        access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
        # Dapatkan Titik akhir. cn-hangzhou menunjukkan ID wilayah tempat kluster berada.
        endpoint="adb.cn-hangzhou.aliyuncs.com"
    )

    # klien baru
    adb_client = Client(config)

    sql_str = """
        -- Ini hanya contoh SparkSQL. Ubah konten dan jalankan program spark Anda.
        set spark.driver.resourceSpec=medium;
        set spark.executor.instances=2;
        set spark.executor.resourceSpec=medium;
        set spark.app.name=Spark SQL Test;
        -- Berikut adalah pernyataan SQL Anda
        show databases;
    """

    json_str = """
    {
        "comments": [
            "-- Ini hanya contoh SparkPi. Ubah konten dan jalankan program spark Anda."
        ],
        "args": [
            "1000"
        ],
        "file": "local:///tmp/spark-examples.jar",
        "name": "SparkPi",
        "className": "org.apache.spark.examples.SparkPi",
        "conf": {
            "spark.driver.resourceSpec": "medium",
            "spark.executor.instances": 2,
            "spark.executor.resourceSpec": "medium"
        }
    }
    """
    """
    Kirim tugas Spark SQL

    cluster_id:    ID kluster AnalyticDB for MySQL Data Lakehouse Edition (V3.0).
    rg_name:       Nama grup sumber daya.
    """

    sql_app_id = submit_spark_sql(client=adb_client, cluster_id="amv-bp1wo70f0k3c****", rg_name="test", sql=sql_str)
    print(sql_app_id)

    """
    Kirim tugas Spark

    cluster_id:    ID kluster AnalyticDB for MySQL Data Lakehouse Edition (V3.0).
    rg_name:       Nama grup sumber daya.
    """

    json_app_id = submit_spark_jar(client=adb_client, cluster_id="amv-bp1wo70f0k3c****",
                                   rg_name="test", json_conf=json_str)
    print(json_app_id)

    # Periksa status tugas Spark.
    get_status(client=adb_client, app_id=sql_app_id)
    get_status(client=adb_client, app_id=json_app_id)

    """
    Periksa tugas Spark historis
    cluster_id:      ID kluster AnalyticDB for MySQL Data Lakehouse Edition (V3.0).     
    page_number:     Nomor halaman. Halaman dimulai dari halaman 1. Nilai default: 1.
    page_size:       Jumlah entri per halaman.
    """

    list_apps(client=adb_client, cluster_id="amv-bp1wo70f0k3c****", page_size=10, page_number=1)

    # Hentikan tugas Spark.
    kill_app(client=adb_client, app_id=json_app_id)