このトピックでは、DMSSqlOperator 操作の構成情報について説明します。
機能の説明
DMS で管理されているデータベースインスタンスに SQL 文を送信して実行し、結果を取得します。
パラメーター
パラメーター instance、database、および sql は Jinja テンプレートを使用できます。
パラメーター | タイプ | 必須 | 説明 |
instance | string | はい | DMS で管理されているデータベースインスタンスの接続 (DBLink) です。 |
database | string | はい | データベース名。 |
sql | string | はい | 実行する SQL 文。 説明 複数の SQL 文はセミコロン (;) で区切る必要があります。 |
csv_null_replace_str | string | いいえ |
|
callback | function | いいえ | SQL 操作の結果を処理するために使用されるコールバック関数。入力パラメーターは PollAsyncSQLExecuteResult です。 説明 このパラメーターは、 |
polling_interval | int | いいえ | 実行結果がリフレッシュされる間隔。単位は秒です。デフォルト値は 10 です。 |
PollAsyncSQLExecuteResult
パラメーター | タイプ | 説明 |
Status | string | SQL 操作の実行ステータス。有効な値:
|
SQLType | string | SQL 操作のタイプ。有効な値:
|
ResultType | string | SQL 操作の実行結果のタイプ。 説明 このパラメーターが空の場合、SQL 操作は完了していません。
|
ResultContent | JSON | SQL 操作の実行結果に関する特定の情報。 説明
|
例
task_id と dag は Airflow の特定のパラメーターです。詳細については、「Airflow 公式ドキュメント」をご参照ください。
from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from airflow.providers.alibaba_dms.cloud.operators.dms_sql import DMSSqlOperator
import json
import requests
def callback(result):
print(f"result: {result}")
if 'Data' in result and 'ResultContent' in result['Data']:
link = result['Data']['ResultContent']['ResultSetFileLink']
print(f"get link: {link}")
http_res = requests.get(link, headers={
"x-oss-range-behavior": "standard"
})
print(f"link res: {http_res.text}")
with DAG(
"dms_sql_dblink",
params={
},
) as dag:
sql_operator = DMSSqlOperator(
task_id="sql_test_dblink",
instance="dblink_90",
database="student_db",
sql="show databases;show tables;",
csv_null_replace_str="null",
callback=callback,
polling_interval=5,
dag=dag
)
run_this_last = EmptyOperator(
task_id="run_this_last",
dag=dag,
)
sql_operator >> run_this_last
if __name__ == "__main__":
dag.test(
run_conf={}
)