この Topic では、DMSAnalyticDBSparkSqlOperator 操作の構成情報について説明します。
機能の説明
Data Lakehouse Edition AnalyticDB MySQL (タスクタイプ: Interactive、エンジン: Spark) によって管理される特定のリソースグループに Spark SQL を送信します。詳細については、「アプリケーションシナリオ」をご参照ください。
パラメーター
パラメーター sql と conf は Jinja テンプレート を使用できます。
パラメーター | タイプ | 必須 | 説明 |
cluster_id | string | はい |
説明 これらのパラメーターのいずれかを選択します。 |
instance | string | ||
resource_group | string | はい | AnalyticDB MySQL クラスター内のリソースグループの名前。 |
sql | string | はい | Spark の SQL 操作です。詳細については、「Spark ドキュメント」をご参照ください。 |
conf | dict | いいえ | Spark の特別な構成です。詳細については、「ExecuteSparkWarehouseBatchSQL」、「GetSparkWarehouseBatchSQL」、および「CancelSparkWarehouseBatchSQL」をご参照ください。 説明
|
schema | string | いいえ | データベース。デフォルト値: default。 |
polling_interval | int | いいえ | 実行結果がリフレッシュされる間隔。単位: 秒。デフォルト値: 10。 |
execute_time_limit_in_seconds | int | いいえ | タイムアウト期間。単位: 秒。デフォルト値: 36000 (10 時間)。 |
callback | function | いいえ | SQL 操作の結果を処理するために使用されるコールバック関数。入力パラメーターは SparkBatchSQL です。 説明 このパラメーターは、 |
例
task_id と dag は Airflow の特定のパラメーターです。詳細については、「Airflow 公式ドキュメント」をご参照ください。
from typing import Any
from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from datetime import datetime
from airflow.providers.alibaba_dms.cloud.operators.dms_analyticdb_spark import DMSAnalyticDBSparkSqlOperator
# alibabacloud_adb20211201.models.SparkBatchSQL
def print_result(result):
print(f"{result}")
with DAG(
"dms_adb_spark_dblink",
params={
"sql": "show databases;show databases;"
}
) as dag:
warehouse_operator: Any = DMSAnalyticDBSparkSqlOperator(
task_id="warehouse_sql",
instance="dbl_adbmysql_89",
resource_group="hansheng_spark_test",
sql="{{ params.sql }}",
polling_interval=5,
conf={
'spark.adb.sqlOutputFormat':'CSV',
'spark.adb.sqlOutputPartitions':1,
'spark.adb.sqlOutputLocation':'oss://hansheng-bj/airflow/adb_spark/test',
'sep':'|'
},
callback=print_result,
dag=dag
)
run_this_last = EmptyOperator(
task_id="run_this_last",
dag=dag
)
warehouse_operator >> run_this_last
if __name__ == "__main__":
dag.test(
run_conf={}
)