このトピックでは、DTSLakeInjectionOperator 操作の構成情報について説明します。
機能の説明
DTS 機能を利用して、DMS で管理されているデータベースから Object Storage Service (OSS) にデータを同期します。
パラメーター
説明
パラメーター bucket_name、db_list、および reserve は Jinja テンプレート を使用できます。
パラメーター | タイプ | 必須 | 説明 |
source_instance | 文字列 | はい | ソース DBLink の名前。。 |
source_database | 文字列 | はい | ソースデータベースの名前。 |
target_instance | 文字列 | はい | ターゲット DBLink の名前。 |
bucket_name | 文字列 | はい | OSS バケットの名前。 |
db_list | dict | はい | 同期するオブジェクト。詳細については、「同期オブジェクトの説明」をご参照ください。 |
reserve | dict | いいえ 説明 一部のデータベースでは必須です。 | タスクの予約パラメーター。詳細については、「予約パラメーターの説明」をご参照ください。 |
polling_interval | int | いいえ | 実行結果がリフレッシュされる間隔。単位は秒です。デフォルト値は 10 です。 |
例
説明
task_id と dag は Airflow の特定のパラメーターです。詳細については、「Airflow 公式ドキュメント」をご参照ください。
from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from airflow.providers.alibaba_dms.cloud.operators.dms_dts import DTSLakeInjectionOperator
with DAG(
"dms_dts_dblink",
params={
},
) as dag:
dts_operator = DTSLakeInjectionOperator(
task_id="dts_test_dblink",
source_instance='dblink_90',
source_database='student_db',
target_instance='dbl_oss_63',
bucket_name='hansheng-bj',
db_list=json.loads("""
{\"student_db\":{\"name\":\"hansheng_student_db\",\"all\":false,\"Table\":{\"student_info\":{\"name\":\"student_info\",\"all\":true}}}}
"""),
reserve=json.loads("""
{\"fusionOssFileFormat\":\"DELTA\",\"fusionOssFilePath\":\"/hansheng-bj/student_info.txt\",\"a2aFlag\":\"2.0\",\"autoStartModulesAfterConfig\":\"none\",\"fusionCreatetableAfterCompleted\": \"false\"}
"""),
polling_interval=5,
dag=dag
)
run_this_last = EmptyOperator(
task_id="run_this_last",
dag=dag,
)
dts_operator >> run_this_last
if __name__ == "__main__":
dag.test(
run_conf={}
)