Change Data Capture (CDC) records row-level inserts, updates, and deletes on Delta tables as an incremental event stream. Downstream systems can consume this stream for incremental computing, data synchronization, stream processing, and auditing — without scanning the full table on each run.
CDC is currently in invitational preview. For usage instructions, see Incremental calculation overview.
Use cases
Incremental computing: Consume change records to update materialized views without reprocessing the full table.
Stream processing: Feed CDC records into Flink jobs or other stream processors.
Multi-engine data synchronization: Propagate incremental changes across different compute engines.
Log auditing: Retain a complete record of all row-level operations for compliance and governance.
Choose a CDC mode
CDC supports two modes. Choose based on whether your workload requires real-time writes via Tunnel.
| Feature | Synchronous CDC | Asynchronous CDC |
|---|---|---|
| SQL DML operations | Supported | Supported |
| Real-time writes via Tunnel | Not supported | Supported |
| CDC data availability | Immediately after the SQL operation completes | Asynchronously, after the configured interval |
Enable CDC on a Delta table
Synchronous CDC
Add "acid.cdc.mode.enable"="true" when creating a Delta table.
CREATE TABLE acid_with_cdc_tbl (pk BIGINT NOT NULL PRIMARY KEY, val BIGINT)
tblproperties ("transactional" = "true", "acid.cdc.mode.enable"="true");
Optional properties:
| Property | Description | Default |
|---|---|---|
| "cdc.insert.into.passthrough.enable"="true" | Allows INSERT INTO statements on CDC-enabled tables. Written data appears as INSERT type only. Primary keys must be unique; rows with duplicate PKs cause query failures. | Disabled |
| "cdc.data.retain.hours"="24" | Retention period for CDC data, in hours. Range: 1–168. | 24 |
Asynchronous CDC
Add "acid.cdc.mode.enable"="true", "acid.cdc.build.async"="true", and "acid.cdc.build.interval" when creating a Delta table.
CREATE TABLE acid_with_cdc_build_tbl (pk BIGINT NOT NULL PRIMARY KEY, val BIGINT)
tblproperties ("transactional" = "true",
"acid.cdc.mode.enable"="true",
"acid.cdc.build.async"="true",
"acid.cdc.build.interval"="300");
Properties:
| Property | Required | Description | Range |
|---|---|---|---|
| "acid.cdc.mode.enable"="true" | Yes | Enables CDC for the Delta table. | - |
| "acid.cdc.build.async"="true" | Yes | Enables asynchronous CDC building. Supports real-time writes via Tunnel. CDC data for SQL DML operations is also generated asynchronously. | - |
| "acid.cdc.build.interval"="300" | Yes | Interval for asynchronous CDC building, in seconds. | 60–3540 |
| "odps.storage.orc.enable.memcmp.sort.key"="true" | No | Project-level or session-level property that improves asynchronous CDC build and query performance. | - |
Query CDC data
Use the table_changes function to query CDC change records by version or timestamp range.
Syntax
SELECT * FROM table_changes('<table_name>', <start> [, <end>]);
Parameters
| Parameter | Required | Type | Description |
|---|---|---|---|
| table_name | Yes | - | The Delta table to query. |
| start | Yes | BIGINT or STRING | Starting version (BIGINT) or timestamp (STRING) for the CDC data query. Find version numbers with SHOW HISTORY FOR TABLE <table_name>. For STRING, use the format yyyy-mm-dd hh:mi:ss. |
| end | No | BIGINT or STRING | Ending version (BIGINT) or timestamp (STRING) for the CDC data query. Defaults to the latest version if omitted. For STRING, use the format yyyy-mm-dd hh:mi:ss. |
Return columns
In addition to the table's data columns, table_changes returns three system columns:
| Column | Description |
|---|---|
| __meta_timestamp | System time when the row was written. |
| __meta_op_type | Operation type: 1 = INSERT, 0 = DELETE. |
| __meta_is_update | Whether the row is part of an UPDATE: 1 = yes, 0 = no. |
Combine __meta_op_type and __meta_is_update to identify the full operation:
| __meta_op_type | __meta_is_update | Operation |
|---|---|---|
| 1 | 0 | New row from an INSERT |
| 1 | 1 | Post-update value (the new value after UPDATE) |
| 0 | 1 | Pre-update value (the original value before UPDATE) |
| 0 | 0 | Deleted row |
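The mapping above can be applied directly in a query. The following sketch labels each change record with a readable operation name using a CASE expression. It assumes that the result of table_changes can be aliased and projected like an ordinary table source; the table name my_delta_table, the start version 1, and the operation labels are placeholders chosen for illustration.

```sql
-- Hypothetical table name and start version; replace with your own.
-- Assumption: table_changes output can be aliased and used in expressions.
SELECT t.*,
       CASE
         WHEN t.__meta_op_type = 1 AND t.__meta_is_update = 0 THEN 'INSERT'
         WHEN t.__meta_op_type = 1 AND t.__meta_is_update = 1 THEN 'UPDATE_AFTER'
         WHEN t.__meta_op_type = 0 AND t.__meta_is_update = 1 THEN 'UPDATE_BEFORE'
         WHEN t.__meta_op_type = 0 AND t.__meta_is_update = 0 THEN 'DELETE'
       END AS operation
FROM table_changes('my_delta_table', 1) t;
```

Because an UPDATE produces two records (the pre-update and post-update values), a consumer that only needs the latest state can ignore the UPDATE_BEFORE rows.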
Example
Step 1: Create a CDC-enabled table.
CREATE TABLE acid_cdc_table(id1 STRING NOT NULL, id2 STRING NOT NULL, key1 BIGINT, key2 BIGINT, PRIMARY KEY(id1, id2))
tblproperties("transactional" = "true", "acid.cdc.mode.enable"="true");
Step 2: Insert data.
-- Data insertion time 2025-04-07 11:56:57
INSERT INTO acid_cdc_table VALUES ('1', '1006', 1006, 1006);
-- Data insertion time 2025-04-07 12:15:00
INSERT INTO acid_cdc_table VALUES ('1', '1008', 1008, 1008);
-- Data insertion time 2025-04-07 13:24:00
INSERT INTO acid_cdc_table VALUES ('1', '1032', 1032, 1032);
-- Data insertion time 2025-04-07 14:00:00
INSERT INTO acid_cdc_table VALUES ('1', '1045', 1045, 1045);
-- Data insertion time 2025-04-07 14:47:00
INSERT INTO acid_cdc_table VALUES ('2', '1045', 1045, 1045);
Step 3: Find table versions.
SHOW HISTORY FOR TABLE acid_cdc_table;
Output:
ObjectType ObjectId ObjectName VERSION(LSN) Time Operation
TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000001 2025-04-07 11:55:59 CREATE
TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000002 2025-04-07 11:56:57 APPEND
TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000003 2025-04-07 12:00:13 MINOR_COMPACT
TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000004 2025-04-07 12:15:32 APPEND
TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000005 2025-04-07 12:30:02 MINOR_COMPACT
TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000006 2025-04-07 13:24:47 APPEND
TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000007 2025-04-07 13:30:02 MINOR_COMPACT
TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000008 2025-04-07 14:00:41 APPEND
TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000009 2025-04-07 14:15:15 MINOR_COMPACT
TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000010 2025-04-07 14:47:46 APPEND
TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000011 2025-04-07 15:00:11 MINOR_COMPACT
Step 4: Query CDC records.
Query all changes after 2025-04-07 12:00:00 (equivalent to version 3):
SELECT * FROM table_changes('acid_cdc_table', '2025-04-07 12:00:00');
-- Equivalent to
SELECT * FROM table_changes('acid_cdc_table', 3);
Output:
+-----+------+------+------+---------------------+----------------+------------------+
| id1 | id2  | key1 | key2 | __meta_timestamp    | __meta_op_type | __meta_is_update |
+-----+------+------+------+---------------------+----------------+------------------+
| 1   | 1045 | 1045 | 1045 | 2025-04-07 14:00:34 | 1              | 0                |
| 1   | 1008 | 1008 | 1008 | 2025-04-07 12:15:28 | 1              | 0                |
| 1   | 1032 | 1032 | 1032 | 2025-04-07 13:24:43 | 1              | 0                |
| 2   | 1045 | 1045 | 1045 | 2025-04-07 14:47:41 | 1              | 0                |
+-----+------+------+------+---------------------+----------------+------------------+
Query changes within a specific range (2025-04-07 12:00:00 to 13:30:00, equivalent to versions 3–6):
SELECT * FROM table_changes('acid_cdc_table', '2025-04-07 12:00:00', '2025-04-07 13:30:00');
-- Equivalent to
SELECT * FROM table_changes('acid_cdc_table', 3, 6);
Output:
+-----+------+------+------+---------------------+----------------+------------------+
| id1 | id2  | key1 | key2 | __meta_timestamp    | __meta_op_type | __meta_is_update |
+-----+------+------+------+---------------------+----------------+------------------+
| 1   | 1008 | 1008 | 1008 | 2025-04-07 12:15:28 | 1              | 0                |
| 1   | 1032 | 1032 | 1032 | 2025-04-07 13:24:43 | 1              | 0                |
+-----+------+------+------+---------------------+----------------+------------------+
Consume CDC data with a Stream
A Stream object tracks a read offset into a Delta table's CDC history. Create a Stream with "read_mode"="cdc" to consume CDC data incrementally.
For full Stream documentation, see Stream object.
Syntax
CREATE STREAM [IF NOT EXISTS] <stream_name>
ON TABLE <delta_table_name> VERSION AS OF <v>
strmproperties ("read_mode"="cdc")
The "read_mode"="cdc" property sets the Stream to consume CDC data based on the query version range.
Example
-- Create the source table with CDC enabled.
CREATE TABLE acid_with_cdc_stream (id1 BIGINT NOT NULL PRIMARY KEY, id2 BIGINT)
tblproperties ("transactional" = "true", "acid.cdc.mode.enable"="true","cdc.insert.into.passthrough.enable"="true");
-- Insert data.
INSERT INTO acid_with_cdc_stream VALUES (1, 1006), (2, 1008), (3, 1032);
-- Create a Stream starting at version 1.
CREATE STREAM delta_table_stream ON TABLE acid_with_cdc_stream VERSION AS OF 1 strmproperties ("read_mode"="cdc");
-- Inspect the Stream.
DESC STREAM delta_table_stream;
Output:
Name delta_table_stream
Project yunqi_y****
Schema default
Create Time 2024-12-03 11:13:12
Last Modified Time 2024-12-03 11:13:12
Offset Version 1
Reference Table Project yunqi_y****
Reference Table Schema default
Reference Table Name acid_with_cdc_stream
Reference Table Id b89ec113f50944d5b8e52ce6a00c****
Reference Table Version 2
Parameters {"read_mode": "cdc"}
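Once the Stream exists, its pending CDC records can be read. The sketch below assumes that a Stream can be referenced in a FROM clause like a table and that, in cdc read mode, it exposes the same system columns described for table_changes. Neither detail is confirmed on this page, so check the Stream object documentation before relying on it.

```sql
-- Assumption: a Stream can be queried by name like a table.
-- Assumption: the CDC system columns are available on the Stream in cdc read mode.
SELECT id1, id2, __meta_timestamp, __meta_op_type, __meta_is_update
FROM delta_table_stream;
```

Whether a plain SELECT advances the Stream offset, or only a DML statement that consumes the Stream does, is also covered in the Stream object documentation rather than here.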