AnalyticDB for MySQL Python SDK を使用すると、コンソールを使用せずにプログラムで Spark ジョブを送信および管理できます。この SDK を使用して、Spark SQL および JAR ジョブの送信、ジョブステータスの監視、ログの取得、実行中のジョブの終了、ジョブ履歴の一覧表示が可能です。
クイックスタート
次の例では、Spark SQL ジョブを送信し、そのステータスをクエリし、ログを取得します。
import os
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client
from alibabacloud_adb20211201.models import (
SubmitSparkAppRequest,
GetSparkAppStateRequest,
GetSparkAppLogRequest,
)
# クライアントを初期化します。
config = Config(
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
endpoint="adb.cn-hangzhou.aliyuncs.com"
)
client = Client(config)
# Spark SQL ジョブを送信します。
sql = """
set spark.driver.resourceSpec=medium;
set spark.executor.instances=2;
set spark.executor.resourceSpec=medium;
set spark.app.name=Spark SQL Test;
show databases;
"""
request = SubmitSparkAppRequest(
dbcluster_id="amv-bp1wo70f0k3c****",
resource_group_name="test",
data=sql,
app_type="SQL",
agent_source="Python SDK",
agent_version="1.0.0"
)
response = client.submit_spark_app(request)
app_id = response.body.data.app_id
print("App ID:", app_id)
# ジョブステータスをクエリします。
state_response = client.get_spark_app_state(GetSparkAppStateRequest(app_id=app_id))
print("State:", state_response.body.data.state)
# ジョブログを取得します。
log_response = client.get_spark_app_log(GetSparkAppLogRequest(app_id=app_id))
print("Logs:", log_response.body.data.log_content)amv-bp1wo70f0k3c**** をご利用のクラスター ID に、cn-hangzhou をクラスターが存在するリージョンに置き換えてください。
前提条件
開始する前に、以下をご確認ください。
Python 3.7 以降
お使いのクラスター向けのジョブ リソースグループ
AnalyticDB for MySQL Python SDK がインストール済みであること
ALIBABA_CLOUD_ACCESS_KEY_IDおよびALIBABA_CLOUD_ACCESS_KEY_SECRET環境変数が設定済みであることSpark ジョブログを保存するためのパスが設定済みであること
説明ログパスは、次のいずれかの方法で設定します。
AnalyticDB for MySQL コンソールで、[Spark JAR 開発] ページに移動します。右上隅の [ログ設定] をクリックしてパスを設定します。
spark.app.log.rootPathパラメーターを Object Storage Service (OSS) パスに設定します。
クライアントの初期化
Client インスタンスを作成して Alibaba Cloud で認証します。後続のすべての操作でこのクライアントを使用します。
import os
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client
config = Config(
# 環境変数から AccessKey ID を取得します。
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
# 環境変数から AccessKey Secret を取得します。
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
# エンドポイントを設定します。リージョン ID をご自身のものに置き換えてください。
endpoint="adb.<region-id>.aliyuncs.com"
)
client = Client(config)<region-id> を、ご利用のクラスターが存在するリージョンの ID (例: cn-hangzhou) に置き換えてください。
Spark SQL ジョブの送信
app_type を "SQL" に設定し、data パラメーターに SQL 文を渡します。
from alibabacloud_adb20211201.models import SubmitSparkAppRequest
sql = """
set spark.driver.resourceSpec=medium;
set spark.executor.instances=2;
set spark.executor.resourceSpec=medium;
set spark.app.name=Spark SQL Test;
-- 以下に SQL 文を追加します。
show databases;
"""
request = SubmitSparkAppRequest(
dbcluster_id="<cluster-id>",
resource_group_name="<resource-group-name>",
data=sql,
app_type="SQL",
agent_source="Python SDK",
agent_version="1.0.0"
)
response = client.submit_spark_app(request)
app_id = response.body.data.app_id
print("Spark SQL job submitted. App ID:", app_id)Spark JAR ジョブの送信
app_type を "BATCH" に設定し、ジョブ構成を JSON 文字列として data パラメーターに渡します。
from alibabacloud_adb20211201.models import SubmitSparkAppRequest
import json
conf = {
"comments": [
"-- この例を変更して、独自の Spark プログラムを実行します。"
],
"args": ["1000"],
"file": "local:///tmp/spark-examples.jar",
"name": "SparkPi",
"className": "org.apache.spark.examples.SparkPi",
"conf": {
"spark.driver.resourceSpec": "medium",
"spark.executor.instances": 2,
"spark.executor.resourceSpec": "medium"
}
}
request = SubmitSparkAppRequest(
dbcluster_id="<cluster-id>",
resource_group_name="<resource-group-name>",
data=json.dumps(conf),
app_type="BATCH",
agent_source="Python SDK",
agent_version="1.0.0"
)
response = client.submit_spark_app(request)
app_id = response.body.data.app_id
print("Spark JAR job submitted. App ID:", app_id)ジョブステータスのクエリ
ジョブの送信時に返された app_id を渡します。
from alibabacloud_adb20211201.models import GetSparkAppStateRequest
request = GetSparkAppStateRequest(app_id="<app-id>")
response = client.get_spark_app_state(request)
state = response.body.data.state
print("Job state:", state)ジョブのログのクエリ
from alibabacloud_adb20211201.models import GetSparkAppLogRequest
request = GetSparkAppLogRequest(app_id="<app-id>")
response = client.get_spark_app_log(request)
log_content = response.body.data.log_content
print("Job logs:", log_content)ジョブの終了
from alibabacloud_adb20211201.models import KillSparkAppRequest
request = KillSparkAppRequest(app_id="<app-id>")
response = client.kill_spark_app(request)
state = response.body.data.state
print("Job state after termination:", state)ジョブ履歴の一覧表示
from alibabacloud_adb20211201.models import ListSparkAppsRequest
request = ListSparkAppsRequest(
dbcluster_id="<cluster-id>",
page_number=1,
page_size=10
)
response = client.list_spark_apps(request)
print("Total jobs:", response.body.data.page_number)
for app_info in response.body.data.app_info_list:
print("App ID:", app_info.app_id)
print("State:", app_info.state)
print("Detail:", app_info.detail)完全な例
次のスクリプトは、すべての操作を単一の実行可能なプログラムにまとめたものです。
import os
import json
from alibabacloud_tea_openapi.models import Config
from alibabacloud_adb20211201.client import Client
from alibabacloud_adb20211201.models import (
SubmitSparkAppRequest,
GetSparkAppStateRequest,
GetSparkAppLogRequest,
KillSparkAppRequest,
ListSparkAppsRequest,
)
# クライアントを初期化します。
config = Config(
access_key_id=os.environ['ALIBABA_CLOUD_ACCESS_KEY_ID'],
access_key_secret=os.environ['ALIBABA_CLOUD_ACCESS_KEY_SECRET'],
endpoint="adb.<region-id>.aliyuncs.com"
)
client = Client(config)
cluster_id = "<cluster-id>"
rg_name = "<resource-group-name>"
# Spark SQL ジョブを送信します。
sql = """
set spark.driver.resourceSpec=medium;
set spark.executor.instances=2;
set spark.executor.resourceSpec=medium;
set spark.app.name=Spark SQL Test;
show databases;
"""
sql_request = SubmitSparkAppRequest(
dbcluster_id=cluster_id,
resource_group_name=rg_name,
data=sql,
app_type="SQL",
agent_source="Python SDK",
agent_version="1.0.0"
)
sql_response = client.submit_spark_app(sql_request)
sql_app_id = sql_response.body.data.app_id
print("SQL job App ID:", sql_app_id)
# Spark JAR ジョブを送信します。
jar_conf = {
"comments": [
"-- この例を変更して、独自の Spark プログラムを実行します。"
],
"args": ["1000"],
"file": "local:///tmp/spark-examples.jar",
"name": "SparkPi",
"className": "org.apache.spark.examples.SparkPi",
"conf": {
"spark.driver.resourceSpec": "medium",
"spark.executor.instances": 2,
"spark.executor.resourceSpec": "medium"
}
}
jar_request = SubmitSparkAppRequest(
dbcluster_id=cluster_id,
resource_group_name=rg_name,
data=json.dumps(jar_conf),
app_type="BATCH",
agent_source="Python SDK",
agent_version="1.0.0"
)
jar_response = client.submit_spark_app(jar_request)
jar_app_id = jar_response.body.data.app_id
print("JAR job App ID:", jar_app_id)
# ジョブステータスをクエリします。
state_request = GetSparkAppStateRequest(app_id=sql_app_id)
state_response = client.get_spark_app_state(state_request)
print("SQL job state:", state_response.body.data.state)
# ジョブログをクエリします。
log_request = GetSparkAppLogRequest(app_id=sql_app_id)
log_response = client.get_spark_app_log(log_request)
print("SQL job logs:", log_response.body.data.log_content)
# ジョブ履歴を一覧表示します。
list_request = ListSparkAppsRequest(
dbcluster_id=cluster_id,
page_number=1,
page_size=10
)
list_response = client.list_spark_apps(list_request)
print("Total jobs:", list_response.body.data.page_number)
for app_info in list_response.body.data.app_info_list:
print(app_info.app_id, app_info.state, app_info.detail)
# JAR ジョブを終了します。
kill_request = KillSparkAppRequest(app_id=jar_app_id)
kill_response = client.kill_spark_app(kill_request)
print("JAR job state after termination:", kill_response.body.data.state)API リファレンス
SubmitSparkAppRequest のパラメーター
| パラメーター | 型 | 説明 |
|---|---|---|
dbcluster_id | String | AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターの ID。 |
resource_group_name | String | ジョブリソースグループの名前。 |
data | String | SQL 文 (SQL ジョブの場合) または JSON 構成文字列 (JAR ジョブの場合)。 |
app_type | String | ジョブタイプ。有効な値: "SQL" (Spark SQL ジョブ)、"BATCH" (Spark JAR ジョブ)。 |
agent_source | String | 送信元。 "Python SDK" に設定します。 |
agent_version | String | SDK のバージョン。 "1.0.0" に設定します。 |
ListSparkAppsRequest のパラメーター
| パラメーター | 型 | 説明 |
|---|---|---|
dbcluster_id | String | AnalyticDB for MySQL Data Lakehouse Edition (V3.0) クラスターの ID。 |
page_number | Integer | ページ番号。ページは 1 から始まります。デフォルト値: 1。 |
page_size | Integer | ページあたりのエントリ数。 |
SDK のクラスとメソッド
| 操作 | リクエストクラス | クライアントメソッド | 応答フィールド |
|---|---|---|---|
| ジョブの送信 | SubmitSparkAppRequest | client.submit_spark_app() | response.body.data.app_id |
| ジョブステータスのクエリ | GetSparkAppStateRequest | client.get_spark_app_state() | response.body.data.state |
| クエリ ジョブのログ | GetSparkAppLogRequest | client.get_spark_app_log() | response.body.data.log_content |
| ジョブの終了 | KillSparkAppRequest | client.kill_spark_app() | response.body.data.state |
| ジョブ履歴の一覧表示 | ListSparkAppsRequest | client.list_spark_apps() | response.body.data.app_info_list |