All Products
Search
Document Center

AnalyticDB:Develop Spark applications with AnalyticDB for MySQL SDK for Python

Last Updated:Mar 24, 2026

AnalyticDB for MySQL SDK for Python lets you submit and manage Spark jobs programmatically without using the console. With the SDK, you can submit Spark SQL and JAR jobs, monitor job status, retrieve logs, terminate running jobs, and list job history.

Quick start

The following example submits a Spark SQL job, queries its status, and retrieves its logs.

import os
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client
from alibabacloud_adb20211201.models import (
    SubmitSparkAppRequest,
    GetSparkAppStateRequest,
    GetSparkAppLogRequest,
)

# Build an authenticated client; credentials come from environment variables.
client = Client(Config(
    access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
    endpoint="adb.cn-hangzhou.aliyuncs.com"
))

# SQL to run; the leading SET statements size the Spark driver and executors.
sql = """
    set spark.driver.resourceSpec=medium;
    set spark.executor.instances=2;
    set spark.executor.resourceSpec=medium;
    set spark.app.name=Spark SQL Test;
    show databases;
"""

# Submit the job to the cluster's job resource group and keep its app ID.
submit_response = client.submit_spark_app(SubmitSparkAppRequest(
    dbcluster_id="amv-bp1wo70f0k3c****",
    resource_group_name="test",
    data=sql,
    app_type="SQL",
    agent_source="Python SDK",
    agent_version="1.0.0"
))
app_id = submit_response.body.data.app_id
print("App ID:", app_id)

# Check the job's current state.
state_response = client.get_spark_app_state(GetSparkAppStateRequest(app_id=app_id))
print("State:", state_response.body.data.state)

# Fetch the job's log output.
log_response = client.get_spark_app_log(GetSparkAppLogRequest(app_id=app_id))
print("Logs:", log_response.body.data.log_content)

Replace amv-bp1wo70f0k3c**** with your cluster ID and cn-hangzhou with the region where your cluster resides.

Prerequisites

Before you begin, make sure that you have:

- An AnalyticDB for MySQL cluster and a job resource group created in the cluster.
- An Alibaba Cloud AccessKey pair, exposed to the examples through the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables.
- The SDK installed in your Python environment, for example: pip install alibabacloud_adb20211201

Initialize the client

Create a Client instance to authenticate with Alibaba Cloud. All subsequent operations use this client.

import os
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client

# Credentials are read from environment variables so they never appear in code.
client_config = Config(
    # AccessKey ID from the environment.
    access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    # AccessKey secret from the environment.
    access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
    # Endpoint format: adb.<region-id>.aliyuncs.com — substitute your region ID.
    endpoint="adb.<region-id>.aliyuncs.com"
)
client = Client(client_config)

Replace <region-id> with the ID of the region where your cluster resides, for example cn-hangzhou.

Submit a Spark SQL job

Set app_type to "SQL" and pass the SQL statements in the data parameter.

from alibabacloud_adb20211201.models import SubmitSparkAppRequest

# The SET statements configure Spark resources before the SQL runs.
sql = """
    set spark.driver.resourceSpec=medium;
    set spark.executor.instances=2;
    set spark.executor.resourceSpec=medium;
    set spark.app.name=Spark SQL Test;
    -- Add your SQL statements below.
    show databases;
"""

# app_type="SQL" tells the service to interpret `data` as SQL text.
response = client.submit_spark_app(SubmitSparkAppRequest(
    dbcluster_id="<cluster-id>",
    resource_group_name="<resource-group-name>",
    data=sql,
    app_type="SQL",
    agent_source="Python SDK",
    agent_version="1.0.0"
))
app_id = response.body.data.app_id
print("Spark SQL job submitted. App ID:", app_id)

Submit a Spark JAR job

Set app_type to "BATCH" and pass the job configuration as a JSON string in the data parameter.

import json

from alibabacloud_adb20211201.models import SubmitSparkAppRequest

# JAR-job configuration; it is serialized to JSON and sent in `data`.
conf = {
    "comments": [
        "-- Modify this example to run your own Spark program."
    ],
    "args": ["1000"],
    "file": "local:///tmp/spark-examples.jar",
    "name": "SparkPi",
    "className": "org.apache.spark.examples.SparkPi",
    "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "medium"
    }
}

# app_type="BATCH" tells the service to interpret `data` as a JSON job spec.
jar_submit_request = SubmitSparkAppRequest(
    dbcluster_id="<cluster-id>",
    resource_group_name="<resource-group-name>",
    data=json.dumps(conf),
    app_type="BATCH",
    agent_source="Python SDK",
    agent_version="1.0.0"
)
response = client.submit_spark_app(jar_submit_request)

app_id = response.body.data.app_id
print("Spark JAR job submitted. App ID:", app_id)

Query job status

Pass the app_id returned when you submitted the job.

from alibabacloud_adb20211201.models import GetSparkAppStateRequest

# Query the current state of a job by the app ID returned at submission.
state_response = client.get_spark_app_state(GetSparkAppStateRequest(app_id="<app-id>"))
print("Job state:", state_response.body.data.state)

Query job logs

from alibabacloud_adb20211201.models import GetSparkAppLogRequest

# Retrieve the log output of a job by its app ID.
log_response = client.get_spark_app_log(GetSparkAppLogRequest(app_id="<app-id>"))
print("Job logs:", log_response.body.data.log_content)

Terminate a job

from alibabacloud_adb20211201.models import KillSparkAppRequest

# Terminate a running job; the response reports the job's state afterwards.
kill_response = client.kill_spark_app(KillSparkAppRequest(app_id="<app-id>"))
print("Job state after termination:", kill_response.body.data.state)

List job history

from alibabacloud_adb20211201.models import ListSparkAppsRequest

# Page through the cluster's Spark job history.
request = ListSparkAppsRequest(
    dbcluster_id="<cluster-id>",
    page_number=1,  # pages start from 1
    page_size=10
)

response = client.list_spark_apps(request)
# Fix: the original printed page_number (the page index) as the total.
# The total number of jobs is reported in the total_count field.
print("Total jobs:", response.body.data.total_count)

# Each entry describes one submitted job.
for app_info in response.body.data.app_info_list:
    print("App ID:", app_info.app_id)
    print("State:", app_info.state)
    print("Detail:", app_info.detail)

Complete example

The following script combines all operations into a single runnable program.

import os
import json
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client
from alibabacloud_adb20211201.models import (
    SubmitSparkAppRequest,
    GetSparkAppStateRequest,
    GetSparkAppLogRequest,
    KillSparkAppRequest,
    ListSparkAppsRequest,
)

# Initialize the client. Credentials are read from environment variables;
# replace <region-id> with the region where your cluster resides.
config = Config(
    access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
    endpoint="adb.<region-id>.aliyuncs.com"
)
client = Client(config)

# Replace with your cluster ID and job resource group name.
cluster_id = "<cluster-id>"
rg_name = "<resource-group-name>"

# Submit a Spark SQL job. The SET statements size the driver and executors.
sql = """
    set spark.driver.resourceSpec=medium;
    set spark.executor.instances=2;
    set spark.executor.resourceSpec=medium;
    set spark.app.name=Spark SQL Test;
    show databases;
"""

sql_request = SubmitSparkAppRequest(
    dbcluster_id=cluster_id,
    resource_group_name=rg_name,
    data=sql,
    app_type="SQL",
    agent_source="Python SDK",
    agent_version="1.0.0"
)
sql_response = client.submit_spark_app(sql_request)
sql_app_id = sql_response.body.data.app_id
print("SQL job App ID:", sql_app_id)

# Submit a Spark JAR job; the configuration is sent as a JSON string.
jar_conf = {
    "comments": [
        "-- Modify this example to run your own Spark program."
    ],
    "args": ["1000"],
    "file": "local:///tmp/spark-examples.jar",
    "name": "SparkPi",
    "className": "org.apache.spark.examples.SparkPi",
    "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "medium"
    }
}

jar_request = SubmitSparkAppRequest(
    dbcluster_id=cluster_id,
    resource_group_name=rg_name,
    data=json.dumps(jar_conf),
    app_type="BATCH",  # BATCH marks the job as a JAR job
    agent_source="Python SDK",
    agent_version="1.0.0"
)
jar_response = client.submit_spark_app(jar_request)
jar_app_id = jar_response.body.data.app_id
print("JAR job App ID:", jar_app_id)

# Query job status.
state_request = GetSparkAppStateRequest(app_id=sql_app_id)
state_response = client.get_spark_app_state(state_request)
print("SQL job state:", state_response.body.data.state)

# Query job logs.
log_request = GetSparkAppLogRequest(app_id=sql_app_id)
log_response = client.get_spark_app_log(log_request)
print("SQL job logs:", log_response.body.data.log_content)

# List job history.
list_request = ListSparkAppsRequest(
    dbcluster_id=cluster_id,
    page_number=1,
    page_size=10
)
list_response = client.list_spark_apps(list_request)
# Fix: report the total job count (total_count), not the page index.
print("Total jobs:", list_response.body.data.total_count)
for app_info in list_response.body.data.app_info_list:
    print(app_info.app_id, app_info.state, app_info.detail)

# Terminate the JAR job.
kill_request = KillSparkAppRequest(app_id=jar_app_id)
kill_response = client.kill_spark_app(kill_request)
print("JAR job state after termination:", kill_response.body.data.state)

API reference

SubmitSparkAppRequest parameters

Parameter Type Description
dbcluster_id String The ID of the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster.
resource_group_name String The name of the job resource group.
data String The SQL statements (for SQL jobs) or JSON configuration string (for JAR jobs).
app_type String The job type. Valid values: "SQL" (Spark SQL job), "BATCH" (Spark JAR job).
agent_source String The source of the submission. Set to "Python SDK".
agent_version String The SDK version. Set to "1.0.0".

ListSparkAppsRequest parameters

Parameter Type Description
dbcluster_id String The ID of the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster.
page_number Integer The page number. Pages start from 1. Default value: 1.
page_size Integer The number of entries per page.

SDK classes and methods

Operation Request class Client method Response field
Submit a job SubmitSparkAppRequest client.submit_spark_app() response.body.data.app_id
Query job status GetSparkAppStateRequest client.get_spark_app_state() response.body.data.state
Query job logs GetSparkAppLogRequest client.get_spark_app_log() response.body.data.log_content
Terminate a job KillSparkAppRequest client.kill_spark_app() response.body.data.state
List job history ListSparkAppsRequest client.list_spark_apps() response.body.data.app_info_list