This topic describes how to configure the DTSLakeInjectionOperator operator.
Feature description
Uses Data Transmission Service (DTS) to synchronize data from databases managed by Data Management (DMS) to Object Storage Service (OSS).
Parameters
The bucket_name, db_list, and reserve parameters support Jinja templates.
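To illustrate what template support means for dict-typed parameters such as db_list and reserve, the following minimal sketch renders Jinja expressions inside nested values using the jinja2 library directly. It only approximates how Airflow renders templated fields; render_templated is a hypothetical helper, and the bucket name, path, and ds_nodash value are illustrative assumptions, not values taken from DMS.

```python
from jinja2 import Template


def render_templated(value, context):
    """Recursively render Jinja strings found in str, dict, and list values."""
    if isinstance(value, str):
        return Template(value).render(**context)
    if isinstance(value, dict):
        # Only dict values are rendered; keys are left untouched.
        return {key: render_templated(item, context) for key, item in value.items()}
    if isinstance(value, list):
        return [render_templated(item, context) for item in value]
    return value


# Illustrative context; in a real DAG run, Airflow supplies variables such as ds_nodash.
context = {"ds_nodash": "20240101"}

# Hypothetical reserve value with a templated path.
reserve = {
    "fusionOssFileFormat": "DELTA",
    "fusionOssFilePath": "/my-bucket/{{ ds_nodash }}/student_info",
}

rendered = render_templated(reserve, context)
print(rendered["fusionOssFilePath"])
```

In a DAG, the template strings are passed to the operator as-is, and rendering happens at task run time.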
Parameter | Type | Required | Description |
source_instance | string | Yes | The name of the source DBLink. |
source_database | string | Yes | The name of the source database. |
target_instance | string | Yes | The name of the target DBLink. |
bucket_name | string | Yes | The name of the OSS bucket. |
db_list | dict | Yes | The objects to be synchronized. For more information, see Synchronization object description. |
reserve | dict | No (required for some databases) | The reserved parameters of the task. For more information, see Reserve parameter description. |
polling_interval | int | No | The interval at which the execution results are refreshed. The unit is seconds. The default value is 10. |
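The db_list value is a nested dict. The following sketch builds one with a hypothetical helper, build_db_list, whose output shape is inferred only from the example in this topic. The meaning attached to the "all" flags is an assumption; the Synchronization object description remains the authoritative reference.

```python
import json


def build_db_list(source_db, target_db, tables):
    """Build a db_list dict that selects specific tables of one database.

    tables maps each source table name to its target table name.
    The structure mirrors the example in this topic and is an assumption.
    """
    return {
        source_db: {
            "name": target_db,
            # Assumed meaning: False selects only the tables listed under "Table".
            "all": False,
            "Table": {
                src: {"name": dst, "all": True}
                for src, dst in tables.items()
            },
        }
    }


db_list = build_db_list(
    "student_db",
    "hansheng_student_db",
    {"student_info": "student_info"},
)
print(json.dumps(db_list, indent=2))
```

The resulting dict can be passed directly as the db_list argument, which avoids hand-escaping a JSON string.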
Example
task_id and dag are Airflow-specific parameters. For more information, see the official Airflow documentation.
import json

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.providers.alibaba_dms.cloud.operators.dms_dts import DTSLakeInjectionOperator

with DAG(
    "dms_dts_dblink",
    params={},
) as dag:
    # db_list selects the objects to synchronize; reserve carries the reserved task parameters.
    dts_operator = DTSLakeInjectionOperator(
        task_id="dts_test_dblink",
        source_instance='dblink_90',
        source_database='student_db',
        target_instance='dbl_oss_63',
        bucket_name='hansheng-bj',
        db_list=json.loads("""
        {"student_db":{"name":"hansheng_student_db","all":false,"Table":{"student_info":{"name":"student_info","all":true}}}}
        """),
        reserve=json.loads("""
        {"fusionOssFileFormat":"DELTA","fusionOssFilePath":"/hansheng-bj/student_info.txt","a2aFlag":"2.0","autoStartModulesAfterConfig":"none","fusionCreatetableAfterCompleted": "false"}
        """),
        # Refresh the execution results every 5 seconds.
        polling_interval=5,
        dag=dag
    )

    run_this_last = EmptyOperator(
        task_id="run_this_last",
        dag=dag,
    )

    # Run the placeholder task after the synchronization task.
    dts_operator >> run_this_last

if __name__ == "__main__":
    dag.test(
        run_conf={}
    )