すべてのプロダクト
Search
ドキュメントセンター

Data Management:DMSSqlOperator

最終更新日:Nov 09, 2025

このトピックでは、DMSSqlOperator 操作の構成情報について説明します。

機能の説明

DMS で管理されているデータベースインスタンスに SQL 文を送信して実行し、結果を取得します。

パラメーター

説明

パラメーター instancedatabase、および sqlJinja テンプレートを使用できます。

パラメーター

タイプ

必須

説明

instance

string

はい

DMS で管理されているデータベースインスタンスの接続 (DBLink) です。

database

string

はい

データベース名。

sql

string

はい

実行する SQL 文。

説明

複数の SQL 文はセミコロン (;) で区切る必要があります。

csv_null_replace_str

string

いいえ

resultset 内の null 値を置き換えるために使用される文字列。デフォルト値は文字列 "null" です。

callback

function

いいえ

SQL 操作の結果を処理するために使用されるコールバック関数。入力パラメーターは PollAsyncSQLExecuteResult です。

説明

このパラメーターは、polling_interval の値が 0 より大きい場合にのみ有効になります。

polling_interval

int

いいえ

実行結果がリフレッシュされる間隔。単位は秒です。デフォルト値は 10 です。

PollAsyncSQLExecuteResult

パラメーター

タイプ

説明

Status

string

SQL 操作の実行ステータス。有効な値:

  • WAITING

  • RUNNING

  • SUCCESS

  • FAILURE

SQLType

string

SQL 操作のタイプ。有効な値:

  • DDL

  • DML

  • DQL

  • UNKNOWN

    説明

    解析できません。

ResultType

string

SQL 操作の実行結果のタイプ。

説明

このパラメーターが空の場合、SQL 操作は完了していません。

  • PLAINTEXT: ResultContent の内容を使用できます。このタイプは通常、DDL および DML 情報に使用されます。

  • FILE: OSS からダウンロードする必要があります。デフォルトのフォーマットは CSV です。このタイプは通常、DQL 情報に使用されます。

ResultContent

JSON

SQL 操作の実行結果に関する特定の情報。

説明

resultSetFileLink リンクを使用してファイルをダウンロードする場合、リクエストヘッダー "x-oss-range-behavior:standard" を指定する必要があります。そうしないと、署名検証が失敗します。

{
  "resultSetFileLink" : "https://xxxx", // SQLType が DQL の場合は空ではありません
  "resultSetFileType" : "CSV", // SQLType が DQL の場合は CSV、他のタイプの場合は空
  "resultSetOption" : {
    ""
  }, 
  "columnMetas": [
    {
      
    }
      
  ],
  "count" : 1,  // DQL の場合は空ではなく、結果セットのレコード数を示します
  "affectRows" : 1, // DML、DQL などの場合は空ではなく、SQL の影響を受けた行数 (更新された行数など) を示します
  "errorMessage" : "xxxx" // Status が FAIL の場合は空ではありません
}

説明

task_iddag は Airflow の特定のパラメーターです。詳細については、「Airflow 公式ドキュメント」をご参照ください。

from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator

from airflow.providers.alibaba_dms.cloud.operators.dms_sql import DMSSqlOperator

import json
import requests

def callback(result):
    print(f"result: {result}")
    if 'Data' in result and 'ResultContent' in result['Data']:
        link = result['Data']['ResultContent']['ResultSetFileLink']
        print(f"get link: {link}")
        http_res = requests.get(link, headers={
            "x-oss-range-behavior": "standard"
        })
        print(f"link res: {http_res.text}")


with DAG(
    "dms_sql_dblink",
    params={
    },
) as dag:

    sql_operator = DMSSqlOperator(
        task_id="sql_test_dblink",
        instance="dblink_90",
        database="student_db",
        sql="show databases;show tables;",
        csv_null_replace_str="null",
        callback=callback,
        polling_interval=5,
        dag=dag
    )

    run_this_last = EmptyOperator(
        task_id="run_this_last",
        dag=dag,
    )

    sql_operator >> run_this_last

if __name__ == "__main__":
    dag.test(
        run_conf={}
    )