本文為您介紹DTSLakeInjectionOperator操作的配置資訊。
功能說明
藉助DTS的能力,將DMS管理的資料庫中的資料同步到Object Storage Service上。
參數說明
說明
參數bucket_name、db_list、reserve可以使用Jinja模板。
參數 | 類型 | 是否必填 | 說明 |
source_instance | string | 是 | 源DBLink的名稱。 |
source_database | string | 是 | 源庫名稱。 |
target_instance | string | 是 | 目標DBLink的名稱。 |
bucket_name | string | 是 | OSS的Bucket名稱。 |
db_list | dict | 是 | 需要同步的對象,詳情請參見同步對象說明。 |
reserve | dict | 否 說明 部分資料庫必填。 | 任務的預留參數,詳情請參見Reserve參數說明。 |
polling_interval | int | 否 | 重新整理執行結果的間隔時間。單位為秒,預設值為10。 |
樣本
說明
task_id和dag是Airflow的特定參數,詳情請參見Airflow官方文檔。
from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from airflow.providers.alibaba_dms.cloud.operators.dms_dts import DTSLakeInjectionOperator
with DAG(
"dms_dts_dblink",
params={
},
) as dag:
dts_operator = DTSLakeInjectionOperator(
task_id="dts_test_dblink",
source_instance='dblink_90',
source_database='student_db',
target_instance='dbl_oss_63',
bucket_name='hansheng-bj',
db_list=json.loads("""
{\"student_db\":{\"name\":\"hansheng_student_db\",\"all\":false,\"Table\":{\"student_info\":{\"name\":\"student_info\",\"all\":true}}}}
"""),
reserve=json.loads("""
{\"fusionOssFileFormat\":\"DELTA\",\"fusionOssFilePath\":\"/hansheng-bj/student_info.txt\",\"a2aFlag\":\"2.0\",\"autoStartModulesAfterConfig\":\"none\",\"fusionCreatetableAfterCompleted\": \"false\"}
"""),
polling_interval=5,
dag=dag
)
run_this_last = EmptyOperator(
task_id="run_this_last",
dag=dag,
)
dts_operator >> run_this_last
if __name__ == "__main__":
dag.test(
run_conf={}
)