All Products
Search
Document Center

Data Management:DTSLakeInjectionOperator

Last Updated:Jul 23, 2025

This topic describes the configuration information of the DTSLakeInjectionOperator operation.

Feature description

Utilizes DTS capabilities to synchronize data from databases managed by DMS to Object Storage Service (OSS).

Parameters

Note

Parameters bucket_name, db_list, and reserve can use Jinja templates.

Parameter

Type

Required

Description

source_instance

string

Yes

The name of the source DBLink.

source_database

string

Yes

The name of the source database.

target_instance

string

Yes

The name of the target DBLink.

bucket_name

string

Yes

The name of the OSS bucket.

db_list

dict

Yes

The objects to be synchronized. For more information, see Synchronization object description.

reserve

dict

No

Note

Required for some databases.

The reserved parameters of the task. For more information, see Reserve parameter description.

polling_interval

int

No

The interval at which the execution results are refreshed. The unit is seconds. The default value is 10.

Example

Note

task_id and dag are specific parameters of Airflow. For more information, see Airflow official documentation.

from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator

from airflow.providers.alibaba_dms.cloud.operators.dms_dts import DTSLakeInjectionOperator

with DAG(
    "dms_dts_dblink",
    params={
    },
) as dag:

    dts_operator = DTSLakeInjectionOperator(
        task_id="dts_test_dblink",
        source_instance='dblink_90',
        source_database='student_db',
        target_instance='dbl_oss_63',
        bucket_name='hansheng-bj',
        db_list=json.loads("""
        {\"student_db\":{\"name\":\"hansheng_student_db\",\"all\":false,\"Table\":{\"student_info\":{\"name\":\"student_info\",\"all\":true}}}}
        """),
        reserve=json.loads("""
        {\"fusionOssFileFormat\":\"DELTA\",\"fusionOssFilePath\":\"/hansheng-bj/student_info.txt\",\"a2aFlag\":\"2.0\",\"autoStartModulesAfterConfig\":\"none\",\"fusionCreatetableAfterCompleted\": \"false\"}
        """),
        polling_interval=5,
        dag=dag
    )

    run_this_last = EmptyOperator(
        task_id="run_this_last",
        dag=dag,
    )

    dts_operator >> run_this_last

if __name__ == "__main__":
    dag.test(
        run_conf={}
    )