全部產品
Search
文件中心

Realtime Compute for Apache Flink:CREATE TABLE AS(CTAS)語句

更新時間:Dec 18, 2025

CTAS支援即時同步資料及將上遊表結構(Schema)變更同步至下遊表,提升目標表建立與源表Schema變更的維護效率。本文為您介紹CTAS用法及實踐情境。

說明

建議使用資料攝入YAML作業完成資料攝入作業邏輯開發,已有的CTAS/CDAS SQL作業可以通過CTAS/CDAS作業產生功能一鍵轉換為YAML作業。

  • 功能:通過YAML作業的方式實現將資料從源端同步到目標端。

  • YAML作業優勢:不僅覆蓋CTAS和CDAS的關鍵能力(如整庫同步、單表同步、分庫分表同步、新增表同步、表結構變更和自訂計算列同步等),還支援表結構變更立即同步、原始Binlog同步、Where條件過濾、列裁剪等能力。

可以參考Flink CDC資料攝入最佳實務瞭解更多案例。

核心功能

資料同步

功能

詳情

單表同步

支援即時同步源表的全量和增量資料到結果表中。(樣本:單表同步

分庫分表合并同步

支援使用Regex定義庫名和表名,匹配資料來源的多張分庫分表,合并後同步到下遊的一張表中。(樣本:分庫分表合并同步

說明

正則匹配時,不支援使用^進行表開頭的匹配。

自訂計算列同步

支援在源表中添加計算資料行,用於對特定列進行轉換計算。計算資料行可使用系統或自訂函數,並允許指定其位置。新增列將作為結果表的物理列,即時同步計算結果。(樣本:自訂計算列同步

多CTAS語句

表結構變更同步

通過CTAS語句,在即時同步資料的同時,還能將源表Schema的變更同步到結果表中。Schema變更包括初始的表建立以及未來的表變更。

  • 支援同步的Schema變更

    Schema變更

    說明

    添加可空列

    自動在結果表Schema末尾添加對應的列並同步資料。新增的列會預設設定為可空列,變更前該列的資料自動化佈建為NULL值。

    添加非空列

    自動在結果表Schema末尾添加對應的列並同步資料。

    刪除可空列

    不會直接在結果表中刪除該列,而是將該列資料自動化佈建為NULL值。

    重新命名列

    等同於添加新列並刪除舊列,即在結果表Schema末尾添加重新命名後的列,並將重新命名前的列資料自動化佈建為NULL值。

    說明

    例如,如果col_a被重新命名為col_b,則會在結果表末尾添加col_b,並自動將col_a的資料填充為NULL值。

    列類型變更

    • 下遊系統支援列類型變更:目前只有Paimon支援處理列類型變更。CTAS支援普通列的類型變更,例如,從INT類型變更到BIGINT類型。

      此類變更依賴於下遊Sink支援的列類型變更規則,請參考對應結果表文檔擷取其支援的列類型變更規則。

    • 下遊系統不支援列類型變更:目前只有Hologres支援寬容模式處理列類型變更,即CTAS作業啟動時建立類型更加寬泛的下遊表,通過下遊Sink對列類型變更的相容性實現列類型變更支援,具體可參見樣本:寬容模式同步資料

      重要

      寬容模式應該在初次開機CTAS作業時開啟,如果在初次開機時未開啟寬容模式,需要刪除下遊表並且將作業無狀態重啟才會生效。

    重要

    CTAS是對比前後兩條資料的Schema差異,不會去識別具體的DDL類型。

    • 刪除了某列後重新添加該列,且期間無資料變化,那麼CTAS會認為沒有發生結構變更。

    • 添加新列後且有資料變化時,CTAS才會感知到結構變更,然後同步結構變更到結果表。

  • 不支援同步的Schema變更

    • 不支援主鍵或索引等約束的變更。

    • 不支援非空列的刪除。

    • 不支援從NOT NULL轉為NULLABLE的變更。

    重要

    如果遇到以上不支援的Schema變更,需要您手動刪除下遊結果表,重新啟動CTAS作業,即重新建立結果表並重新同步歷史資料。

啟動流程

以通過CTAS同步MySQL資料至Hologres為例,具體流程如下所示。

流程圖

啟動流程

image

當執行CTAS語句時,將會按照以下流程執行:

  1. 檢查目標儲存中是否存在該結果表。

    • 結果表不存在:通過目標端Catalog去目標儲存中建立相應的結果表,該結果表具有和資料來源相同的Schema。

    • 結果表存在:跳過建表的流程,檢測結果表與源表Schema是否一致。如果不一致則會報錯。

  2. 提交和啟動相應的資料同步作業。

    將資料來源的資料以及Schema的變更同步到結果表中。

前提條件

執行CTAS文法前,確保工作空間中登入目標端的Catalog。詳情請參見資料管理

使用限制

文法限制

上下遊儲存相容性

CTAS支援的上下遊儲存列表如下,您可以從下表的源表和結果表中各選一個進行組合。

連接器名稱

源表

結果表

備忘

MySQL

×

  • 分庫分表合并同步時,預設會同步上遊儲存的資料庫名稱和表名稱。

  • 單表同步時,不會同步資料庫名稱和表名稱。如果您需要同步資料庫名稱和表名稱,請使用SQL命令建立Catalog,並添加catalog.table.metadata-columns參數。詳情請參見管理MySQL Catalog

  • 不支援同步MySQL視圖。

訊息佇列Kafka

×

無。

MongoDB

×

  • 暫不支援分庫分表合并同步。

  • 暫不支援同步MongoDB元資訊。

  • 暫不支援CTAS新增表功能。

  • 支援通過CTAS語句將MongoDB中的資料及表結構變更同步至目標表,樣本可參考將MongoDB源表同步到Hologres表

Upsert Kafka

×

無。

StarRocks

×

僅支援EMR的StarRocks。

即時數倉Hologres

×

如果下遊是Hologres,CTAS在預設情況下會為每個表建立相應數量(connectionSize參數值)個串連。此時您就可以使用connectionPoolName參數,讓配置相同名稱串連池的表可以共用串連池。

說明

在將資料同步到Hologres時,如果您的上遊源表包含了Fixed Plan不支援類型的資料,建議通過INSERT INTO語句的方式,在Flink內部做類型轉換後將資料同步到Hologres。不要用CTAS方式建立Sink結果表進行資料同步,因為這種方式會無法走Fixed Plan,寫入效能較差。

流式資料湖倉Paimon

×

僅Realtime Compute引擎VVR 11.1及以上版本支援同步到Paimon DLF 2.5結果表。

基本文法

CREATE TABLE IF NOT EXISTS <sink_table>
(
  [ <table_constraint> ]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (
  key1=val1,
  key2=val2, 
  ...
 )
AS TABLE <source_table> [/*+ OPTIONS(key1=val1, key2=val2, ... ) */]
[ADD COLUMN { <column_component> | (<column_component> [, ...])}];

<sink_table>:
  [catalog_name.][db_name.]table_name

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<source_table>:
  [catalog_name.][db_name.]table_name

<column_component>:
  column_name AS computed_column_expression [COMMENT column_comment] [FIRST | AFTER column_name]

CTAS文法複用了CREATE TABLE文法的基本結構,其中的參數解釋如下表所示。

參數

說明

sink_table

資料同步的結果表名,可以指定具體的Catalog名稱和資料庫名稱。

COMMENT

結果表的描述,預設使用source_table的描述。

PARTITIONED BY

系統支援根據某列進行分區,建立分區表。

重要

暫不支援同步到StarRocks分區表。

table_constraint

定義表主鍵約束,用於確保資料唯一性。

WITH

結果表參數,可填入結果表支援的WITH參數。支援的WITH參數詳情請參見Upsert Kafka WITH參數Hologres WITH參數StarRocks WITH參數Paimon WITH參數

說明

key和value都需要為字串類型,例如'jdbcWriteBatchSize' = '1024'

source_table

資料同步的源表表名,可指定具體的Catalog名稱和Database名稱。

OPTIONS

源表的參數,可填入源表支援的WITH參數。支援的WITH參數詳情請參見MySQL WITH參數Kafka WITH參數

說明

key和value都需要為字串類型,例如'server-id' = '65500'

ADD COLUMN

定義目標表相對於源表新增或重新命名的欄位,支援欄位別名與計算資料行。

重要

純欄位對應(如col AS new_col)可能因最佳化被忽略;建議通過添加零計算運算式(如 col AS new_col + INTERVAL '0' SECOND)確保欄位穩定生效。

column_component

新增列的描述。

computed_column_expression

計算資料行運算式的描述。

FIRST

新增列作為源表的第一個欄位。如果不添加該參數,則新增列會預設作為源表的最後一個欄位。

AFTER

新增列放在源表指定欄位後面。

說明
  • IF NOT EXISTS關鍵字為必填,如果結果表在目標儲存中並不存在,則會先建立該結果表,否則跳過建立步驟。

  • 建立的結果表Schema會使用源表的Schema,包括主鍵以及物理欄位的欄位名和欄位類型,不包括計算資料行、meta欄位、Watermark。

  • 源表到結果表的欄位類型會經過類型映射,詳情請參見對應連接器文檔中的類型映射。

程式碼範例

單表同步

同步情境:將MySQL中的web_sales表同步到Hologres。

前提條件:已在工作空間註冊以下Catalog。

  • Hologres Catalog:名稱為holo。

  • MySQL Catalog:名稱為mysql。

程式碼範例

CTAS通常會配合資料來源的Catalog和目標的Catalog一起使用,其中源Catalog可以自動解析源表的Schema及參數(無需手動編寫DDL),最終完成源表到目標表的全量和增量資料同步。

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS web_sales   -- 沒有指定資料庫,則同步到預設資料庫的web_sales表。
WITH ('jdbcWriteBatchSize' = '1024')   -- 可選,指定結果表的參數。
AS TABLE mysql.tpcds.web_sales   
/*+ OPTIONS('server-id'='8001-8004') */;  -- 指定mysql-cdc源表的額外參數。

分庫分表合并同步

同步情境:使用CTAS將MySQL多張分庫分表合并到一張Hologres表中。

同步方案:結合MySQL Catalog,利用Regex的庫名和表名匹配要同步的多張表。其中庫名和表名會作為額外的兩個欄位寫入到結果表中,為保證主鍵唯一性,庫名、表名和原主鍵會一起作為該Hologres表的新聯合主鍵。

程式碼範例及合并效果

程式碼範例

合并效果

分庫分表合并同步情境

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS user
WITH ('jdbcWriteBatchSize' = '1024')
AS TABLE mysql.`wp.*`.`user[0-9]+`  
/*+ OPTIONS('server-id'='8001-8004') */;

效果

源表結構變更情境:在user02表中新增一列age,並插入一條資料。(雖然多張分表的Schema並不一致,但是user02表後續的資料和Schema變更都能即時地自動同步到下遊表中。)

ALTER TABLE `user02` ADD COLUMN `age` INT;
INSERT INTO `user02` (id, name, age) VALUES (27, 'Tony', 30);

image

自訂計算列同步

同步情境:在MySQL分庫分表合并過程中新增自訂計算列同步到Hologres表中。

程式碼範例及合并效果

程式碼範例

合并效果

USE CATALOG holo;

CREATE TABLE IF NOT EXISTS user
WITH ('jdbcWriteBatchSize' = '1024')
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */
ADD COLUMN (
  `c_id` AS `id` + 10 AFTER `id`,
  `calss` AS 3  AFTER `id`
);

image

多CTAS語句:作為一個作業提交

同步情境:將MySQL中的web_sales表、user分庫分表作為一個作業同步到Hologres。

同步方案:使用STATEMENT SET文法將多個CTAS語句作為一個作業提交。該方案可以複用一個Source節點讀取多張業務表的資料,在MySQL CDC資料來源情境可以減少server-id的使用及資料庫的串連數與讀取壓力。

重要

程式碼範例

USE CATALOG holo;

BEGIN STATEMENT SET;

-- 同步web_sales表。
CREATE TABLE IF NOT EXISTS web_sales
AS TABLE mysql.tpcds.web_sales
/*+ OPTIONS('server-id'='8001-8004') */;

-- 同步user分庫分表。
CREATE TABLE IF NOT EXISTS user
AS TABLE mysql.`wp.*`.`user[0-9]+`
/*+ OPTIONS('server-id'='8001-8004') */;

END;

多CTAS語句:同一源表同步到多結果表

  • 結果表不添加計算資料行

    USE CATALOG `holo`;
    
    BEGIN STATEMENT SET;
    
    -- 通過CTAS語句同步MySQL的user表到Holo數倉database1的user表中
    CREATE TABLE IF NOT EXISTS `database1`.`user`
    AS TABLE `mysql`.`tpcds`.`user`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    -- 通過CTAS語句同步MySQL的user表到Holo數倉database2的user表中
    CREATE TABLE IF NOT EXISTS `database2`.`user`
    AS TABLE `mysql`.`tpcds`.`user`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    END;
  • 結果表需要添加計算資料行

    -- 基於源表user建立暫存資料表user_with_changed_id,支援定義計算資料行,例如這裡的computed_id是基於源表的id計算獲得。
    CREATE TEMPORARY TABLE `user_with_changed_id` (
      `computed_id` AS `id` + 1000
    ) LIKE `mysql`.`tpcds`.`user`;
    
    -- 基於源表user建立暫存資料表user_with_changed_age,支援定義計算資料行,例如這裡的computed_age是基於源表的age計算獲得。
    CREATE TEMPORARY TABLE `user_with_changed_age` (
      `computed_age` AS `age` + 1
    ) LIKE `mysql`.`tpcds`.`user`;
    
    BEGIN STATEMENT SET;
    
    -- 通過CTAS語句同步MySQL的user表到Holo數倉的user_with_changed_id表中,表中會包含通過計算獲得的id,即computed_id列。 
    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_id`
    AS TABLE `user_with_changed_id`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    -- 通過CTAS語句同步MySQL的user表到Holo數倉的user_with_changed_age表中,表中會包含通過計算獲得的age,即computed_age列。 
    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`user_with_changed_age`
    AS TABLE `user_with_changed_age`
    /*+ OPTIONS('server-id'='8001-8004') */;
    
    END;

多CTAS語句:新增表同步

同步情境:多個CTAS語句的作業啟動後,需要新增CTAS語句對新增表進行資料同步。

同步方案:在SQL作業中開啟新增表讀取功能,新增CTAS語句後從作業快照重啟,捕獲到新表後進行資料同步。

使用限制

  • VVR 8.0.1及以上版本支援新增表功能。

  • 使用CDC源表同步時,僅支援源表啟動模式為initial的作業使用新增表功能。

  • 新增CTAS語句中新增的源表配置要和原有的源表配置完全一致,確保Source能夠複用。

  • 新增CTAS語句前後,作業配置參數不能變更(例如更改啟動模式等)。

操作步驟

  1. 當需要新增CTAS語句時,在作業營運頁面停止作業並勾選停止前建立一次快照

  2. 在SQL作業中開啟新增表讀取功能並新增CTAS語句,然後重新部署作業。

    1. 在SQL作業中增加以下語句,開啟新增表讀取功能。

      SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
    2. 在SQL作業中增加CTAS語句,最終完整的程式碼範例如下。

      -- 開啟新增表讀取功能。
      SET 'table.cdas.scan.newly-added-table.enabled' = 'true';
      
      USE CATALOG holo;
      
      BEGIN STATEMENT SET;
      
      -- 同步web_sales表。
      CREATE TABLE IF NOT EXISTS web_sales
      AS TABLE mysql.tpcds.web_sales
      /*+ OPTIONS('server-id'='8001-8004') */;
      
      -- 同步user分庫分表。
      CREATE TABLE IF NOT EXISTS user
      AS TABLE mysql.`wp.*`.`user[0-9]+`
      /*+ OPTIONS('server-id'='8001-8004') */;
      
      -- 同步product表。(新增表)
      CREATE TABLE IF NOT EXISTS product
      AS TABLE mysql.tpcds.product
      /*+ OPTIONS('server-id'='8001-8004') */;
      
      END;
    3. 單擊部署

  3. 從快照恢複作業。

    1. 作業營運頁面單擊目標作業名稱,狀態集管理頁簽,單擊歷史

    2. 作業快照列表中,找到停止作業時建立的快照。

    3. 單擊目標快照操作列,選擇更多 > 從該快照恢複作業完成作業啟動。詳情請參見作業啟動

同步到Hologres分區表

同步情境:通過CTAS語句將MySQL源表同步到Hologres分區表。

Hologres分區表規則:在Hologres中,如果目標表定義了主鍵,則分區欄位必須包含在主鍵列中。

程式碼範例

MySQL源表的建表語句如下:

CREATE TABLE orders (
    order_id INTEGER NOT NULL,
    product_id INTEGER NOT NULL,
    city VARCHAR(100) NOT NULL
    order_date DATE,
    purchaser INTEGER,
    PRIMARY KEY(order_id, product_id)
);

需要根據源表的主鍵與分區欄位的關係採取不同的處理方式。

  • 源表主鍵包含分區欄位:直接通過CTAS陳述式完成同步。

    Hologres會自動驗證分區欄位是否為主鍵的一部分。

    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`
    PARTITIONED BY (product_id)
    AS TABLE `mysql`.`tpcds`.`orders`;
  • 源表主鍵不包含分區欄位:在CTAS語句中重新聲明目標表的主鍵。

    如果Hologres表的分區欄位(如city)不在上遊表主鍵中,則直接同步處理會導致作業失敗。您需要在CTAS語句中重新聲明目標表的主鍵,確保分區欄位成為主鍵的一部分。

    -- 可以通過如下SQL指定Hologres分區表的主鍵為order_id,product_id和city。
    CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders`(
        CONSTRAINT `PK_order_id_city` PRIMARY KEY (`order_id`,`product_id`,`city`) NOT ENFORCED
    )
    PARTITIONED BY (city)
    AS TABLE `mysql`.`tpcds`.`orders`;

寬容模式同步資料

同步情境:使用CTAS語句同步資料到Hologres表時,需要支援調整已有欄位資料類型的精度(如從VARCHAR(10)修改為VARCHAR(20))或者修改資料類型(如從SMALLINT修改為INT)的情境。

同步方案:使用Hologres欄位類型寬容模式同步資料。寬容模式應該在CTAS作業初次開機時開啟,若未開啟,需要刪除下遊表並且將作業無狀態重啟才會生效。

類型歸一化規則

上遊資料類型修改後,若修改類型與原類型的歸一化類型相同,則視為類型修改成功,CTAS作業正常運行。否則屬於不相容的情況,CTAS作業會拋出異常。具體規則如下:

  • TINYINT、SMALLINT、INT和BIGINT歸一化為BIGINT。

  • CHAR、VARCHAR和STRING歸一化為STRING。

  • FLOAT和DOUBLE歸一化為DOUBLE。

  • 其他資料類型按照原本的類型映射規則建立,詳情參見類型映射

程式碼範例

CREATE TABLE IF NOT EXISTS `holo`.`tpcds`.`orders` 
WITH (
'connector' = 'hologres', 
'enableTypeNormalization' = 'true' -- 使用欄位類型寬容模式。
) AS TABLE `mysql`.`tpcds`.`orders`;

將MongoDB源表同步到Hologres表

使用限制

  • Realtime ComputeFlink VVR版本要求8.0.6及以上,MongoDB資料庫版本為6.0及以上。

  • 在SQL Hints中需設定參數scan.incremental.snapshot.enabledscan.full-changelog為true。

  • MongoDB資料庫需要開啟前像後像(Pre- and Post-images)記錄功能,開啟方法參見Document Preimages

  • 使用一個作業同步多個MongoDB集合時,需要滿足以下要求:

    • 每張表中MongoDB的配置必須完全相同,包括hosts、scheme、username、password、connectionOptions。

    • 每張表的scan.startup.mode配置必須完全相同。

程式碼範例

BEGIN STATEMENT SET;

CREATE TABLE IF NOT EXISTS `holo`.`database`.`table1`
AS TABLE `mongodb`.`database`.`collection1`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;

CREATE TABLE IF NOT EXISTS `holo`.`database`.`table2`
AS TABLE `mongodb`.`database`.`collection2`
/*+ OPTIONS('scan.incremental.snapshot.enabled'='true','scan.full-changelog'='true') */;

END;

常見問題

作業運行異常

作業效能問題

資料同步問題

相關文檔