Kafka是應用較為廣泛的分布式、高輸送量、高可擴充性訊息佇列服務,普遍用於日誌收集、監控資料彙總、流式資料處理、線上和離線分析等巨量資料領域,是巨量資料生態中不可或缺的產品之一。通過Data Transmission Service,您可以將RDS MySQL遷移至有公網IP的自建Kafka叢集,擴充訊息處理能力。
前提條件
已完成Kafka叢集的搭建,且Kafka的版本為0.10.1.0-2.7.0版本。
Kafka叢集的服務連接埠已開放至公網。
背景資訊
由於資料同步功能對自建Kafka的部署位置要求如下:
ECS上的自建資料庫
通過專線/VPN網關/Smart Access Gateway接入的自建資料庫
無公網IP:Port的資料庫(通過資料庫網關DG接入)
通過雲企業網CEN接入的自建資料庫
如果Kafka叢集的部署位置為本地,且不符合上述情境,您可以將自建Kafka的服務連接埠開放至公網,然後通過本文介紹的方法來實現資料同步需求。
注意事項
DTS在執行全量資料移轉時將佔用源庫和目標庫一定的讀寫資源,可能會導致資料庫的負載上升,在資料庫效能較差、規格較低或業務量較大的情況下(例如源庫有大量慢SQL、存在無主鍵表或目標庫存在死結等),可能會加重資料庫壓力,甚至導致資料庫服務不可用。因此您需要在執行資料移轉前評估源庫和目標庫的效能,同時建議您在業務低峰期執行資料移轉(例如源庫和目標庫的CPU負載在30%以下)。
如果來源資料庫沒有主鍵或唯一約束,且所有欄位沒有唯一性,可能會導致目標資料庫中出現重複資料。
遷移對象僅支援資料表。
費用說明
遷移類型 | 鏈路配置費用 | 公網流量費用 |
結構遷移和全量資料移轉 | 不收費。 | 通過公網將資料移轉出阿里雲時將收費,詳情請參見計費概述。 |
增量資料移轉 | 收費,詳情請參見計費概述。 |
操作步驟
登入資料轉送控制台。
說明若資料轉送控制台自動跳轉至Data Management控制台,您可以在右下角的
中單擊
,返回至舊版資料轉送控制台。在左側導覽列,單擊資料移轉。
在遷移工作清單頁面頂部,選擇遷移的目的地組群所屬地區。
單擊頁面右上方的建立遷移任務。
配置遷移任務的源庫及目標庫資訊。

類別
配置
說明
無
任務名稱
DTS會自動產生一個任務名稱,建議配置具有業務意義的名稱(無唯一性要求),便於後續識別。
源庫資訊
執行個體類型
選擇RDS。
執行個體地區
選擇源RDS執行個體所屬的地區。
執行個體ID
選擇源RDS執行個體ID。
資料庫帳號
填入源RDS執行個體的資料庫帳號,需具備REPLICATION CLIENT、REPLICATION SLAVE、SHOW VIEW和所有遷移對象的SELECT許可權。
資料庫密碼
填入該資料庫帳號的密碼。
串連方式
根據需求選擇非加密串連或SSL安全連線。如果設定為SSL安全連線,您需要提前開啟RDS執行個體的SSL加密功能,詳情請參見設定SSL加密。
重要目前僅中國內地及中國香港地區支援設定串連方式。
目標庫資訊
執行個體類型
選擇有公網IP的自建資料庫。
執行個體地區
無需設定。
資料庫類型
選擇Kafka。
主機名稱或IP地址
填入自建Kafka叢集的訪問地址,本案例中填入公網地址。
連接埠
填入Kafka叢集提供服務的連接埠,預設為9092。
資料庫帳號
填入Kafka叢集的使用者名稱,如Kafka叢集未開啟驗證可不填寫。
資料庫密碼
填入Kafka叢集使用者名稱的密碼,如Kafka叢集未開啟驗證可不填寫。
Topic
單擊右側的擷取Topic列表,然後在下拉框中選擇目標Topic。
Kafka版本
選擇目標Kafka叢集的版本。
串連方式
根據業務及安全需求,選擇非加密串連或SCRAM-SHA-256。
配置完成後,單擊頁面右下角的授權白名單並進入下一步。
如果源或目標資料庫是阿里雲資料庫執行個體(例如RDS MySQL、ApsaraDB for MongoDB等),DTS會自動將對應地區DTS服務的IP地址添加到阿里雲資料庫執行個體的白名單;如果源或目標資料庫是ECS上的自建資料庫,DTS會自動將對應地區DTS服務的IP地址添到ECS的安全規則中,您還需確保自建資料庫沒有限制ECS的訪問(若資料庫是叢集部署在多個ECS執行個體,您需要手動將DTS服務對應地區的IP地址添到其餘每個ECS的安全規則中);如果源或目標資料庫是IDC自建資料庫或其他雲資料庫,則需要您手動添加對應地區DTS服務的IP地址,以允許來自DTS伺服器的訪問。DTS服務的IP地址,請參見DTS伺服器的IP位址區段。
警告DTS自動添加或您手動添加DTS服務的公網IP位址區段可能會存在安全風險,一旦使用本產品代表您已理解和確認其中可能存在的安全風險,並且需要您做好基本的安全防護,包括但不限於加強帳號密碼強度防範、限制各網段開放的連接埠號碼、內部各API使用鑒權方式通訊、定期檢查並限制不需要的網段,或者使用通過內網(專線/VPN網關/智能網關)的方式接入。
配置遷移類型、策略和對象資訊。

配置
說明
遷移類型
同時選中結構遷移、全量資料移轉和增量資料移轉。
重要如果未選中增量資料移轉,為保障資料一致性,全量資料移轉期間請勿在源庫中寫入新的資料。
投遞到kafka的資料格式
遷移到Kafka叢集中的資料以avro格式或者Canal Json格式儲存,定義詳情請參見Kafka叢集的資料存放區格式。
遷移到Kafka Partition策略
根據業務需求選擇遷移的策略,詳細介紹請參見Kafka Partition同步策略說明。
遷移對象
在遷移對象框中單擊待遷移的表,然後單擊
表徵圖將其移動至已選擇對象框。映射名稱更改
如需更改遷移對象在目標執行個體中的名稱,請使用對象名映射功能,詳情請參見庫表列映射。
源、目標庫無法串連重試時間
預設重試12小時,您也可以自訂重試時間。如果DTS在設定的時間內重新串連上源、目標庫,遷移任務將自動回復。否則,遷移任務將失敗。
說明由於串連重試期間,DTS將收取任務運行費用,建議您根據業務需要自訂重試時間,或者在源和目標庫執行個體釋放後儘快釋放DTS執行個體。
源表DMS_ONLINE_DDL過程中是否複製暫存資料表到目標庫
如源庫使用Data Management(Data Management)執行Online DDL變更,您可以選擇是否遷移Online DDL變更產生的暫存資料表資料。
是:遷移Online DDL變更產生的暫存資料表資料。
說明Online DDL變更產生的暫存資料表資料過大,可能會導致遷移任務延遲。
否:不遷移Online DDL變更產生的暫存資料表資料,只遷移源庫的原始DDL資料。
說明該方案會導致目標庫鎖表。
上述配置完成後,單擊頁面右下角的預檢查並啟動。
說明在遷移任務正式啟動之前,會先進行預檢查。只有預檢查通過後,才能成功啟動遷移任務。
如果預檢查失敗,單擊具體檢查項後的
,查看失敗詳情。您可以根據提示修複後重新進行預檢查。
如無需修複警示檢測項,您也可以選擇確認屏蔽、忽略警示項並重新進行預檢查,跳過警示檢測項重新進行預檢查。
預檢查通過後,單擊下一步。
在彈出的購買配置確認對話方塊,選擇鏈路規格並選中資料轉送(隨用隨付)服務條款。
單擊購買並啟動,遷移任務正式開始。
結構遷移+全量資料移轉
請勿手動結束遷移任務,否則可能會導致資料不完整。您只需等待遷移任務完成即可,遷移任務會自動結束。
結構遷移+全量資料移轉+增量資料移轉
遷移任務不會自動結束,您需要手動結束遷移任務。
重要請選擇合適的時間手動結束遷移任務,例如業務低峰期或準備將業務切換至目的地組群時。
觀察遷移任務的進度變更為增量遷移,並顯示為無延遲狀態時,將源庫停寫幾分鐘,此時增量遷移的狀態可能會顯示延遲的時間。
等待遷移任務的增量遷移再次進入無延遲狀態後,手動結束遷移任務。
