DMSSqlOperator submits SQL statements to a database instance managed by Data Management Service (DMS) for execution and retrieves the results.
Parameters
The instance, database, and sql parameters support Jinja templates.
| Parameter | Type | Required | Description |
|---|---|---|---|
| instance | string | Yes | The DBLink name of the database instance managed by DMS. |
| database | string | Yes | The database name. |
| sql | string | Yes | The SQL statement to execute. Separate multiple statements with semicolons (;). |
| csv_null_replace_str | string | No | The string used to replace null values in the result set. Default: "null". |
| callback | function | No | A callback function to process the SQL execution results. The input is a PollAsyncSQLExecuteResult object. Takes effect only when polling_interval is greater than 0. Default: none. |
| polling_interval | int | No | How often DMS polls for execution results, in seconds. Default: 10. |
task_id and dag are standard Apache Airflow parameters. For details, see the Airflow documentation.
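The relationship between polling_interval and callback can be pictured with a small loop. This is a conceptual sketch only, not the operator's actual implementation: poll_until_done, fetch_result, max_polls, and the injectable sleep argument are hypothetical names, the result is modeled as a plain dict with a Status key, and treating SUCCESS and FAILURE as the terminal statuses is an assumption.

```python
import time

# Assumption: these two statuses end the polling loop.
TERMINAL_STATUSES = {"SUCCESS", "FAILURE"}


def poll_until_done(fetch_result, callback=None, polling_interval=10,
                    max_polls=60, sleep=time.sleep):
    """Call fetch_result every polling_interval seconds until a terminal status.

    fetch_result is a hypothetical zero-argument function that returns a dict
    with a "Status" key. The callback, if given, sees every poll result.
    """
    if polling_interval <= 0:
        raise ValueError("this sketch only models polling_interval > 0")
    for _ in range(max_polls):
        result = fetch_result()
        if callback is not None:
            callback(result)  # invoked once per poll cycle
        if result["Status"] in TERMINAL_STATUSES:
            return result
        sleep(polling_interval)
    raise TimeoutError(f"no terminal status after {max_polls} polls")


if __name__ == "__main__":
    # Simulated status sequence instead of a real DMS call.
    statuses = iter(["WAITING", "RUNNING", "SUCCESS"])
    final = poll_until_done(
        lambda: {"Status": next(statuses)},
        callback=lambda r: print("poll:", r["Status"]),
        polling_interval=1,
        sleep=lambda seconds: None,  # skip real waiting in the demo
    )
    print("final:", final["Status"])
```

Passing sleep as an argument keeps the sketch testable without real delays. A shorter polling_interval means the callback fires more often while a statement is still in the WAITING or RUNNING state.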
PollAsyncSQLExecuteResult
When polling_interval is greater than 0, DMS polls for results and passes a PollAsyncSQLExecuteResult object to the callback function on each poll cycle.
| Field | Type | Description |
|---|---|---|
| Status | string | The execution status. Valid values: WAITING, RUNNING, SUCCESS, FAILURE. |
| SQLType | string | The SQL statement type. Valid values: DDL (Data Definition Language), DML (Data Manipulation Language), DQL (Data Query Language), UNKNOWN (cannot be parsed). |
| ResultType | string | The result format. If empty, the SQL operation is not yet complete. Valid values: PLAINTEXT (use ResultContent directly; typical for DDL and DML), FILE (download from OSS in CSV format; typical for DQL). |
| ResultContent | JSON | The execution result details. See the ResultContent fields below. |
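A callback usually has to branch on Status first and on ResultType second, because ResultType stays empty until the statement finishes. The sketch below shows one way to do that. It assumes the four fields in the table arrive as keys of a plain dict and that ResultContent is already parsed into a dict with the keys listed in the ResultContent fields section. summarize_result is a hypothetical helper, not part of the operator.

```python
def summarize_result(result):
    """Return a one-line summary for a single poll result.

    Assumption: result is a plain dict keyed by the field names in the
    table above, with ResultContent already parsed into a dict.
    """
    status = result.get("Status")
    if status in ("WAITING", "RUNNING"):
        return f"still {status.lower()}"

    content = result.get("ResultContent") or {}
    if status == "FAILURE":
        return f"failed: {content.get('errorMessage', 'no error message')}"

    result_type = result.get("ResultType")
    if result_type == "FILE":
        # Typical for DQL: the rows live in a CSV file on OSS.
        return (f"{content.get('count')} rows in "
                f"{content.get('resultSetFileType')} file at "
                f"{content.get('resultSetFileLink')}")
    if result_type == "PLAINTEXT":
        # Typical for DDL and DML: read ResultContent directly.
        return f"{content.get('affectRows')} rows affected"

    # An empty ResultType means the result is not available yet.
    return f"status {status}, result not yet available"
```

Checking Status before ResultType keeps the callback safe to call on every poll cycle, including the early cycles where ResultContent may still be empty.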
ResultContent fields
{
  "resultSetFileLink": "https://...",  // DQL only: download URL for the result set file
  "resultSetFileType": "CSV",          // DQL only: file format (always CSV)
  "resultSetOption": {},
  "columnMetas": [],                   // Column metadata
  "count": 1,                          // DQL only: number of records in the result set
  "affectRows": 1,                     // DML/DQL: number of rows affected
  "errorMessage": "..."                // Non-empty when Status is FAILURE
}

When downloading a result set file using resultSetFileLink, include the request header x-oss-range-behavior: standard. Without this header, OSS signature verification fails.
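As a rough illustration of the download step, the sketch below builds a request that carries the required header and turns the CSV text into rows, mapping the null placeholder back to None. It uses only the Python standard library. build_download_request, parse_result_csv, and fetch_rows are hypothetical helpers, and the sketch assumes the file is UTF-8 encoded with a header row, which the text above does not confirm.

```python
import csv
import io
import urllib.request

# Header required for OSS signature verification to succeed.
OSS_RANGE_HEADER = {"x-oss-range-behavior": "standard"}


def build_download_request(link):
    """Build a GET request for resultSetFileLink with the required header."""
    return urllib.request.Request(link, headers=OSS_RANGE_HEADER)


def parse_result_csv(text, null_str="null"):
    """Parse CSV text into dicts, turning the null placeholder into None.

    null_str should match the operator's csv_null_replace_str value.
    Assumption: the first CSV line is a header row.
    """
    reader = csv.DictReader(io.StringIO(text))
    return [
        {key: (None if value == null_str else value) for key, value in row.items()}
        for row in reader
    ]


def fetch_rows(link, null_str="null", timeout=30):
    """Download the result set file and return its rows (performs network I/O)."""
    request = build_download_request(link)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        # Assumption: the file is UTF-8 encoded.
        return parse_result_csv(response.read().decode("utf-8"), null_str)
```

Keeping the parsing separate from the download makes it possible to check the null handling against a small CSV string before pointing the code at a real link. Note that a genuine string value equal to the placeholder is indistinguishable from a null, so a distinctive csv_null_replace_str may be preferable.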
Example
The following DAG runs two SQL statements against a DMS-managed database. After each polling cycle, the callback function retrieves the result set file from Object Storage Service (OSS).
from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from airflow.providers.alibaba_dms.cloud.operators.dms_sql import DMSSqlOperator
import json
import requests


def callback(result):
    # Called after each polling cycle with the latest execution result.
    print(f"result: {result}")
    if 'Data' in result and 'ResultContent' in result['Data']:
        link = result['Data']['ResultContent']['ResultSetFileLink']
        print(f"get link: {link}")
        # Without this header, OSS signature verification fails.
        http_res = requests.get(link, headers={
            "x-oss-range-behavior": "standard"
        })
        print(f"link res: {http_res.text}")


with DAG(
    "dms_sql_dblink",
    params={
    },
) as dag:
    sql_operator = DMSSqlOperator(
        task_id="sql_test_dblink",
        instance="dblink_90",  # DBLink name in DMS
        database="student_db",
        sql="show databases;show tables;",
        csv_null_replace_str="null",
        callback=callback,
        polling_interval=5,  # Poll every 5 seconds
        dag=dag
    )

    run_this_last = EmptyOperator(
        task_id="run_this_last",
        dag=dag,
    )

    # Run the SQL task first, then the final placeholder task.
    sql_operator >> run_this_last

if __name__ == "__main__":
    dag.test(
        run_conf={}
    )