CDAS支援整庫層級的表結構和資料的即時同步,還支援表結構變更的同步。本文為您介紹CDAS的用法及實踐情境。
建議使用資料攝入YAML作業完成資料攝入作業邏輯開發,已有的CTAS/CDAS SQL作業可以通過CTAS/CDAS作業產生功能一鍵轉換為YAML作業。
功能:通過YAML作業的方式實現將資料從源端同步到目標端。
YAML作業優勢:不僅覆蓋CTAS和CDAS的關鍵能力(如整庫同步、單表同步、分庫分表同步、新增表同步、表結構變更和自訂計算列同步等),還支援表結構變更立即同步、原始Binlog同步、Where條件過濾、列裁剪等能力。
可以參考資料攝入YAML最佳實務瞭解更多案例。
背景資訊
CDAS是CTAS文法的一個文法糖,用於實現整庫同步、多表同步的功能,常用於全自動化的Data Integration情境。CDAS通常會配合資料來源的Catalog和目標的Catalog一起使用,通過Catalog為表提供持久化中繼資料管理能力,最終完成全量和增量資料同步,包括未來的資料變更和表結構變更,無需提前在目標庫建立表。
文法簡化
Flink會將CDAS語句中每個需要同步的表翻譯成一個對應的CTAS語句,繼承CTAS的資料同步和表結構變更同步的能力。
資源最佳化
阿里雲Flink還對源表進行最佳化,複用一個源表節點讀取多業務表的資料。在MySQL CDC資料來源情境中,不僅可以減少資料庫的串連數,還能避免重複拉取Binlog資料,降低資料庫的讀取壓力。
核心能力
資料同步
功能 | 詳情 |
支援即時同步整庫(或者多張表)的全量和增量資料到每張對應的結果表中。 | |
支援使用Regex定義庫名,匹配資料來源的多個分庫下的源表,合并後同步到下遊每張對應表名的結果表中。 | |
CDAS作業啟動後,如果源庫新增表,支援從作業快照重啟,從而捕獲到新的表,對新增表進行資料同步。 | |
支援使用STATEMENT SET文法將多個CDAS和CTAS語句作為一個作業一起提交,並支援對源表節點的合并複用,降低對資料來源的壓力。 |
表結構變更同步
在即時同步整庫資料的同時,還支援將每張源表的表結構變更(加列等)即時同步到結果表中。CDAS的表結構變更同步策略與CTAS一致,詳情請參見表結構變更同步。
啟動流程
以通過CDAS同步MySQL到Hologres為例,具體流程如下所示。
流程圖 | 啟動流程 |
當執行CDAS語句時,將會按照以下流程執行:
|
前提條件
執行CDAS文法前,確保工作空間中登入目標端的Catalog,詳情請參見資料管理。
使用限制
文法限制
不支援調試功能。
不支援MiniBatch配置。
重要建立SQL作業前:請確保組態管理頁面的作業預設配置頁簽中的其他配置處刪除了MiniBatch配置。
建立SQL作業後:具體解決方案可參見報錯:Currently does not support merge StreamExecMiniBatchAssigner type ExecNode in CTAS/CDAS syntax。
上下遊儲存相容性
CDAS支援的上下遊儲存列表如下,您可以從下表的源表和結果表中各選一個進行組合。
連接器名稱 | 源表 | 結果表 | 備忘 |
√ | × | 不支援同步MySQL視圖。 | |
√ | × | 無。 | |
√ | × |
| |
× | √ | 無。 | |
× | √ | 如果下遊是Hologres,CDAS在預設情況下會為每個表建立相應數量(connectionSize參數值)個串連。此時您可以使用connectionPoolName參數,讓配置相同名稱串連池的表可以共用串連池。 說明
| |
× | √ | 僅支援EMR的StarRocks。 | |
× | √ | 僅Realtime Compute引擎VVR 11.1及以上版本支援同步到Paimon DLF 2.5結果表。 |
注意事項
新增表資料同步:
VVR 8.0.6及以上版本:CDAS作業啟動後,支援添加新表後從作業快照重啟,從而捕獲到新的表。詳情請參見源庫新增表同步。
VVR 8.0.5及以下版本:CDAS作業啟動後,作業同步的表已經確定,資料庫中新增的表不會自動貼齊,也無法通過重啟作業的方式捕獲到。如果需要同步新增的表,您可以選擇以下任一種方案。
方案
說明
新增作業:同步新增表
原有CDAS作業不變,啟動一個新的作業同步新增的表。新增作業樣本如下。
// 建立CTAS作業同步新增加的表new_table CREATE TABLE IF NOT EXISTS new_table AS TABLE mysql.tpcds.new_table /*+ OPTIONS('server-id'='8008-8010') */;原有作業:清理資料重新啟動
停止現有的CDAS作業。
清理已同步的資料。
以無狀態啟動CDAS作業重新同步資料。
讀寫上下遊資源許可權:
執行CDAS文法前,如果您需要訪問不同帳號下的上下遊資源、或使用RAM使用者或RAM角色等身份訪問時,請確保登入Realtime Compute開發控制台的帳號具有讀寫上下遊資源的許可權,否則會因為許可權不足導致讀寫操作失敗。
基本文法
CREATE DATABASE IF NOT EXISTS <target_database>
[COMMENT database_comment]
[WITH (key1=val1, key2=val2, ...)]
AS DATABASE <source_database>
INCLUDING { ALL TABLES | TABLE 'table_name' }
[EXCLUDING TABLE 'table_name']
[/*+ OPTIONS(key1=val1, key2=val2, ... ) */]
<target_database>:
[catalog_name.]db_name
<source_database>:
[catalog_name.]db_nameCDAS文法複用了CREATE DATABASE文法的基本結構,其中的參數解釋如下表所示。
參數 | 說明 |
target_database | 資料同步的目標資料庫名,可以指定具體的Catalog名稱。 |
COMMENT | 目標庫的描述,預設使用source_database的描述。 |
WITH | 目標庫的參數,詳情請參見資料管理中對應的Catalog文檔。 說明 key和value都需要為字串類型,例如'sink.parallelism' = '4'。 |
source_database | 資料同步的源庫名稱,可以指定具體的Catalog名稱。 |
INCLUDING ALL TABLES | 同步源庫中的所有表。 |
INCLUDING TABLE | 同步源庫中指定的表。支援使用豎線(|)分隔指定多個表,也可以使用Regex指定符合某一規則的表。例如INCLUDING TABLE 'web.*'表示要同步源庫中所有web開頭的表。 |
EXCLUDING TABLE | 用於指定不需要同步的表。支援使用豎線(|)分隔指定多個表,也可以使用Regex指定符合某一規則的表。例如INCLUDING ALL TABLES EXCLUDING TABLE 'web.*'表示同步源庫中所有不是web開頭的表。 |
OPTIONS | 源表的參數,詳情請參見對應連接器支援的源表WITH參數。 說明 key和value都需要為字串類型,例如'server-id' = '65500'。 |
必須使用IF NOT EXISTS關鍵字,如果目標庫或結果表在目標儲存中並不存在,則會先建立該目標庫和結果表,否則跳過建立步驟。
建立的結果表Schema會使用源表的Schema,包括主鍵以及物理欄位的欄位名和欄位類型,不包括計算資料行、meta欄位、Watermark。
源表到結果表的欄位類型會經過類型映射,詳見對應連接器文檔中的類型映射。
程式碼範例
整庫同步
同步情境:將MySQL中的tpcds庫下的所有表同步到Hologres。
前提條件:已在工作空間中註冊以下Catalog。
Hologres Catalog:名稱為holo。
MySQL Catalog:名稱為mysql。
程式碼範例:
USE CATALOG holo;
CREATE DATABASE IF NOT EXISTS holo_tpcds -- 在hologres中建立holo_tpcds庫。
WITH ('sink.parallelism' = '4') -- 可選,指定目標庫的參數,每個holo sink預設使用4並發。
AS DATABASE mysql.tpcds INCLUDING ALL TABLES -- 同步mysql中tpcds庫下所有表。
/*+ OPTIONS('server-id'='8001-8004') */ ; -- 可選,指定mysql-cdc源表的額外參數。Hologres支援在建立目標Database時指定WITH參數,這些參數僅對當前作業生效,用於控制寫入結果表時的行為,不會持久化到Hologres中。支援的WITH參數詳情請參見即時數倉Hologres。
分庫合并同步
同步情境:MySQL執行個體中有order_db01~order_db99多個分庫,每個分庫下都有order、order_detail等多張表。使用CDAS將MySQL的多個分庫下的所有表全部同步到Hologres中,包括未來的資料變更和表結構變更。
同步方案:利用Regex的庫名(`order_db[0-9]+`)來匹配所要同步的多個分庫(order_db01~order_db99)。其中庫名和表名會作為額外的兩個欄位寫入到每張結果表中。為保證主鍵唯一性,庫名、表名和原主鍵一起作為對應Hologres表的新聯合主鍵。
程式碼範例及合并效果:
使用CDAS可以將上遊多個分庫下相同表名的資料合併同步到Hologres目標庫對應表名的同一張表中,無需提前在Hologres中建立表。
程式碼範例 | 合并效果 |
|
|
源庫新增表同步
同步情境:CDAS作業啟動後,需要對源庫新增表進行資料同步。
同步方案:在SQL作業中開啟新增表讀取功能,從作業快照重啟,從而捕獲到新的表,對新增表進行資料同步。
使用限制:VVR 8.0.6及以上版本支援源庫新增表同步功能,啟用該功能需確保源表啟動模式為initial。
操作步驟:
當出現新增的表需要同步時,在作業營運頁面停止作業並勾選停止前建立一次快照。
在SQL作業開發中開啟新增表讀取功能,然後重新部署作業。
在SQL作業中增加以下語句,開啟CDAS新增表讀取功能。
SET 'table.cdas.scan.newly-added-table.enabled' = 'true';單擊部署。
從快照恢複作業。
在作業營運頁面單擊目標作業名稱,狀態集管理頁簽,單擊歷史。
在作業快照列表中,找到停止作業時建立的快照。
單擊目標快照操作列,選擇完成作業啟動。詳情請參見作業啟動。
多CDAS&CTAS語句
同步情境:通過一個作業將MySQL執行個體中tpcds、tpch、user_db01~user_db99(分庫分表)多個庫同步到Hologres。
同步方案:使用STATEMENT SET文法組合多條CDAS和CTAS語句作為一個作業提交。該方案可以複用一個Source節點讀取所需表的資料,在MySQL CDC資料來源情境可以減少server-id的使用及資料庫的串連數與讀取壓力。
Source表的options完全一致才能合并成功達到Source複用最佳化的目的。
MySQL連接器中Server ID的設定,請參見設定Server ID,避免Binlog消費衝突。
程式碼範例:
USE CATALOG holo;
BEGIN STATEMENT SET;
-- 同步user分庫分表。
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`user_db[0-9]+`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;
-- 同步TPCDS庫。
CREATE DATABASE IF NOT EXISTS holo_tpcds
AS DATABASE mysql.tpcds INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;
-- 同步TPCH庫。
CREATE DATABASE IF NOT EXISTS holo_tpch
AS DATABASE mysql.tpch INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;
END;多CDAS語句整庫同步到Kafka
同步情境:將MySQL執行個體中tpcds、tpch多個庫和表都同步到Kafka。
同步方案:在使用多個CDAS語句整庫同步到Kafka時,由於不同的資料庫中可能存在相同的表,為了防止topic衝突,需要使用cdas.topic.pattern配置。
cdas.topic.pattern定義了建立topic的名稱的格式,其中可通過{table-name}預留位置來替換為表名。例如:當設定'cdas.topic.pattern'='dbname-{table-name}',對於上遊表名為table1的表,在Kafka中對應的topic名稱為dbname-table1。
程式碼範例:
USE CATALOG kafkaCatalog;
BEGIN STATEMENT SET;
-- 同步TPCDS庫。
CREATE DATABASE IF NOT EXISTS kafka
WITH ('cdas.topic.pattern' = 'tpcds-{table-name}')
AS DATABASE mysql.tpcds INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;
-- 同步TPCH庫。
CREATE DATABASE IF NOT EXISTS kafka
WITH ('cdas.topic.pattern' = 'tpch-{table-name}')
AS DATABASE mysql.tpch INCLUDING ALL TABLES
/*+ OPTIONS('server-id'='8001-8004') */ ;
END;Realtime ComputeFlink版提供MySQL整庫同步到Kafka的能力,通過引入Kafka作為中介層,並使用CDAS整庫同步或CTAS整表同步到Kafka來解決,具體操作請參見MySQL整庫同步Kafka。
常見問題
作業運行異常
作業效能問題
資料同步問題
相關文檔
CTAS和CDAS需要配合Catalog一起使用,通過Catalog為表提供持久化中繼資料管理能力,解決CTAS和CDAS無法持久化表結構和跨作業訪問的問題。常用的Catalog使用請參見:
CTAS和CDAS的使用及實踐情境:
單表同步、分庫分表合并同步或自訂計算列同步:CREATE TABLE AS(CTAS)語句。
將MySQL整庫同步到Kafka(降低多個任務對MySQL資料庫的壓力):MySQL整庫同步Kafka。
使用CTAS和CDAS實現資料同步的教程:資料庫即時入倉快速入門、基於Flink+Hologres搭建即時數倉或基於Flink+Paimon+StarRocks搭建流式湖倉。
通過YAML作業實現資料同步:
快速入門:資料攝入YAML作業快速入門。
將CTAS/CDAS作業轉化為YAML作業:建立資料攝入作業。
