全部產品
Search
文件中心

MaxCompute:CDC(邀測)

更新時間:Dec 16, 2025

CDC(Change Data Capture)定義了識別並捕獲資料庫表中資料的變更情境,用於記錄Delta Table增量錶行層級的插入、更新和刪除等操作,從而可以有效捕捉該表的資料變化事件流,後續可以通過事件驅動輔助增量計算、資料同步、數倉分層等業務需求。

功能介紹

  • 增量計算:可增量讀取CDC變更記錄進行增量計算,如增量物化視圖更新等。

  • 流式計算:可流式讀取CDC變更記錄進行流式計算,如Flink計算任務消費。

  • 多引擎間的資料同步:不同引擎間的增量資料同步和計算。

  • 日誌審計:詳細記錄所有的資料操作記錄,用於動作記錄審計。

該功能目前處於邀測中,邀測功能使用方法請參見使用說明

建表DDL

建表DDL分為兩種:同步CDC和非同步CDC。

  • 同步CDC:僅支援SQL DML操作產生增量表,不支援即時資料寫入。SQL操作完成,CDC資料即產生。

  • 非同步CDC:支援SQL DML操作產生增量表,支援即時資料寫入。但CDC資料產生屬於非同步行為,非立即生效。

同步CDC

建立Delta Table時添加"acid.cdc.mode.enable"="true"屬性, 可以選擇性添加"cdc.insert.into.passthrough.enable"="true""cdc.data.retain.hours"="24"屬性。

SQL樣本如下:

CREATE TABLE acid_with_cdc_tbl (pk BIGINT NOT NULL PRIMARY KEY, val BIGINT) 
tblproperties ("transactional" = "true", "acid.cdc.mode.enable"="true");
說明
  • acid.cdc.mode.enable:開啟Delta Table CDC功能,預設為非同步方式,支援在SQL DML時同步產生CDC資料,不支援Tunnel即時寫入。

  • cdc.insert.into.passthrough.enable:對於開啟CDC功能的表預設不支援INSERT INTO,添加該屬性時,可支援執行INSERT INTO語句,寫入的資料在CDC中僅代表INSERT類型,如果存在相同PK的資料行,後續查詢會因為PK衝突導致查詢失敗,需要使用者確保PK資料唯一。

  • cdc.data.retain.hours:CDC資料的保留時間(1~168小時),預設24小時。

非同步CDC

建立Delta Table時添加"acid.cdc.mode.enable"="true""acid.cdc.build.async"="true""acid.cdc.build.interval"="300"屬性。

SQL樣本如下:

CREATE TABLE acid_with_cdc_build_tbl (pk BIGINT NOT NULL PRIMARY KEY, val BIGINT) 
tblproperties ("transactional" = "true", 
               "acid.cdc.mode.enable"="true",
               "acid.cdc.build.async"="true",
               "acid.cdc.build.interval"="300");                                                                           
說明
  • acid.cdc.mode.enable:開啟Delta Table CDC功能,預設為非同步方式,支援在SQL DML時同步產生CDC資料,不支援Tunnel即時寫入。

  • acid.cdc.build.async:開啟非同步構建CDC的功能,支援Tunnel即時寫入該表,同時對於SQL的DML也會非同步產生。

  • acid.cdc.build.interval:非同步構建的周期配置[60-3540],單位秒,結合業務或增量情境配置該參數。

  • 其他選擇性參數(Project級 / Session級):

    "odps.storage.orc.enable.memcmp.sort.key="true",建議Project層級開啟,對於CDC的非同步構建以及查詢都會有效能協助。

CDC查詢

table_changes

  • 文法

    SELECT * FROM table_changes('<table_name>', <start> [, <end>]);
  • 參數說明

    參數名稱

    是否必填

    參數說明

    table_name

    指定查詢的Delta Table。

    start

    BIGINT或者STRING類型,常量。指定對該表CDC資料查詢的起始version,table version可以通過SHOW HISTORY FOR TABLE <table_name>;命令查詢,詳見SHOW。如果輸入為STRING類型,格式為yyyy-mm-dd hh:mi:ss

    end

    BIGINT或者STRING類型,常量。指定對該表CDC資料查詢的截止version,若不填,則會查詢到最新version。如果輸入為STRING類型,格式為yyyy-mm-dd hh:mi:ss

  • 傳回值說明

    除資料列外,會額外輸出如下三個系統列:

    • __meta_timestamp:代表資料寫入系統時間。

    • __meta_op_type:包含1(INSERT)和0(DELETE)。

    • __meta_is_update:包含1(TRUE)和0(FALSE)。

    __meta_op_type__meta_is_update可組合成如下四種情況:

    __meta_op_type

    __meta_is_update

    說明

    1

    0

    代表INSERT的新記錄。

    1

    1

    代表UPDATE後的值。

    0

    1

    代表UPDATE前的值。

    0

    0

    代表刪除。

  • 樣本

    1. 建立acid_cdc_table表 。

      CREATE TABLE acid_cdc_table(id1 STRING NOT NULL, id2 STRING NOT NULL, key1 BIGINT, key2 BIGINT, PRIMARY KEY(id1, id2))
      tblproperties("transactional" = "true", "acid.cdc.mode.enable"="true");
    2. 表中插入資料。

      -- 插入資料時間2025-04-07 11:56:57
      INSERT INTO acid_cdc_table VALUES ('1', '1006', 1006, 1006);
      -- 插入資料時間2025-04-07 12:15:00
      INSERT INTO acid_cdc_table VALUES ('1', '1008', 1008, 1008);
      -- 插入資料時間2025-04-07 13:24:00
      INSERT INTO acid_cdc_table VALUES ('1', '1032', 1032, 1032);
      -- 插入資料時間2025-04-07 14:00:00
      INSERT INTO acid_cdc_table VALUES ('1', '1045', 1045, 1045);
      -- 插入資料時間2025-04-07 14:47:00
      INSERT INTO acid_cdc_table VALUES ('1', '1045', 1045, 1045);
    3. 查詢table version

      SHOW HISTORY FOR TABLE acid_cdc_table;

      返回結果如下。

      ObjectType      ObjectId                                ObjectName              VERSION(LSN)            Time                    Operation       
      TABLE           a4a78d3f6af04d85a57a90deff884021        acid_cdc_table          0000000000000001        2025-04-07 11:55:59     CREATE          
      TABLE           a4a78d3f6af04d85a57a90deff884021        acid_cdc_table          0000000000000002        2025-04-07 11:56:57     APPEND          
      TABLE           a4a78d3f6af04d85a57a90deff884021        acid_cdc_table          0000000000000003        2025-04-07 12:00:13     MINOR_COMPACT   
      TABLE           a4a78d3f6af04d85a57a90deff884021        acid_cdc_table          0000000000000004        2025-04-07 12:15:32     APPEND          
      TABLE           a4a78d3f6af04d85a57a90deff884021        acid_cdc_table          0000000000000005        2025-04-07 12:30:02     MINOR_COMPACT   
      TABLE           a4a78d3f6af04d85a57a90deff884021        acid_cdc_table          0000000000000006        2025-04-07 13:24:47     APPEND          
      TABLE           a4a78d3f6af04d85a57a90deff884021        acid_cdc_table          0000000000000007        2025-04-07 13:30:02     MINOR_COMPACT   
      TABLE           a4a78d3f6af04d85a57a90deff884021        acid_cdc_table          0000000000000008        2025-04-07 14:00:41     APPEND          
      TABLE           a4a78d3f6af04d85a57a90deff884021        acid_cdc_table          0000000000000009        2025-04-07 14:15:15     MINOR_COMPACT   
      TABLE           a4a78d3f6af04d85a57a90deff884021        acid_cdc_table          0000000000000010        2025-04-07 14:47:46     APPEND          
      TABLE           a4a78d3f6af04d85a57a90deff884021        acid_cdc_table          0000000000000011        2025-04-07 15:00:11     MINOR_COMPACT  
    4. 查詢CDC記錄。

      • 查詢2025-04-07 12:00:00以來記錄。

        SELECT * FROM table_changes('acid_cdc_table', '2025-04-07 12:00:00');
        -- 等同於 
        SELECT * FROM table_changes('acid_cdc_table', 3);

        返回結果如下。

        +------------+------------+------------+------------+------------------+----------------+------------------+
        | id1        | id2        | key1       | key2       | __meta_timestamp | __meta_op_type | __meta_is_update |
        +------------+------------+------------+------------+------------------+----------------+------------------+
        | 1          | 1045       | 1045       | 1045       | 2025-04-07 14:00:34 | 1              | 0                |
        | 1          | 1008       | 1008       | 1008       | 2025-04-07 12:15:28 | 1              | 0                |
        | 1          | 1032       | 1032       | 1032       | 2025-04-07 13:24:43 | 1              | 0                |
        | 2          | 1045       | 1045       | 1045       | 2025-04-07 14:47:41 | 1              | 0                |
        +------------+------------+------------+------------+------------------+----------------+------------------+
      • 查詢指定區間2025-04-07 12:00:0013:30:00的記錄。

        SELECT * FROM table_changes('acid_cdc_table', '2025-04-07 12:00:00', '2025-04-07 13:30:00');
        -- 等同於 
        SELECT * FROM table_changes('acid_cdc_table', 3, 6);

        返回結果如下。

        +------------+------------+------------+------------+------------------+----------------+------------------+
        | id1        | id2        | key1       | key2       | __meta_timestamp | __meta_op_type | __meta_is_update |
        +------------+------------+------------+------------+------------------+----------------+------------------+
        | 1          | 1008       | 1008       | 1008       | 2025-04-07 12:15:28 | 1              | 0                |
        | 1          | 1032       | 1032       | 1032       | 2025-04-07 13:24:43 | 1              | 0                |
        +------------+------------+------------+------------+------------------+----------------+------------------+

Stream

指定對Delta Table CDC資料配合Stream使用,詳細使用方式請參見流對象(Stream),此處僅提供基本文法和樣本。

  • 文法

    CREATE STREAM [IF NOT EXISTS] <stream_name> 
    ON TABLE <delta_table_name> <VERSION as of v>
    strmproperties ("read_mode"="cdc") 
    說明

    "read_mode"指定對Delta Table的消費模式,"cdc"表示該Stream會根據查詢範圍查詢CDC資料。

  • 樣本

    -- 建立源表acid_with_cdc_stream
    CREATE TABLE acid_with_cdc_stream (id1 BIGINT NOT NULL PRIMARY KEY, id2 BIGINT) 
    tblproperties ("transactional" = "true", "acid.cdc.mode.enable"="true","cdc.insert.into.passthrough.enable"="true");
    
    -- 插入資料
    INSERT INTO acid_with_cdc_stream VALUES (1, 1006), (2, 1008), (3, 1032);
    
    -- 建立一個Stream關聯acid_with_cdc_stream表
    CREATE STREAM delta_table_stream ON TABLE acid_with_cdc_stream version AS OF 1 strmproperties ("read_mode"="cdc");
    
    -- 查詢Stream對象delta_table_stream
    DESC STREAM delta_table_stream;

    返回結果樣本如下:

    Name                                    delta_table_stream                      
    Project                                 yunqi_y****                             
    Schema                                  default                                 
    Create Time                             2024-12-03 11:13:12                     Last Modified Time                      2024-12-03 11:13:12                     
    Offset Version                          1                                       
    Reference Table Project                 yunqi_y****                             
    Reference Table Schema                  default                                 
    Reference Table Name                    acid_with_cdc_stream                       
    Reference Table Id                      b89ec113f50944d5b8e52ce6a00c****        
    Reference Table Version                 2                                       
    Parameters                              {"read_mode": "cdc"}