Data Management: DMSSqlOperator

Last Updated: Mar 28, 2026

DMSSqlOperator submits SQL statements to a database instance managed by Data Management Service (DMS) for execution and retrieves the results.

Parameters

The instance, database, and sql parameters support Jinja templates.

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| instance | string | Yes | The DBLink name of the database instance managed by DMS. |
| database | string | Yes | The database name. |
| sql | string | Yes | The SQL statement to execute. Separate multiple statements with semicolons (;). |
| csv_null_replace_str | string | No | The string used to replace null values in the result set. Default: "null". |
| callback | function | No | A callback function to process the SQL execution results. The input is a PollAsyncSQLExecuteResult object. Takes effect only when polling_interval is greater than 0. Default: none. |
| polling_interval | int | No | How often DMS polls for execution results, in seconds. Default: 10. |

Note

task_id and dag are standard Apache Airflow parameters. For details, see the Airflow documentation.
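
As a rough illustration of the templated parameters, the sketch below builds keyword arguments that could be unpacked into `DMSSqlOperator(**templated_kwargs)`. It relies on Airflow's built-in `{{ ds }}` variable and on `params` lookups, which Airflow renders at task run time. The task ID, the `dblink` and `database` param names, and the `audit_log` table are hypothetical.

```python
# Sketch only: hypothetical values showing Jinja templates in the three
# templated parameters (instance, database, sql).
statements = [
    "DELETE FROM audit_log WHERE log_date < '{{ ds }}'",
    "SELECT COUNT(*) FROM audit_log",
]

templated_kwargs = {
    "task_id": "daily_cleanup",            # standard Airflow parameter
    "instance": "{{ params.dblink }}",     # resolved from the DAG's params at run time
    "database": "{{ params.database }}",
    # Multiple statements are separated by semicolons.
    "sql": ";".join(statements) + ";",
}

print(templated_kwargs["sql"])
```

For the `params` lookups to resolve, the DAG would need to define `dblink` and `database` in its own `params` dictionary.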

PollAsyncSQLExecuteResult

When polling_interval is greater than 0, DMS polls for results and passes a PollAsyncSQLExecuteResult object to the callback function on each poll cycle.

| Field | Type | Description |
| --- | --- | --- |
| Status | string | The execution status. Valid values: WAITING, RUNNING, SUCCESS, FAILURE. |
| SQLType | string | The SQL statement type. Valid values: DDL (Data Definition Language), DML (Data Manipulation Language), DQL (Data Query Language), UNKNOWN (cannot be parsed). |
| ResultType | string | The result format. If empty, the SQL operation is not yet complete. Valid values: PLAINTEXT (use ResultContent directly; typical for DDL and DML), FILE (download from OSS in CSV format; typical for DQL). |
| ResultContent | JSON | The execution result details. See the ResultContent fields below. |
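
Because the callback runs on each poll cycle, it usually needs to tell an in-progress result from a finished one. The helper below is a minimal sketch of that branching. It assumes the result is dict-like and exposes `Status` and `ResultType` at the top level, as in the table above; the function name is hypothetical, and the lookups may need adjusting if your version nests these fields under another key.

```python
def describe_poll_result(result: dict) -> str:
    """Return a one-line summary of a poll result.

    Assumption: `result` is dict-like with the documented fields at the
    top level. This is an illustrative helper, not part of the operator.
    """
    status = result.get("Status")
    result_type = result.get("ResultType")  # empty until the SQL completes

    if status == "FAILURE":
        return "failed"
    if status in ("WAITING", "RUNNING") or not result_type:
        return f"pending ({status})"
    if result_type == "FILE":
        return "done: download the CSV result set from OSS"
    if result_type == "PLAINTEXT":
        return "done: read ResultContent directly"
    return f"unrecognized result type: {result_type}"


print(describe_poll_result({"Status": "RUNNING", "ResultType": ""}))
```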

ResultContent fields

{
  "resultSetFileLink": "https://...",   // DQL only: download URL for the result set file
  "resultSetFileType": "CSV",           // DQL only: file format (always CSV)
  "resultSetOption": {},
  "columnMetas": [],                    // Column metadata
  "count": 1,                           // DQL only: number of records in the result set
  "affectRows": 1,                      // DML/DQL: number of rows affected
  "errorMessage": "..."                 // Non-empty when Status is FAILURE
}
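
The fields above could be read defensively along these lines. This is a sketch under assumptions: `ResultContent` is taken to arrive either as a JSON string or as an already-parsed dict, and the helper name and the keys of the returned summary are hypothetical.

```python
import json


def summarize_result_content(content) -> dict:
    """Pick out the commonly used ResultContent fields.

    Assumption: `content` is either a JSON string or an already-parsed dict.
    Missing fields come back as None instead of raising KeyError.
    """
    if isinstance(content, (str, bytes)):
        content = json.loads(content)
    return {
        "link": content.get("resultSetFileLink"),      # DQL only
        "rows": content.get("count"),                  # DQL only
        "affected": content.get("affectRows"),
        "error": content.get("errorMessage") or None,  # empty string -> None
    }


print(summarize_result_content('{"affectRows": 3, "errorMessage": ""}'))
```

Using `.get()` throughout keeps the helper usable for DDL and DML results, where the DQL-only fields are absent.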
Important

When downloading a result set file using resultSetFileLink, include the request header x-oss-range-behavior: standard. Without this header, OSS signature verification fails.
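
A standard-library sketch of the download step is shown below. It attaches the required header and then maps the null placeholder back to `None` while parsing the CSV. The function names are hypothetical, and the code assumes the file is UTF-8 encoded with a header row, which this page does not confirm.

```python
import csv
import io
import urllib.request

# Header required for OSS signature verification, per the note above.
OSS_HEADERS = {"x-oss-range-behavior": "standard"}


def build_download_request(link: str) -> urllib.request.Request:
    """Build a GET request for the result set file with the required header."""
    return urllib.request.Request(link, headers=OSS_HEADERS)


def parse_result_csv(text: str, null_str: str = "null") -> list:
    """Parse CSV text into dicts, mapping the null placeholder to None.

    Assumption: the first row is a header row. `null_str` should match the
    csv_null_replace_str value passed to the operator.
    """
    reader = csv.DictReader(io.StringIO(text))
    return [
        {col: (None if val == null_str else val) for col, val in row.items()}
        for row in reader
    ]


def fetch_result_rows(link: str, null_str: str = "null", timeout: float = 30.0) -> list:
    """Download the result set file and return its rows (UTF-8 is assumed)."""
    with urllib.request.urlopen(build_download_request(link), timeout=timeout) as resp:
        return parse_result_csv(resp.read().decode("utf-8"), null_str)
```

Note that a column whose real value is the literal string `null` is indistinguishable from a replaced null here; choosing a less common `csv_null_replace_str` avoids that ambiguity.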

Example

The following DAG runs two SQL statements against a DMS-managed database. After each polling cycle, the callback function retrieves the result set file from Object Storage Service (OSS).

import requests

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.alibaba_dms.cloud.operators.dms_sql import DMSSqlOperator

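def callback(result):
    """Print each poll result and download the result set file when one is available."""
    print(f"result: {result}")
    if 'Data' in result and 'ResultContent' in result['Data']:
        content = result['Data']['ResultContent']
        # The ResultContent field list spells this key resultSetFileLink; the
        # capitalized form is also accepted. .get() avoids a KeyError when no
        # file link is present (for example, DDL and DML results).
        link = content.get('resultSetFileLink') or content.get('ResultSetFileLink')
        if not link:
            return
        print(f"get link: {link}")
        # OSS signature verification fails without this header.
        http_res = requests.get(
            link,
            headers={"x-oss-range-behavior": "standard"},
            timeout=30,
        )
        print(f"link res: {http_res.text}")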


with DAG(
    "dms_sql_dblink",
    params={
    },
) as dag:

    sql_operator = DMSSqlOperator(
        task_id="sql_test_dblink",
        instance="dblink_90",        # DBLink name in DMS
        database="student_db",
        sql="show databases;show tables;",
        csv_null_replace_str="null",
        callback=callback,
        polling_interval=5,          # Poll every 5 seconds
        dag=dag
    )

    run_this_last = EmptyOperator(
        task_id="run_this_last",
        dag=dag,
    )

    sql_operator >> run_this_last

if __name__ == "__main__":
    dag.test(
        run_conf={}
    )