全部產品
Search
文件中心

Data Management:DTSLakeInjectionOperator

更新時間:Jul 24, 2025

本文為您介紹DTSLakeInjectionOperator操作的配置資訊。

功能說明

藉助DTS的能力,將DMS管理的資料庫中的資料同步到Object Storage Service上。

參數說明

說明

參數bucket_namedb_listreserve可以使用Jinja模板

參數

類型

是否必填

說明

source_instance

string

源DBLink的名稱。

source_database

string

源庫名稱。

target_instance

string

目標DBLink的名稱。

bucket_name

string

OSS的Bucket名稱。

db_list

dict

需要同步的對象,詳情請參見同步對象說明

reserve

dict

說明

部分資料庫必填。

任務的預留參數,詳情請參見Reserve參數說明

polling_interval

int

重新整理執行結果的間隔時間。單位為秒,預設值為10。

樣本

說明

task_iddag是Airflow的特定參數,詳情請參見Airflow官方文檔

from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator

from airflow.providers.alibaba_dms.cloud.operators.dms_dts import DTSLakeInjectionOperator

with DAG(
    "dms_dts_dblink",
    params={
    },
) as dag:

    dts_operator = DTSLakeInjectionOperator(
        task_id="dts_test_dblink",
        source_instance='dblink_90',
        source_database='student_db',
        target_instance='dbl_oss_63',
        bucket_name='hansheng-bj',
        db_list=json.loads("""
        {\"student_db\":{\"name\":\"hansheng_student_db\",\"all\":false,\"Table\":{\"student_info\":{\"name\":\"student_info\",\"all\":true}}}}
        """),
        reserve=json.loads("""
        {\"fusionOssFileFormat\":\"DELTA\",\"fusionOssFilePath\":\"/hansheng-bj/student_info.txt\",\"a2aFlag\":\"2.0\",\"autoStartModulesAfterConfig\":\"none\",\"fusionCreatetableAfterCompleted\": \"false\"}
        """),
        polling_interval=5,
        dag=dag
    )

    run_this_last = EmptyOperator(
        task_id="run_this_last",
        dag=dag,
    )

    dts_operator >> run_this_last

if __name__ == "__main__":
    dag.test(
        run_conf={}
    )