すべてのプロダクト
Search
ドキュメントセンター

Data Management:DMSAnalyticDBSparkSqlOperator

最終更新日:Nov 09, 2025

この Topic では、DMSAnalyticDBSparkSqlOperator 操作の構成情報について説明します。

機能の説明

Data Lakehouse Edition AnalyticDB MySQL (タスクタイプ: Interactive、エンジン: Spark) によって管理される特定のリソースグループに Spark SQL を送信します。詳細については、「アプリケーションシナリオ」をご参照ください。

パラメーター

説明

パラメーター sqlconfJinja テンプレート を使用できます。

パラメーター

タイプ

必須

説明

cluster_id

string

はい

  • cluster_id: AnalyticDB MySQL クラスターの ID。

  • instance: DMS 。

説明

これらのパラメーターのいずれかを選択します。instance を使用することをお勧めします。

instance

string

resource_group

string

はい

AnalyticDB MySQL クラスター内のリソースグループの名前。

sql

string

はい

Spark の SQL 操作です。詳細については、「Spark ドキュメント」をご参照ください。

conf

dict

いいえ

Spark の特別な構成です。詳細については、「ExecuteSparkWarehouseBatchSQL」、「GetSparkWarehouseBatchSQL」、および「CancelSparkWarehouseBatchSQL」をご参照ください。

説明

conf パラメーターは、ExecuteSparkWarehouseBatchSQLRuntimeConfig です。

schema

string

いいえ

データベース。デフォルト値: default。

polling_interval

int

いいえ

実行結果がリフレッシュされる間隔。単位: 秒。デフォルト値: 10。

execute_time_limit_in_seconds

int

いいえ

タイムアウト期間。単位: 秒。デフォルト値: 36000 (10 時間)。

callback

function

いいえ

SQL 操作の結果を処理するために使用されるコールバック関数。入力パラメーターは SparkBatchSQL です。

説明

このパラメーターは、polling_interval の値が 0 より大きい場合にのみ有効になります。

説明

task_iddag は Airflow の特定のパラメーターです。詳細については、「Airflow 公式ドキュメント」をご参照ください。

from typing import Any
from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from datetime import datetime

from airflow.providers.alibaba_dms.cloud.operators.dms_analyticdb_spark import DMSAnalyticDBSparkSqlOperator

# alibabacloud_adb20211201.models.SparkBatchSQL
def print_result(result):
    print(f"{result}")

with DAG(
    "dms_adb_spark_dblink",
    params={
        "sql": "show databases;show databases;"
    }
) as dag:

    warehouse_operator: Any = DMSAnalyticDBSparkSqlOperator(
        task_id="warehouse_sql",
        instance="dbl_adbmysql_89",
        resource_group="hansheng_spark_test",
        sql="{{ params.sql }}",
        polling_interval=5,
        conf={
            'spark.adb.sqlOutputFormat':'CSV',
            'spark.adb.sqlOutputPartitions':1,
            'spark.adb.sqlOutputLocation':'oss://hansheng-bj/airflow/adb_spark/test',
            'sep':'|'
        },
        callback=print_result,
        dag=dag
    )

    run_this_last = EmptyOperator(
        task_id="run_this_last",
        dag=dag
    )

    warehouse_operator >> run_this_last

if __name__ == "__main__":
    dag.test(
        run_conf={}
    )