全部產品
Search
文件中心

Data Transmission Service:從PolarDB MySQL版同步至Datahub

更新時間:Jul 06, 2024

阿里雲流式資料服務DataHub是流式資料(Streaming Data)的處理平台,提供對流式資料的發布、訂閱和分發功能,讓您可以輕鬆構建基於流式資料的分析和應用。通過Data Transmission Service (简称DTS),您可以將PolarDB MySQL版同步至DataHub,協助您快速實現使用Realtime Compute等巨量資料產品對資料即時分析。

前提條件

  • DataHub執行個體的地區為華東1、華東2、華北2或華南1。
  • DataHub執行個體中,已建立用作接收同步資料的專案(Project),詳情請參見建立專案
  • PolarDB MySQL版叢集已開啟Binlog,詳情請參見如何開啟Binlog
  • PolarDB MySQL版中待同步的表需具備主鍵或唯一約束。

費用說明

同步類型鏈路配置費用
庫表結構同步和全量資料同步不收費。
增量資料同步收費,詳情請參見計費概述

功能限制

  • 不支援全量資料初始化,即DTS不會將源PolarDB MySQL版叢集中同步對象的存量資料同步至目標DataHub執行個體。
  • 僅支援表層級的資料同步。
  • 不支援新增列的資料同步,即來源資料表新增了某個列,該列的資料不會同步至目標DataHub執行個體。
  • 資料同步的過程中,請勿對源庫中待同步的表執行DDL變更,否則會導致同步失敗。

支援同步的SQL操作

操作類型

SQL動作陳述式

DML

INSERT、UPDATE、DELETE

DDL

ADD COLUMN

操作步驟

  1. 購買資料同步作業,詳情請參見購買流程
    說明 購買時,選擇源執行個體為PolarDB、目標執行個體為DataHub,並選擇同步拓撲為單向同步
  2. 登入資料轉送控制台

    說明

    若資料轉送控制台自動跳轉至Data Management控制台,您可以在右下角的jiqiren中單擊返回舊版,返回至舊版資料轉送控制台。

  3. 在左側導覽列,單擊資料同步

  4. 同步作業列表頁面頂部,選擇同步的目標執行個體所屬地區。

  5. 定位至已購買的資料同步執行個體,單擊配置同步鏈路

  6. 配置同步作業的源執行個體及目標執行個體資訊。
    配置源和目標執行個體資訊
    類別配置說明
    同步作業名稱DTS會自動產生一個同步作業名稱,建議配置具有業務意義的名稱(無唯一性要求),便於後續識別。
    源執行個體資訊執行個體類型固定為PolarDB執行個體,不可變更。
    執行個體地區購買資料同步執行個體時選擇的源執行個體地區資訊,不可變更。
    PolarDB執行個體ID選擇源PolarDB MySQL版叢集ID。
    資料庫帳號填入PolarDB MySQL版叢集的資料庫帳號。
    資料庫密碼填入資料庫帳號對應的密碼。
    目標執行個體資訊執行個體類型固定為DataHub,不可變更。
    執行個體地區購買資料同步執行個體時選擇的目標執行個體地區資訊,不可變更。
    Project選擇DataHub執行個體的Project
  7. 單擊頁面右下角的授權白名單並進入下一步

    如果源或目標資料庫是阿里雲資料庫執行個體(例如RDS MySQLApsaraDB for MongoDB等),DTS會自動將對應地區DTS服務的IP地址添加到阿里雲資料庫執行個體的白名單中;如果源或目標資料庫是ECS上的自建資料庫,DTS會自動將對應地區DTS服務的IP地址添到ECS的安全規則中,您還需確保自建資料庫沒有限制ECS的訪問(若資料庫是叢集部署在多個ECS執行個體,您需要手動將DTS服務對應地區的IP地址添到其餘每個ECS的安全規則中);如果源或目標資料庫是IDC自建資料庫或其他雲資料庫,則需要您手動添加對應地區DTS服務的IP地址,以允許來自DTS伺服器的訪問。DTS服務的IP地址,請參見DTS伺服器的IP位址區段

    警告

    DTS自動添加或您手動添加DTS服務的公網IP位址區段可能會存在安全風險,一旦使用本產品代表您已理解和確認其中可能存在的安全風險,並且需要您做好基本的安全防護,包括但不限於加強帳號密碼強度防範、限制各網段開放的連接埠號碼、內部各API使用鑒權方式通訊、定期檢查並限制不需要的網段,或者使用通過內網(專線/VPN網關/智能網關)的方式接入。

  8. 配置同步策略和同步對象。
    配置同步對象

    配置

    說明

    同步初始化

    勾選結構初始化

    說明

    勾選結構初始化後,在資料同步作業的初始化階段,DTS會將同步對象的結構資訊(例如表結構)同步至目標DataHub執行個體。

    選擇同步對象

    源庫對象框中單擊待遷移的對象,然後單擊向右小箭頭將其移動至已選擇對象框。

    說明
    • 同步對象的選擇粒度為表。

    • 預設情況下,同步對象的名稱保持不變。如果您需要改變同步對象在目標執行個體中的名稱,需要使用對象名映射功能,詳情請參見設定同步對象在目標執行個體中的名稱

    選擇附加列規則

    DTS在將資料同步到DataHub時,會在同步的目標Topic中添加一些附加列。如果附加列和目標Topic中已有的列出現名稱衝突將會導致資料同步失敗。您需要根據業務需求選擇是否啟用新的附加列規則

    警告

    在選擇附加列規則前,您需要評估附加列和目標Topic中已有的列是否會出現名稱衝突,否則可能會導致任務失敗或資料丟失。關於附加列的規則和定義說明,請參見附加列名稱和定義說明

    映射名稱更改

    如需更改同步對象在目標執行個體中的名稱,請使用對象名映射功能,詳情請參見庫表列映射

    源表DMS_ONLINE_DDL過程中是否複製暫存資料表到目標庫

    如源庫使用Data Management(Data Management)執行Online DDL變更,您可以選擇是否同步Online DDL變更產生的暫存資料表資料。

    • :同步Online DDL變更產生的暫存資料表資料。

      說明

      Online DDL變更產生的暫存資料表資料過大,可能會導致同步任務延遲。

    • :不同步Online DDL變更產生的暫存資料表資料,只同步源庫的原始DDL資料。

      說明

      該方案會導致目標庫鎖表。

    源、目標庫無法串連重試時間

    當源、目標庫無法串連時,DTS預設重試720分鐘(即12小時),您也可以自訂重試時間。如果DTS在設定的時間內重新串連上源、目標庫,同步任務將自動回復。否則,同步任務將失敗。

    說明

    由於串連重試期間,DTS將收取任務運行費用,建議您根據業務需要自訂重試時間,或者在源和目標庫執行個體釋放後儘快釋放DTS執行個體。

  9. 可選:將滑鼠指標放置在已選擇對象框中待同步的Topic名上,單擊對象後出現的編輯,然後在彈出的對話方塊中設定Shardkey(即用於分區的key)。

  10. 上述配置完成後,單擊頁面右下角的預檢查並啟動

    說明
    • 在同步作業正式啟動之前,會先進行預檢查。只有預檢查通過後,才能成功啟動同步作業。

    • 如果預檢查失敗,單擊具體檢查項後的提示,查看失敗詳情。

      • 您可以根據提示修複後重新進行預檢查。

      • 如無需修複警示檢測項,您也可以選擇確認屏蔽忽略警示項並重新進行預檢查,跳過警示檢測項重新進行預檢查。

  11. 預檢查對話方塊中顯示預檢查通過後,關閉預檢查對話方塊,同步作業將正式開始。

  12. 等待同步作業的鏈路初始化完成,直至處於同步中狀態。

    您可以在資料同步頁面,查看資料同步作業的狀態。查看同步作業狀態

Topic結構定義說明

DTS在將資料變更同步至DataHub執行個體的Topic時,目標Topic中除了儲存變更資料外,還會新增一些附加列用於儲存元資訊,樣本如下。

說明

本案例中的業務欄位為idnameaddress,由於在配置資料同步時選用的是舊版附加列規則,DTS會為業務欄位(包含目標庫中的源庫原有的業欄位)添加dts_的首碼。若您使用新版附加列規則,DTS不會給目標庫中的源庫原有業欄位加首碼。

Topic定義

結構定義說明:

舊版附加列名稱

新版附加列名稱

資料類型

說明

dts_record_id

new_dts_sync_dts_record_id

String

增量日誌的記錄ID,為該日誌唯一標識。

說明
  • 正常情況下是全域唯一自增的,在容災的情況下會有回退且無法保證自增和唯一。

  • 如果增量日誌的操作類型為UPDATE,那麼累加式更新會被拆分成兩條記錄(分別記錄更新前和更新後的值),且dts_record_id的值相同。

dts_operation_flag

new_dts_sync_dts_operation_flag

String

操作類型,取值:

  • I:INSERT操作。

  • D:DELETE操作。

  • U:UPDATE操作。

dts_instance_id

new_dts_sync_dts_instance_id

String

資料庫的server ID。

dts_db_name

new_dts_sync_dts_db_name

String

資料庫名稱。

dts_table_name

new_dts_sync_dts_table_name

String

表名。

dts_utc_timestamp

new_dts_sync_dts_utc_timestamp

String

操作時間戳記,即日誌的時間戳記(UTC 時間)。

dts_before_flag

new_dts_sync_dts_before_flag

String

所有列的值是否更新前的值,取值:Y或N。

dts_after_flag

new_dts_sync_dts_after_flag

String

所有列的值是否更新後的值,取值:Y或N。

關於dts_before_flag和dts_after_flag的補充說明

對於不同的操作類型,增量日誌中的dts_before_flagdts_after_flag定義如下:

  • INSERT

    當操作類型為INSERT時,所有列的值為新插入的值,即為更新後的值,所以dts_before_flag取值為N,dts_after_flag取值為Y,樣本如下。

    INSERT操作

  • UPDATE

    當操作類型為UPDATE時,DTS會將UPDATE操作拆為兩條增量日誌。這兩條增量日誌的dts_record_iddts_operation_flagdts_utc_timestamp對應的值相同。

    第一條增量日誌記錄了更新前的值,所以dts_before_flag取值為Y,dts_after_flag取值為N。第二條增量日誌記錄了更新後的值,所以 dts_before_flag取值為N,dts_after_flag取值為Y,樣本如下。

    UPDATE操作

  • DELETE

    當操作類型為DELETE時,增量日誌中所有的列值為被刪除的值,即列值不變,所以dts_before_flag取值為Y, dts_after_flag取值為N,樣本如下。

    DELETE操作

後續操作

配置完資料同步作業後,您可以對同步到DataHub執行個體中的資料執行計算分析。更多詳情,請參見阿里雲Realtime Compute