AnalyticDB for MySQL SDK for Python lets you submit and manage Spark jobs programmatically without using the console. With the SDK, you can submit Spark SQL and JAR jobs, monitor job status, retrieve logs, terminate running jobs, and list job history.
Quick start
The following example submits a Spark SQL job, queries its status, and retrieves its logs.
```python
import os

from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client
from alibabacloud_adb20211201.models import (
    SubmitSparkAppRequest,
    GetSparkAppStateRequest,
    GetSparkAppLogRequest,
)

# Initialize the client.
config = Config(
    access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
    endpoint="adb.cn-hangzhou.aliyuncs.com"
)
client = Client(config)

# Submit a Spark SQL job.
sql = """
set spark.driver.resourceSpec=medium;
set spark.executor.instances=2;
set spark.executor.resourceSpec=medium;
set spark.app.name=Spark SQL Test;
show databases;
"""
request = SubmitSparkAppRequest(
    dbcluster_id="amv-bp1wo70f0k3c****",
    resource_group_name="test",
    data=sql,
    app_type="SQL",
    agent_source="Python SDK",
    agent_version="1.0.0"
)
response = client.submit_spark_app(request)
app_id = response.body.data.app_id
print("App ID:", app_id)

# Query job status.
state_response = client.get_spark_app_state(GetSparkAppStateRequest(app_id=app_id))
print("State:", state_response.body.data.state)

# Retrieve job logs.
log_response = client.get_spark_app_log(GetSparkAppLogRequest(app_id=app_id))
print("Logs:", log_response.body.data.log_content)
```
Replace `amv-bp1wo70f0k3c****` with your cluster ID and `cn-hangzhou` with the ID of the region where your cluster resides.
Prerequisites
Before you begin, make sure that you have:

- Python 3.7 or later
- An AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster
- A job resource group for your cluster
- AnalyticDB for MySQL SDK for Python installed
- The `ALIBABA_CLOUD_ACCESS_KEY_ID` and `ALIBABA_CLOUD_ACCESS_KEY_SECRET` environment variables configured
- A path configured to store Spark job logs

Note: Configure the log path by using one of the following methods:

- In the AnalyticDB for MySQL console, go to the Spark JAR Development page. In the upper-right corner, click Log Settings to set the path.
- Set the `spark.app.log.rootPath` parameter to an Object Storage Service (OSS) path.
Initialize the client
Create a Client instance to authenticate with Alibaba Cloud. All subsequent operations use this client.
```python
import os

from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client

config = Config(
    # Obtain the AccessKey ID from the environment variable.
    access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    # Obtain the AccessKey secret from the environment variable.
    access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
    # Set the endpoint. Replace the region ID with your own.
    endpoint="adb.<region-id>.aliyuncs.com"
)
client = Client(config)
```
Replace `<region-id>` with the ID of the region where your cluster resides, for example `cn-hangzhou`.
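Because the endpoint always follows the same pattern, a small helper can build it from the region ID. This helper is illustrative and not part of the SDK:

```python
def adb_endpoint(region_id: str) -> str:
    """Build the AnalyticDB for MySQL endpoint for a region.

    Illustrative helper, not part of the SDK; it assumes the
    standard public endpoint pattern adb.<region-id>.aliyuncs.com.
    """
    if not region_id:
        raise ValueError("region_id is required")
    return f"adb.{region_id}.aliyuncs.com"

print(adb_endpoint("cn-hangzhou"))  # adb.cn-hangzhou.aliyuncs.com
```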
Submit a Spark SQL job
Set `app_type` to `"SQL"` and pass the SQL statements in the `data` parameter.
```python
from alibabacloud_adb20211201.models import SubmitSparkAppRequest

sql = """
set spark.driver.resourceSpec=medium;
set spark.executor.instances=2;
set spark.executor.resourceSpec=medium;
set spark.app.name=Spark SQL Test;
-- Add your SQL statements below.
show databases;
"""
request = SubmitSparkAppRequest(
    dbcluster_id="<cluster-id>",
    resource_group_name="<resource-group-name>",
    data=sql,
    app_type="SQL",
    agent_source="Python SDK",
    agent_version="1.0.0"
)
response = client.submit_spark_app(request)
app_id = response.body.data.app_id
print("Spark SQL job submitted. App ID:", app_id)
```
Submit a Spark JAR job
Set `app_type` to `"BATCH"` and pass the job configuration as a JSON string in the `data` parameter.
```python
import json

from alibabacloud_adb20211201.models import SubmitSparkAppRequest

conf = {
    "comments": [
        "-- Modify this example to run your own Spark program."
    ],
    "args": ["1000"],
    "file": "local:///tmp/spark-examples.jar",
    "name": "SparkPi",
    "className": "org.apache.spark.examples.SparkPi",
    "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "medium"
    }
}
request = SubmitSparkAppRequest(
    dbcluster_id="<cluster-id>",
    resource_group_name="<resource-group-name>",
    data=json.dumps(conf),
    app_type="BATCH",
    agent_source="Python SDK",
    agent_version="1.0.0"
)
response = client.submit_spark_app(request)
app_id = response.body.data.app_id
print("Spark JAR job submitted. App ID:", app_id)
```
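Because the BATCH payload is plain JSON, hand-writing it invites typos in field names. A small helper (hypothetical, not part of the SDK) that assembles the payload from named arguments, mirroring only the fields shown above, is one way to reduce that risk:

```python
import json

def build_batch_conf(jar_path, main_class, name, args=None, conf=None):
    """Assemble the JSON payload for a Spark JAR (BATCH) job.

    Hypothetical convenience helper; the field names mirror the
    example payload above (file, className, name, args, conf).
    """
    payload = {
        "file": jar_path,
        "className": main_class,
        "name": name,
        "args": args or [],
        "conf": conf or {},
    }
    return json.dumps(payload)

data = build_batch_conf(
    "local:///tmp/spark-examples.jar",
    "org.apache.spark.examples.SparkPi",
    "SparkPi",
    args=["1000"],
    conf={"spark.driver.resourceSpec": "medium"},
)
```

The returned string can be passed directly as the `data` argument of `SubmitSparkAppRequest`.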
Query job status
Pass the `app_id` returned when you submitted the job.
```python
from alibabacloud_adb20211201.models import GetSparkAppStateRequest

request = GetSparkAppStateRequest(app_id="<app-id>")
response = client.get_spark_app_state(request)
state = response.body.data.state
print("Job state:", state)
```
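A single status query returns only a snapshot; batch workflows usually poll until the job reaches a terminal state. The sketch below takes any zero-argument callable that returns the current state string, for example `lambda: client.get_spark_app_state(GetSparkAppStateRequest(app_id=app_id)).body.data.state`. The terminal state names are assumptions here; check the API reference for the exact values in your SDK version.

```python
import time

def wait_for_state(fetch_state, terminal_states=("COMPLETED", "FAILED", "FATAL", "KILLED"),
                   interval=5.0, timeout=600.0):
    """Poll fetch_state() until it returns a terminal state or the timeout expires.

    fetch_state: zero-argument callable returning the current state string.
    terminal_states: assumed terminal state names; verify against the API docs.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        state = fetch_state()
        if state in terminal_states:
            return state
        time.sleep(interval)
    raise TimeoutError("Spark job did not reach a terminal state in time")
```

Injecting the fetch callable keeps the helper independent of the client object, which also makes it easy to test with a stub.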
Query job logs
```python
from alibabacloud_adb20211201.models import GetSparkAppLogRequest

request = GetSparkAppLogRequest(app_id="<app-id>")
response = client.get_spark_app_log(request)
log_content = response.body.data.log_content
print("Job logs:", log_content)
```
Terminate a job
```python
from alibabacloud_adb20211201.models import KillSparkAppRequest

request = KillSparkAppRequest(app_id="<app-id>")
response = client.kill_spark_app(request)
state = response.body.data.state
print("Job state after termination:", state)
```
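Terminating a job that already finished is a wasted call, so a guard that checks the state first can be useful. This sketch takes zero-argument callables wrapping `get_spark_app_state` and `kill_spark_app`; the terminal state names are assumptions, not confirmed API values.

```python
def kill_if_running(get_state, kill,
                    terminal_states=("COMPLETED", "FAILED", "FATAL", "KILLED")):
    """Terminate a job only if it has not already reached a terminal state.

    get_state, kill: zero-argument callables wrapping the client calls.
    Returns (observed_state, kill_was_issued).
    """
    state = get_state()
    if state in terminal_states:
        return state, False
    kill()
    return state, True
```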
List job history
```python
from alibabacloud_adb20211201.models import ListSparkAppsRequest

request = ListSparkAppsRequest(
    dbcluster_id="<cluster-id>",
    page_number=1,
    page_size=10
)
response = client.list_spark_apps(request)
print("Page number:", response.body.data.page_number)
for app_info in response.body.data.app_info_list:
    print("App ID:", app_info.app_id)
    print("State:", app_info.state)
    print("Detail:", app_info.detail)
```
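A single request returns at most one page, so walking the full history means incrementing `page_number` until a short page comes back. The generator below takes any `fetch_page(page_number, page_size)` callable returning a list of entries, for example a thin wrapper around `client.list_spark_apps()`; the stop-on-short-page heuristic is an assumption about the paging behavior.

```python
def iter_spark_apps(fetch_page, page_size=10):
    """Yield job entries across all pages.

    fetch_page: callable (page_number, page_size) -> list of entries,
    e.g. a wrapper that calls client.list_spark_apps() and returns
    response.body.data.app_info_list. Iteration stops at the first
    page that is shorter than page_size.
    """
    page_number = 1
    while True:
        entries = fetch_page(page_number, page_size)
        for entry in entries:
            yield entry
        if len(entries) < page_size:
            return
        page_number += 1
```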
Complete example
The following script combines all operations into a single runnable program.
```python
import os
import json

from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client
from alibabacloud_adb20211201.models import (
    SubmitSparkAppRequest,
    GetSparkAppStateRequest,
    GetSparkAppLogRequest,
    KillSparkAppRequest,
    ListSparkAppsRequest,
)

# Initialize the client.
config = Config(
    access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
    endpoint="adb.<region-id>.aliyuncs.com"
)
client = Client(config)

cluster_id = "<cluster-id>"
rg_name = "<resource-group-name>"

# Submit a Spark SQL job.
sql = """
set spark.driver.resourceSpec=medium;
set spark.executor.instances=2;
set spark.executor.resourceSpec=medium;
set spark.app.name=Spark SQL Test;
show databases;
"""
sql_request = SubmitSparkAppRequest(
    dbcluster_id=cluster_id,
    resource_group_name=rg_name,
    data=sql,
    app_type="SQL",
    agent_source="Python SDK",
    agent_version="1.0.0"
)
sql_response = client.submit_spark_app(sql_request)
sql_app_id = sql_response.body.data.app_id
print("SQL job App ID:", sql_app_id)

# Submit a Spark JAR job.
jar_conf = {
    "comments": [
        "-- Modify this example to run your own Spark program."
    ],
    "args": ["1000"],
    "file": "local:///tmp/spark-examples.jar",
    "name": "SparkPi",
    "className": "org.apache.spark.examples.SparkPi",
    "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "medium"
    }
}
jar_request = SubmitSparkAppRequest(
    dbcluster_id=cluster_id,
    resource_group_name=rg_name,
    data=json.dumps(jar_conf),
    app_type="BATCH",
    agent_source="Python SDK",
    agent_version="1.0.0"
)
jar_response = client.submit_spark_app(jar_request)
jar_app_id = jar_response.body.data.app_id
print("JAR job App ID:", jar_app_id)

# Query job status.
state_request = GetSparkAppStateRequest(app_id=sql_app_id)
state_response = client.get_spark_app_state(state_request)
print("SQL job state:", state_response.body.data.state)

# Query job logs.
log_request = GetSparkAppLogRequest(app_id=sql_app_id)
log_response = client.get_spark_app_log(log_request)
print("SQL job logs:", log_response.body.data.log_content)

# List job history.
list_request = ListSparkAppsRequest(
    dbcluster_id=cluster_id,
    page_number=1,
    page_size=10
)
list_response = client.list_spark_apps(list_request)
print("Page number:", list_response.body.data.page_number)
for app_info in list_response.body.data.app_info_list:
    print(app_info.app_id, app_info.state, app_info.detail)

# Terminate the JAR job.
kill_request = KillSparkAppRequest(app_id=jar_app_id)
kill_response = client.kill_spark_app(kill_request)
print("JAR job state after termination:", kill_response.body.data.state)
```
API reference
SubmitSparkAppRequest parameters

| Parameter | Type | Description |
|---|---|---|
| `dbcluster_id` | String | The ID of the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster. |
| `resource_group_name` | String | The name of the job resource group. |
| `data` | String | The SQL statements (for SQL jobs) or the JSON configuration string (for JAR jobs). |
| `app_type` | String | The job type. Valid values: `"SQL"` (Spark SQL job) and `"BATCH"` (Spark JAR job). |
| `agent_source` | String | The source of the submission. Set to `"Python SDK"`. |
| `agent_version` | String | The SDK version. Set to `"1.0.0"`. |
ListSparkAppsRequest parameters
| Parameter | Type | Description |
|---|---|---|
| `dbcluster_id` | String | The ID of the AnalyticDB for MySQL Data Lakehouse Edition (V3.0) cluster. |
| `page_number` | Integer | The page number. Pages start from 1. Default value: 1. |
| `page_size` | Integer | The number of entries per page. |
SDK classes and methods
| Operation | Request class | Client method | Response field |
|---|---|---|---|
| Submit a job | `SubmitSparkAppRequest` | `client.submit_spark_app()` | `response.body.data.app_id` |
| Query job status | `GetSparkAppStateRequest` | `client.get_spark_app_state()` | `response.body.data.state` |
| Query job logs | `GetSparkAppLogRequest` | `client.get_spark_app_log()` | `response.body.data.log_content` |
| Terminate a job | `KillSparkAppRequest` | `client.kill_spark_app()` | `response.body.data.state` |
| List job history | `ListSparkAppsRequest` | `client.list_spark_apps()` | `response.body.data.app_info_list` |