すべてのプロダクト
Search
ドキュメントセンター

Data Management:DTSLakeInjectionOperator

最終更新日:Nov 09, 2025

このトピックでは、DTSLakeInjectionOperator 操作の構成情報について説明します。

機能の説明

DTS 機能を利用して、DMS で管理されているデータベースから Object Storage Service (OSS) にデータを同期します。

パラメーター

説明

パラメーター bucket_namedb_list、および reserveJinja テンプレート を使用できます。

パラメーター

タイプ

必須

説明

source_instance

文字列

はい

ソース DBLink の名前。。

source_database

文字列

はい

ソースデータベースの名前。

target_instance

文字列

はい

ターゲット DBLink の名前。

bucket_name

文字列

はい

OSS バケットの名前。

db_list

dict

はい

同期するオブジェクト。詳細については、「同期オブジェクトの説明」をご参照ください。

reserve

dict

いいえ

説明

一部のデータベースでは必須です。

タスクの予約パラメーター。詳細については、「予約パラメーターの説明」をご参照ください。

polling_interval

int

いいえ

実行結果がリフレッシュされる間隔。単位は秒です。デフォルト値は 10 です。

説明

task_iddag は Airflow の特定のパラメーターです。詳細については、「Airflow 公式ドキュメント」をご参照ください。

from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator

from airflow.providers.alibaba_dms.cloud.operators.dms_dts import DTSLakeInjectionOperator

with DAG(
    "dms_dts_dblink",
    params={
    },
) as dag:

    dts_operator = DTSLakeInjectionOperator(
        task_id="dts_test_dblink",
        source_instance='dblink_90',
        source_database='student_db',
        target_instance='dbl_oss_63',
        bucket_name='hansheng-bj',
        db_list=json.loads("""
        {\"student_db\":{\"name\":\"hansheng_student_db\",\"all\":false,\"Table\":{\"student_info\":{\"name\":\"student_info\",\"all\":true}}}}
        """),
        reserve=json.loads("""
        {\"fusionOssFileFormat\":\"DELTA\",\"fusionOssFilePath\":\"/hansheng-bj/student_info.txt\",\"a2aFlag\":\"2.0\",\"autoStartModulesAfterConfig\":\"none\",\"fusionCreatetableAfterCompleted\": \"false\"}
        """),
        polling_interval=5,
        dag=dag
    )

    run_this_last = EmptyOperator(
        task_id="run_this_last",
        dag=dag,
    )

    dts_operator >> run_this_last

if __name__ == "__main__":
    dag.test(
        run_conf={}
    )