CTAS支援即時同步資料及將上遊表結構(Schema)變更同步至下遊表,提升目標表建立與源表Schema變更的維護效率。本文為您介紹CTAS用法及實踐情境。
建議使用資料攝入YAML作業完成資料攝入作業邏輯開發,已有的CTAS/CDAS SQL作業可以通過CTAS/CDAS作業產生功能一鍵轉換為YAML作業。
功能:通過YAML作業的方式實現將資料從源端同步到目標端。
YAML作業優勢:不僅覆蓋CTAS和CDAS的關鍵能力(如整庫同步、單表同步、分庫分表同步、新增表同步、表結構變更和自訂計算列同步等),還支援表結構變更立即同步、原始Binlog同步、Where條件過濾、列裁剪等能力。
可以參考Flink CDC資料攝入最佳實務瞭解更多案例。
核心功能
資料同步
功能 | 詳情 |
單表同步 | 支援即時同步源表的全量和增量資料到結果表中。(樣本:單表同步) |
分庫分表合并同步 | 支援使用Regex定義庫名和表名,匹配資料來源的多張分庫分表,合并後同步到下遊的一張表中。(樣本:分庫分表合并同步) 說明 正則匹配時,不支援使用^進行表開頭的匹配。 |
自訂計算列同步 | 支援在源表中添加計算資料行,用於對特定列進行轉換計算。計算資料行可使用系統或自訂函數,並允許指定其位置。新增列將作為結果表的物理列,即時同步計算結果。(樣本:自訂計算列同步) |
多CTAS語句 |
|
表結構變更同步
通過CTAS語句,在即時同步資料的同時,還能將源表Schema的變更同步到結果表中。Schema變更包括初始的表建立以及未來的表變更。
支援同步的Schema變更
Schema變更
說明
添加可空列
自動在結果表Schema末尾添加對應的列並同步資料。新增的列會預設設定為可空列,變更前該列的資料自動化佈建為NULL值。
添加非空列
自動在結果表Schema末尾添加對應的列並同步資料。
刪除可空列
不會直接在結果表中刪除該列,而是將該列資料自動化佈建為NULL值。
重新命名列
等同於添加新列並刪除舊列,即在結果表Schema末尾添加重新命名後的列,並將重新命名前的列資料自動化佈建為NULL值。
說明例如,如果col_a被重新命名為col_b,則會在結果表末尾添加col_b,並自動將col_a的資料填充為NULL值。
列類型變更
下遊系統支援列類型變更:目前只有Paimon支援處理列類型變更。CTAS支援普通列的類型變更,例如,從INT類型變更到BIGINT類型。
此類變更依賴於下遊Sink支援的列類型變更規則,請參考對應結果表文檔擷取其支援的列類型變更規則。
下遊系統不支援列類型變更:目前只有Hologres支援寬容模式處理列類型變更,即CTAS作業啟動時建立類型更加寬泛的下遊表,通過下遊Sink對列類型變更的相容性實現列類型變更支援,具體可參見樣本:寬容模式同步資料。
重要寬容模式應該在初次開機CTAS作業時開啟,如果在初次開機時未開啟寬容模式,需要刪除下遊表並且將作業無狀態重啟才會生效。
重要CTAS是對比前後兩條資料的Schema差異,不會去識別具體的DDL類型。
刪除了某列後重新添加該列,且期間無資料變化,那麼CTAS會認為沒有發生結構變更。
添加新列後且有資料變化時,CTAS才會感知到結構變更,然後同步結構變更到結果表。
不支援同步的Schema變更
不支援主鍵或索引等約束的變更。
不支援非空列的刪除。
不支援從NOT NULL轉為NULLABLE的變更。
重要如果遇到以上不支援的Schema變更,需要您手動刪除下遊結果表,重新啟動CTAS作業,即重新建立結果表並重新同步歷史資料。
啟動流程
以通過CTAS同步MySQL資料至Hologres為例,具體流程如下所示。
流程圖 | 啟動流程 |
當執行CTAS語句時,將會按照以下流程執行:
|
前提條件
執行CTAS文法前,確保工作空間中登入目標端的Catalog。詳情請參見資料管理。
使用限制
文法限制
不支援調試功能。
不支援與INSERT INTO語句在同一作業中混合使用。
不支援同步到StarRocks分區表。
不支援MiniBatch配置。
重要建立SQL作業前:請確保組態管理頁面的作業預設配置頁簽中的其他配置處刪除了MiniBatch配置。
建立SQL作業後:具體解決方案可參見報錯:Currently does not support merge StreamExecMiniBatchAssigner type ExecNode in CTAS/CDAS syntax。
上下遊儲存相容性
CTAS支援的上下遊儲存列表如下,您可以從下表的源表和結果表中各選一個進行組合。
連接器名稱 | 源表 | 結果表 | 備忘 |
√ | × |
| |
√ | × | 無。 | |
√ | × |
| |
× | √ | 無。 | |
× | √ | 僅支援EMR的StarRocks。 | |
× | √ | 如果下遊是Hologres,CTAS在預設情況下會為每個表建立相應數量(connectionSize參數值)個串連。此時您就可以使用connectionPoolName參數,讓配置相同名稱串連池的表可以共用串連池。 說明 在將資料同步到Hologres時,如果您的上遊源表包含了Fixed Plan不支援類型的資料,建議通過INSERT INTO語句的方式,在Flink內部做類型轉換後將資料同步到Hologres。不要用CTAS方式建立Sink結果表進行資料同步,因為這種方式會無法走Fixed Plan,寫入效能較差。 | |
× | √ | 僅Realtime Compute引擎VVR 11.1及以上版本支援同步到Paimon DLF 2.5結果表。 |
基本文法
CREATE TABLE IF NOT EXISTS <sink_table>
(
[ <table_constraint> ]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (
key1=val1,
key2=val2,
...
)
AS TABLE <source_table> [/*+ OPTIONS(key1=val1, key2=val2, ... ) */]
[ADD COLUMN { <column_component> | (<column_component> [, ...])}];
<sink_table>:
[catalog_name.][db_name.]table_name
<table_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
<source_table>:
[catalog_name.][db_name.]table_name
<column_component>:
column_name AS computed_column_expression [COMMENT column_comment] [FIRST | AFTER column_name]CTAS文法複用了CREATE TABLE文法的基本結構,其中的參數解釋如下表所示。
參數 | 說明 |
| 資料同步的結果表名,可以指定具體的Catalog名稱和資料庫名稱。 |
| 結果表的描述,預設使用source_table的描述。 |
| 系統支援根據某列進行分區,建立分區表。 重要 暫不支援同步到StarRocks分區表。 |
| 定義表主鍵約束,用於確保資料唯一性。 |
| 結果表參數,可填入結果表支援的WITH參數。支援的WITH參數詳情請參見Upsert Kafka WITH參數、Hologres WITH參數、StarRocks WITH參數或Paimon WITH參數。 說明 key和value都需要為字串類型,例如 |
| 資料同步的源表表名,可指定具體的Catalog名稱和Database名稱。 |
| 源表的參數,可填入源表支援的WITH參數。支援的WITH參數詳情請參見MySQL WITH參數和Kafka WITH參數。 說明 key和value都需要為字串類型,例如'server-id' = '65500'。 |
| 定義目標表相對於源表新增或重新命名的欄位,支援欄位別名與計算資料行。 重要 純欄位對應(如 |
| 新增列的描述。 |
| 計算資料行運算式的描述。 |
| 新增列作為源表的第一個欄位。如果不添加該參數,則新增列會預設作為源表的最後一個欄位。 |
| 新增列放在源表指定欄位後面。 |
IF NOT EXISTS關鍵字為必填,如果結果表在目標儲存中並不存在,則會先建立該結果表,否則跳過建立步驟。
建立的結果表Schema會使用源表的Schema,包括主鍵以及物理欄位的欄位名和欄位類型,不包括計算資料行、meta欄位、Watermark。
源表到結果表的欄位類型會經過類型映射,詳情請參見對應連接器文檔中的類型映射。
程式碼範例
單表同步
同步情境:將MySQL中的web_sales表同步到Hologres。
前提條件:已在工作空間註冊以下Catalog。
Hologres Catalog:名稱為holo。
MySQL Catalog:名稱為mysql。
程式碼範例:
CTAS通常會配合資料來源的Catalog和目標的Catalog一起使用,其中源Catalog可以自動解析源表的Schema及參數(無需手動編寫DDL),最終完成源表到目標表的全量和增量資料同步。
USE CATALOG holo;
CREATE TABLE IF NOT EXISTS web_sales -- 沒有指定資料庫,則同步到預設資料庫的web_sales表。
WITH ('jdbcWriteBatchSize' = '1024') -- 可選,指定結果表的參數。
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */; -- 指定mysql-cdc源表的額外參數。分庫分表合并同步
同步情境:使用CTAS將MySQL多張分庫分表合并到一張Hologres表中。
同步方案:結合MySQL Catalog,利用Regex的庫名和表名匹配要同步的多張表。其中庫名和表名會作為額外的兩個欄位寫入到結果表中,為保證主鍵唯一性,庫名、表名和原主鍵會一起作為該Hologres表的新聯合主鍵。
程式碼範例及合并效果:
程式碼範例 | 合并效果 |
分庫分表合并同步情境: |
|
源表結構變更情境:在user02表中新增一列age,並插入一條資料。(雖然多張分表的Schema並不一致,但是user02表後續的資料和Schema變更都能即時地自動同步到下遊表中。) |
|
自訂計算列同步
同步情境:在MySQL分庫分表合并過程中新增自訂計算列同步到Hologres表中。
程式碼範例及合并效果:
程式碼範例 | 合并效果 |
|
|
多CTAS語句:作為一個作業提交
同步情境:將MySQL中的web_sales表、user分庫分表作為一個作業同步到Hologres。
同步方案:使用STATEMENT SET文法將多個CTAS語句作為一個作業提交。該方案可以複用一個Source節點讀取多張業務表的資料,在MySQL CDC資料來源情境可以減少server-id的使用及資料庫的串連數與讀取壓力。
Source表的options完全一致才能合并成功達到Source複用最佳化的目的。
MySQL連接器中Server ID的設定,請參見設定Server ID,避免Binlog消費衝突。
程式碼範例:
USE CATALOG holo;
BEGIN STATEMENT SET;
-- 同步web_sales表。
CREATE TABLE IF NOT EXISTS web_sales
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */;
-- 同步user分庫分表。
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;
END;多CTAS語句:同一源表同步到多結果表
結果表不添加計算資料行
USE CATALOG `holo`; BEGIN STATEMENT SET; -- 通過CTAS語句同步MySQL的user表到Holo數倉database1的user表中 CREATE TABLE IF NOT EXISTS `database1`.`user` AS TABLE `mysql`.`tpcds`.`user` /*+ OPTIONS('server-id'='8001-8004') */; -- 通過CTAS語句同步MySQL的user表到Holo數倉database2的user表中 CREATE TABLE IF NOT EXISTS `database2`.`user` AS TABLE `mysql`.`tpcds`.`user` /*+ OPTIONS('server-id'='8001-8004') */; END;結果表需要添加計算資料行
-- 基於源表user建立暫存資料表user_with_changed_id,支援定義計算資料行,例如這裡的computed_id是基於源表的id計算獲得。 CREATE TEMPORARY TABLE `user_with_changed_id` ( `computed_id` AS `id` + 1000 ) LIKE `mysql`.`tpcds`.`user`; -- 基於源表user建立暫存資料表user_with_changed_age,支援定義計算資料行,例如這裡的computed_age是基於源表的age計算獲得。 CREATE TEMPORARY TABLE `user_with_changed_age` ( `computed_age` AS `age` + 1 ) LIKE `mysql`.`tpcds`.`user`; BEGIN STATEMENT SET; -- 通過CTAS語句同步MySQL的user表到Holo數倉的user_with_changed_id表中,表中會包含通過計算獲得的id,即computed_id列。 CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_id` AS TABLE `user_with_changed_id` /*+ OPTIONS('server-id'='8001-8004') */; -- 通過CTAS語句同步MySQL的user表到Holo數倉的user_with_changed_age表中,表中會包含通過計算獲得的age,即computed_age列。 CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_age` AS TABLE `user_with_changed_age` /*+ OPTIONS('server-id'='8001-8004') */; END;
多CTAS語句:新增表同步
同步情境:多個CTAS語句的作業啟動後,需要新增CTAS語句對新增表進行資料同步。
同步方案:在SQL作業中開啟新增表讀取功能,新增CTAS語句後從作業快照重啟,捕獲到新表後進行資料同步。
使用限制:
VVR 8.0.1及以上版本支援新增表功能。
使用CDC源表同步時,僅支援源表啟動模式為initial的作業使用新增表功能。
新增CTAS語句中新增的源表配置要和原有的源表配置完全一致,確保Source能夠複用。
新增CTAS語句前後,作業配置參數不能變更(例如更改啟動模式等)。
操作步驟:
當需要新增CTAS語句時,在作業營運頁面停止作業並勾選停止前建立一次快照。
在SQL作業中開啟新增表讀取功能並新增CTAS語句,然後重新部署作業。
在SQL作業中增加以下語句,開啟新增表讀取功能。
SET 'table.cdas.scan.newly-added-table.enabled' = 'true';在SQL作業中增加CTAS語句,最終完整的程式碼範例如下。
-- 開啟新增表讀取功能。 SET 'table.cdas.scan.newly-added-table.enabled' = 'true'; USE CATALOG holo; BEGIN STATEMENT SET; -- 同步web_sales表。 CREATE TABLE IF NOT EXISTS web_sales AS TABLE mysql.tpcds.web_sales /*+ OPTIONS('server-id'='8001-8004') */; -- 同步user分庫分表。 CREATE TABLE IF NOT EXISTS user AS TABLE mysql.`wp.*`.`user[0-9]+` /*+ OPTIONS('server-id'='8001-8004') */; -- 同步product表。(新增表) CREATE TABLE IF NOT EXISTS product AS TABLE mysql.tpcds.product /*+ OPTIONS('server-id'='8001-8004') */; END;單擊部署。
從快照恢複作業。
在作業營運頁面單擊目標作業名稱,狀態集管理頁簽,單擊歷史。
在作業快照列表中,找到停止作業時建立的快照。
單擊目標快照操作列,選擇完成作業啟動。詳情請參見作業啟動。
同步到Hologres分區表
同步情境:通過CTAS語句將MySQL源表同步到Hologres分區表。
Hologres分區表規則:在Hologres中,如果目標表定義了主鍵,則分區欄位必須包含在主鍵列中。
程式碼範例:
MySQL源表的建表語句如下:
CREATE TABLE orders (
order_id INTEGER NOT NULL,
product_id INTEGER NOT NULL,
city VARCHAR(100) NOT NULL
order_date DATE,
purchaser INTEGER,
PRIMARY KEY(order_id, product_id)
);需要根據源表的主鍵與分區欄位的關係採取不同的處理方式。
源表主鍵包含分區欄位:直接通過CTAS陳述式完成同步。
Hologres會自動驗證分區欄位是否為主鍵的一部分。
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders` PARTITIONED BY (product_id) AS TABLE `mysql`.`tpcds`.`orders`;源表主鍵不包含分區欄位:在CTAS語句中重新聲明目標表的主鍵。
如果Hologres表的分區欄位(如city)不在上遊表主鍵中,則直接同步處理會導致作業失敗。您需要在CTAS語句中重新聲明目標表的主鍵,確保分區欄位成為主鍵的一部分。
-- 可以通過如下SQL指定Hologres分區表的主鍵為order_id,product_id和city。 CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`( CONSTRAINT `PK_order_id_city` PRIMARY KEY (`order_id`,`product_id`,`city`) NOT ENFORCED ) PARTITIONED BY (city) AS TABLE `mysql`.`tpcds`.`orders`;
寬容模式同步資料
同步情境:使用CTAS語句同步資料到Hologres表時,需要支援調整已有欄位資料類型的精度(如從VARCHAR(10)修改為VARCHAR(20))或者修改資料類型(如從SMALLINT修改為INT)的情境。
同步方案:使用Hologres欄位類型寬容模式同步資料。寬容模式應該在CTAS作業初次開機時開啟,若未開啟,需要刪除下遊表並且將作業無狀態重啟才會生效。
類型歸一化規則:
上遊資料類型修改後,若修改類型與原類型的歸一化類型相同,則視為類型修改成功,CTAS作業正常運行。否則屬於不相容的情況,CTAS作業會拋出異常。具體規則如下:
TINYINT、SMALLINT、INT和BIGINT歸一化為BIGINT。
CHAR、VARCHAR和STRING歸一化為STRING。
FLOAT和DOUBLE歸一化為DOUBLE。
其他資料類型按照原本的類型映射規則建立,詳情參見類型映射。
程式碼範例:
CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`
WITH (
'connector' = 'hologres',
'enableTypeNormalization' = 'true' -- 使用欄位類型寬容模式。
) AS TABLE `mysql`.`tpcds`.`orders`;將MongoDB源表同步到Hologres表
使用限制:
Realtime ComputeFlink VVR版本要求8.0.6及以上,MongoDB資料庫版本為6.0及以上。
在SQL Hints中需設定參數scan.incremental.snapshot.enabled和scan.full-changelog為true。
MongoDB資料庫需要開啟前像後像(Pre- and Post-images)記錄功能,開啟方法參見Document Preimages。
使用一個作業同步多個MongoDB集合時,需要滿足以下要求:
每張表中MongoDB的配置必須完全相同,包括hosts、scheme、username、password、connectionOptions。
每張表的scan.startup.mode配置必須完全相同。
程式碼範例:
BEGIN STATEMENT SET;
CREATE TABLE IF NOT EXISTS `holo`.`database`.`table1`
AS TABLE `mongodb`.`database`.`collection1`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;
CREATE TABLE IF NOT EXISTS `holo`.`database`.`table2`
AS TABLE `mongodb`.`database`.`collection2`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;
END;常見問題
作業運行異常
作業效能問題
資料同步問題
相關文檔
CTAS需要配合Catalog一起使用,通過Catalog為表提供持久化中繼資料管理能力,解決CTAS無法持久化表結構和跨作業訪問的問題。常用的Catalog使用請參見:
CTAS和CDAS的使用及實踐情境:
整庫同步、分庫合并或源庫新增表同步:CREATE DATABASE AS(CDAS)語句。
將MySQL整庫同步到Kafka(降低多個任務對MySQL資料庫的壓力):Flink CDC MySQL整庫同步Kafka。
使用CTAS和CDAS實現資料同步的教程:資料庫即時入倉快速入門、Hologres即時數倉搭建或Paimon+StarRocks流式湖倉構建。
通過YAML作業實現資料同步:
快速入門:Flink CDC資料攝入作業快速入門。
將CTAS作業轉化為YAML作業:建立Flink CDC資料攝入作業。


