CDC(Change Data Capture)定義了識別並捕獲資料庫表中資料的變更情境,用於記錄Delta Table增量錶行層級的插入、更新和刪除等操作,從而可以有效捕捉該表的資料變化事件流,後續可以通過事件驅動輔助增量計算、資料同步、數倉分層等業務需求。
功能介紹
增量計算:可增量讀取CDC變更記錄進行增量計算,如增量物化視圖更新等。
流式計算:可流式讀取CDC變更記錄進行流式計算,如Flink計算任務消費。
多引擎間的資料同步:不同引擎間的增量資料同步和計算。
日誌審計:詳細記錄所有的資料操作記錄,用於動作記錄審計。
該功能目前處於邀測中,邀測功能使用方法請參見使用說明。
建表DDL
建表DDL分為兩種:同步CDC和非同步CDC。
同步CDC:僅支援SQL DML操作產生增量表,不支援即時資料寫入。SQL操作完成,CDC資料即產生。
非同步CDC:支援SQL DML操作產生增量表,支援即時資料寫入。但CDC資料產生屬於非同步行為,非立即生效。
同步CDC
建立Delta Table時添加"acid.cdc.mode.enable"="true"屬性, 可以選擇性添加"cdc.insert.into.passthrough.enable"="true"和"cdc.data.retain.hours"="24"屬性。
SQL樣本如下:
CREATE TABLE acid_with_cdc_tbl (pk BIGINT NOT NULL PRIMARY KEY, val BIGINT)
tblproperties ("transactional" = "true", "acid.cdc.mode.enable"="true");acid.cdc.mode.enable:開啟Delta Table CDC功能,預設為非同步方式,支援在SQL DML時同步產生CDC資料,不支援Tunnel即時寫入。cdc.insert.into.passthrough.enable:對於開啟CDC功能的表預設不支援INSERT INTO,添加該屬性時,可支援執行INSERT INTO語句,寫入的資料在CDC中僅代表INSERT類型,如果存在相同PK的資料行,後續查詢會因為PK衝突導致查詢失敗,需要使用者確保PK資料唯一。cdc.data.retain.hours:CDC資料的保留時間(1~168小時),預設24小時。
非同步CDC
建立Delta Table時添加"acid.cdc.mode.enable"="true"、"acid.cdc.build.async"="true"、"acid.cdc.build.interval"="300"屬性。
SQL樣本如下:
CREATE TABLE acid_with_cdc_build_tbl (pk BIGINT NOT NULL PRIMARY KEY, val BIGINT)
tblproperties ("transactional" = "true",
"acid.cdc.mode.enable"="true",
"acid.cdc.build.async"="true",
"acid.cdc.build.interval"="300"); acid.cdc.mode.enable:開啟Delta Table CDC功能,預設為非同步方式,支援在SQL DML時同步產生CDC資料,不支援Tunnel即時寫入。acid.cdc.build.async:開啟非同步構建CDC的功能,支援Tunnel即時寫入該表,同時對於SQL的DML也會非同步產生。acid.cdc.build.interval:非同步構建的周期配置[60-3540],單位秒,結合業務或增量情境配置該參數。其他選擇性參數(Project級 / Session級):
"odps.storage.orc.enable.memcmp.sort.key="true",建議Project層級開啟,對於CDC的非同步構建以及查詢都會有效能協助。
CDC查詢
table_changes
文法
SELECT * FROM table_changes('<table_name>', <start> [, <end>]);參數說明
參數名稱
是否必填
參數說明
table_name是
指定查詢的Delta Table。
start是
BIGINT或者STRING類型,常量。指定對該表CDC資料查詢的起始version,table version可以通過
SHOW HISTORY FOR TABLE <table_name>;命令查詢,詳見SHOW。如果輸入為STRING類型,格式為yyyy-mm-dd hh:mi:ss。end否
BIGINT或者STRING類型,常量。指定對該表CDC資料查詢的截止version,若不填,則會查詢到最新version。如果輸入為STRING類型,格式為
yyyy-mm-dd hh:mi:ss。傳回值說明
除資料列外,會額外輸出如下三個系統列:
__meta_timestamp:代表資料寫入系統時間。__meta_op_type:包含1(INSERT)和0(DELETE)。__meta_is_update:包含1(TRUE)和0(FALSE)。
__meta_op_type和__meta_is_update可組合成如下四種情況:__meta_op_type
__meta_is_update
說明
1
0
代表INSERT的新記錄。
1
1
代表UPDATE後的值。
0
1
代表UPDATE前的值。
0
0
代表刪除。
樣本
建立
acid_cdc_table表 。CREATE TABLE acid_cdc_table(id1 STRING NOT NULL, id2 STRING NOT NULL, key1 BIGINT, key2 BIGINT, PRIMARY KEY(id1, id2)) tblproperties("transactional" = "true", "acid.cdc.mode.enable"="true");表中插入資料。
-- 插入資料時間2025-04-07 11:56:57 INSERT INTO acid_cdc_table VALUES ('1', '1006', 1006, 1006); -- 插入資料時間2025-04-07 12:15:00 INSERT INTO acid_cdc_table VALUES ('1', '1008', 1008, 1008); -- 插入資料時間2025-04-07 13:24:00 INSERT INTO acid_cdc_table VALUES ('1', '1032', 1032, 1032); -- 插入資料時間2025-04-07 14:00:00 INSERT INTO acid_cdc_table VALUES ('1', '1045', 1045, 1045); -- 插入資料時間2025-04-07 14:47:00 INSERT INTO acid_cdc_table VALUES ('1', '1045', 1045, 1045);查詢
table version。SHOW HISTORY FOR TABLE acid_cdc_table;返回結果如下。
ObjectType ObjectId ObjectName VERSION(LSN) Time Operation TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000001 2025-04-07 11:55:59 CREATE TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000002 2025-04-07 11:56:57 APPEND TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000003 2025-04-07 12:00:13 MINOR_COMPACT TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000004 2025-04-07 12:15:32 APPEND TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000005 2025-04-07 12:30:02 MINOR_COMPACT TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000006 2025-04-07 13:24:47 APPEND TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000007 2025-04-07 13:30:02 MINOR_COMPACT TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000008 2025-04-07 14:00:41 APPEND TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000009 2025-04-07 14:15:15 MINOR_COMPACT TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000010 2025-04-07 14:47:46 APPEND TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000011 2025-04-07 15:00:11 MINOR_COMPACT查詢CDC記錄。
查詢2025-04-07 12:00:00以來記錄。
SELECT * FROM table_changes('acid_cdc_table', '2025-04-07 12:00:00'); -- 等同於 SELECT * FROM table_changes('acid_cdc_table', 3);返回結果如下。
+------------+------------+------------+------------+------------------+----------------+------------------+ | id1 | id2 | key1 | key2 | __meta_timestamp | __meta_op_type | __meta_is_update | +------------+------------+------------+------------+------------------+----------------+------------------+ | 1 | 1045 | 1045 | 1045 | 2025-04-07 14:00:34 | 1 | 0 | | 1 | 1008 | 1008 | 1008 | 2025-04-07 12:15:28 | 1 | 0 | | 1 | 1032 | 1032 | 1032 | 2025-04-07 13:24:43 | 1 | 0 | | 2 | 1045 | 1045 | 1045 | 2025-04-07 14:47:41 | 1 | 0 | +------------+------------+------------+------------+------------------+----------------+------------------+查詢指定區間
2025-04-07 12:00:00至13:30:00的記錄。SELECT * FROM table_changes('acid_cdc_table', '2025-04-07 12:00:00', '2025-04-07 13:30:00'); -- 等同於 SELECT * FROM table_changes('acid_cdc_table', 3, 6);返回結果如下。
+------------+------------+------------+------------+------------------+----------------+------------------+ | id1 | id2 | key1 | key2 | __meta_timestamp | __meta_op_type | __meta_is_update | +------------+------------+------------+------------+------------------+----------------+------------------+ | 1 | 1008 | 1008 | 1008 | 2025-04-07 12:15:28 | 1 | 0 | | 1 | 1032 | 1032 | 1032 | 2025-04-07 13:24:43 | 1 | 0 | +------------+------------+------------+------------+------------------+----------------+------------------+
Stream
指定對Delta Table CDC資料配合Stream使用,詳細使用方式請參見流對象(Stream),此處僅提供基本文法和樣本。
文法
CREATE STREAM [IF NOT EXISTS] <stream_name> ON TABLE <delta_table_name> <VERSION as of v> strmproperties ("read_mode"="cdc")說明"read_mode"指定對Delta Table的消費模式,"cdc"表示該Stream會根據查詢範圍查詢CDC資料。樣本
-- 建立源表acid_with_cdc_stream CREATE TABLE acid_with_cdc_stream (id1 BIGINT NOT NULL PRIMARY KEY, id2 BIGINT) tblproperties ("transactional" = "true", "acid.cdc.mode.enable"="true","cdc.insert.into.passthrough.enable"="true"); -- 插入資料 INSERT INTO acid_with_cdc_stream VALUES (1, 1006), (2, 1008), (3, 1032); -- 建立一個Stream關聯acid_with_cdc_stream表 CREATE STREAM delta_table_stream ON TABLE acid_with_cdc_stream version AS OF 1 strmproperties ("read_mode"="cdc"); -- 查詢Stream對象delta_table_stream DESC STREAM delta_table_stream;返回結果樣本如下:
Name delta_table_stream Project yunqi_y**** Schema default Create Time 2024-12-03 11:13:12 Last Modified Time 2024-12-03 11:13:12 Offset Version 1 Reference Table Project yunqi_y**** Reference Table Schema default Reference Table Name acid_with_cdc_stream Reference Table Id b89ec113f50944d5b8e52ce6a00c**** Reference Table Version 2 Parameters {"read_mode": "cdc"}