すべてのプロダクト
Search
ドキュメントセンター

AnalyticDB:AnalyticDB for MySQL Python SDK を使用した Spark アプリケーションの開発

最終更新日:Mar 01, 2026

AnalyticDB for MySQL Python SDK を使用すると、コンソールを使用せずにプログラムで Spark ジョブを送信および管理できます。この SDK を使用して、Spark SQL および JAR ジョブの送信、ジョブステータスの監視、ログの取得、実行中のジョブの終了、ジョブ履歴の一覧表示が可能です。

クイックスタート

次の例では、Spark SQL ジョブを送信し、そのステータスをクエリし、ログを取得します。

import os
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client
from alibabacloud_adb20211201.models import (
    SubmitSparkAppRequest,
    GetSparkAppStateRequest,
    GetSparkAppLogRequest,
)

# クライアントを初期化します。
config = Config(
    access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
    endpoint="adb.cn-hangzhou.aliyuncs.com"
)
client = Client(config)

# Spark SQL ジョブを送信します。
sql = """
    set spark.driver.resourceSpec=medium;
    set spark.executor.instances=2;
    set spark.executor.resourceSpec=medium;
    set spark.app.name=Spark SQL Test;
    show databases;
"""

request = SubmitSparkAppRequest(
    dbcluster_id="amv-bp1wo70f0k3c****",
    resource_group_name="test",
    data=sql,
    app_type="SQL",
    agent_source="Python SDK",
    agent_version="1.0.0"
)
response = client.submit_spark_app(request)
app_id = response.body.data.app_id
print("App ID:", app_id)

# ジョブステータスをクエリします。
state_response = client.get_spark_app_state(GetSparkAppStateRequest(app_id=app_id))
print("State:", state_response.body.data.state)

# ジョブログを取得します。
log_response = client.get_spark_app_log(GetSparkAppLogRequest(app_id=app_id))
print("Logs:", log_response.body.data.log_content)

amv-bp1wo70f0k3c**** をご利用のクラスター ID に、cn-hangzhou をクラスターが存在するリージョンに置き換えてください。

前提条件

開始する前に、以下をご確認ください。

  • Python 3.7 以降

  • AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスター

  • お使いのクラスター向けのジョブ リソースグループ

  • AnalyticDB for MySQL Python SDK がインストール済みであること

  • ALIBABA_CLOUD_ACCESS_KEY_ID および ALIBABA_CLOUD_ACCESS_KEY_SECRET 環境変数が設定済みであること

  • Spark ジョブログを保存するためのパスが設定済みであること

    説明

    ログパスは、次のいずれかの方法で設定します。

    • AnalyticDB for MySQL コンソールで、[Spark JAR 開発] ページに移動します。右上隅の [ログ設定] をクリックしてパスを設定します。

    • spark.app.log.rootPath パラメーターを Object Storage Service (OSS) パスに設定します。

クライアントの初期化

Client インスタンスを作成して Alibaba Cloud で認証します。後続のすべての操作でこのクライアントを使用します。

import os
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client

config = Config(
    # 環境変数から AccessKey ID を取得します。
    access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    # 環境変数から AccessKey Secret を取得します。
    access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
    # エンドポイントを設定します。リージョン ID をご自身のものに置き換えてください。
    endpoint="adb.<region-id>.aliyuncs.com"
)

client = Client(config)

<region-id> を、ご利用のクラスターが存在するリージョンの ID (例: cn-hangzhou) に置き換えてください。

Spark SQL ジョブの送信

app_type"SQL" に設定し、data パラメーターに SQL 文を渡します。

from alibabacloud_adb20211201.models import SubmitSparkAppRequest

sql = """
    set spark.driver.resourceSpec=medium;
    set spark.executor.instances=2;
    set spark.executor.resourceSpec=medium;
    set spark.app.name=Spark SQL Test;
    -- 以下に SQL 文を追加します。
    show databases;
"""

request = SubmitSparkAppRequest(
    dbcluster_id="<cluster-id>",
    resource_group_name="<resource-group-name>",
    data=sql,
    app_type="SQL",
    agent_source="Python SDK",
    agent_version="1.0.0"
)

response = client.submit_spark_app(request)
app_id = response.body.data.app_id
print("Spark SQL job submitted. App ID:", app_id)

Spark JAR ジョブの送信

app_type"BATCH" に設定し、ジョブ構成を JSON 文字列として data パラメーターに渡します。

from alibabacloud_adb20211201.models import SubmitSparkAppRequest
import json

conf = {
    "comments": [
        "-- この例を変更して、独自の Spark プログラムを実行します。"
    ],
    "args": ["1000"],
    "file": "local:///tmp/spark-examples.jar",
    "name": "SparkPi",
    "className": "org.apache.spark.examples.SparkPi",
    "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "medium"
    }
}

request = SubmitSparkAppRequest(
    dbcluster_id="<cluster-id>",
    resource_group_name="<resource-group-name>",
    data=json.dumps(conf),
    app_type="BATCH",
    agent_source="Python SDK",
    agent_version="1.0.0"
)

response = client.submit_spark_app(request)
app_id = response.body.data.app_id
print("Spark JAR job submitted. App ID:", app_id)

ジョブステータスのクエリ

ジョブの送信時に返された app_id を渡します。

from alibabacloud_adb20211201.models import GetSparkAppStateRequest

request = GetSparkAppStateRequest(app_id="<app-id>")
response = client.get_spark_app_state(request)
state = response.body.data.state
print("Job state:", state)

ジョブのログのクエリ

from alibabacloud_adb20211201.models import GetSparkAppLogRequest

request = GetSparkAppLogRequest(app_id="<app-id>")
response = client.get_spark_app_log(request)
log_content = response.body.data.log_content
print("Job logs:", log_content)

ジョブの終了

from alibabacloud_adb20211201.models import KillSparkAppRequest

request = KillSparkAppRequest(app_id="<app-id>")
response = client.kill_spark_app(request)
state = response.body.data.state
print("Job state after termination:", state)

ジョブ履歴の一覧表示

from alibabacloud_adb20211201.models import ListSparkAppsRequest

request = ListSparkAppsRequest(
    dbcluster_id="<cluster-id>",
    page_number=1,
    page_size=10
)

response = client.list_spark_apps(request)
print("Total jobs:", response.body.data.page_number)

for app_info in response.body.data.app_info_list:
    print("App ID:", app_info.app_id)
    print("State:", app_info.state)
    print("Detail:", app_info.detail)

完全な例

次のスクリプトは、すべての操作を単一の実行可能なプログラムにまとめたものです。

import os
import json
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client
from alibabacloud_adb20211201.models import (
    SubmitSparkAppRequest,
    GetSparkAppStateRequest,
    GetSparkAppLogRequest,
    KillSparkAppRequest,
    ListSparkAppsRequest,
)

# クライアントを初期化します。
config = Config(
    access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
    access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
    endpoint="adb.<region-id>.aliyuncs.com"
)
client = Client(config)

cluster_id = "<cluster-id>"
rg_name = "<resource-group-name>"

# Spark SQL ジョブを送信します。
sql = """
    set spark.driver.resourceSpec=medium;
    set spark.executor.instances=2;
    set spark.executor.resourceSpec=medium;
    set spark.app.name=Spark SQL Test;
    show databases;
"""

sql_request = SubmitSparkAppRequest(
    dbcluster_id=cluster_id,
    resource_group_name=rg_name,
    data=sql,
    app_type="SQL",
    agent_source="Python SDK",
    agent_version="1.0.0"
)
sql_response = client.submit_spark_app(sql_request)
sql_app_id = sql_response.body.data.app_id
print("SQL job App ID:", sql_app_id)

# Spark JAR ジョブを送信します。
jar_conf = {
    "comments": [
        "-- この例を変更して、独自の Spark プログラムを実行します。"
    ],
    "args": ["1000"],
    "file": "local:///tmp/spark-examples.jar",
    "name": "SparkPi",
    "className": "org.apache.spark.examples.SparkPi",
    "conf": {
        "spark.driver.resourceSpec": "medium",
        "spark.executor.instances": 2,
        "spark.executor.resourceSpec": "medium"
    }
}

jar_request = SubmitSparkAppRequest(
    dbcluster_id=cluster_id,
    resource_group_name=rg_name,
    data=json.dumps(jar_conf),
    app_type="BATCH",
    agent_source="Python SDK",
    agent_version="1.0.0"
)
jar_response = client.submit_spark_app(jar_request)
jar_app_id = jar_response.body.data.app_id
print("JAR job App ID:", jar_app_id)

# ジョブステータスをクエリします。
state_request = GetSparkAppStateRequest(app_id=sql_app_id)
state_response = client.get_spark_app_state(state_request)
print("SQL job state:", state_response.body.data.state)

# ジョブログをクエリします。
log_request = GetSparkAppLogRequest(app_id=sql_app_id)
log_response = client.get_spark_app_log(log_request)
print("SQL job logs:", log_response.body.data.log_content)

# ジョブ履歴を一覧表示します。
list_request = ListSparkAppsRequest(
    dbcluster_id=cluster_id,
    page_number=1,
    page_size=10
)
list_response = client.list_spark_apps(list_request)
print("Total jobs:", list_response.body.data.page_number)
for app_info in list_response.body.data.app_info_list:
    print(app_info.app_id, app_info.state, app_info.detail)

# JAR ジョブを終了します。
kill_request = KillSparkAppRequest(app_id=jar_app_id)
kill_response = client.kill_spark_app(kill_request)
print("JAR job state after termination:", kill_response.body.data.state)

API リファレンス

SubmitSparkAppRequest のパラメーター

パラメーター説明
dbcluster_idStringAnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターの ID。
resource_group_nameStringジョブリソースグループの名前。
dataStringSQL 文 (SQL ジョブの場合) または JSON 構成文字列 (JAR ジョブの場合)。
app_typeStringジョブタイプ。有効な値: "SQL" (Spark SQL ジョブ)、"BATCH" (Spark JAR ジョブ)。
agent_sourceString送信元。 "Python SDK" に設定します。
agent_versionStringSDK のバージョン。 "1.0.0" に設定します。

ListSparkAppsRequest のパラメーター

パラメーター説明
dbcluster_idStringAnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターの ID。
page_numberIntegerページ番号。ページは 1 から始まります。デフォルト値: 1。
page_sizeIntegerページあたりのエントリ数。

SDK のクラスとメソッド

操作リクエストクラスクライアントメソッド応答フィールド
ジョブの送信SubmitSparkAppRequestclient.submit_spark_app()response.body.data.app_id
ジョブステータスのクエリGetSparkAppStateRequestclient.get_spark_app_state()response.body.data.state
クエリ ジョブのログGetSparkAppLogRequestclient.get_spark_app_log()response.body.data.log_content
ジョブの終了KillSparkAppRequestclient.kill_spark_app()response.body.data.state
ジョブ履歴の一覧表示ListSparkAppsRequestclient.list_spark_apps()response.body.data.app_info_list