E-MapReduce: Build an incremental data warehouse with Delta Lake CDC

Last Updated: Mar 26, 2026

Delta Lake Change Data Capture (CDC) tracks row-level changes in a Delta Lake table and delivers those changes to downstream processes or systems. This topic describes the parameters, schema, and usage examples for Delta Lake CDC.

How it works

CDC is a set of software design patterns for identifying and capturing data changes in a database table. Delta Lake CDC uses a Delta Lake table as the source and is implemented through Change Data Feed (CDF).

CDF traces data changes at the row level. When CDF is enabled, Delta Lake persists change data to a dedicated directory within the table's storage path. You can then read that change data in batch or streaming mode to power incremental data warehouse pipelines.

Use cases

Enable CDF when you need to:

  • Process only what changed: In ETL pipelines, read only the rows that changed since the last run instead of scanning the entire table.

  • Audit data modifications: Capture a full history of inserts, updates, and deletes for compliance and governance.

  • Replicate changes downstream: Propagate changes to downstream tables, caches, or external systems such as Kafka or a relational database.

Version requirements

Delta Lake CDC requires one of the following:

  • EMR V3.41.0 or later (Delta Lake 0.6.1)

  • EMR V5.9.0 or later (Delta Lake 2.0)

Parameters

SparkConf parameters

Parameter Description
spark.sql.externalTableValuedFunctions A custom EMR Spark parameter that extends the table-valued functions of Spark 2.4.x. Set it to table_changes to run CDF queries with Spark SQL.
spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled Controls the behavior when a CDF query timestamp is later than the latest commit. false (default): returns an error if the start or end timestamp is out of range. true: returns an empty result if the start timestamp is out of range, and returns all changes from the start through the latest commit if the end timestamp is out of range. Effective only in Delta Lake 2.x.
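The two settings of the timestampOutOfRange parameter are easier to compare side by side. The following Scala sketch models the decision a CDF read makes when a timestamp bound falls after the table's latest commit. It is a hypothetical model, not Delta Lake code. The object OutOfRangeModel and its Outcome types are illustrative assumptions, and so is the reading of "out of range" as "later than the latest commit".

```scala
import java.time.Instant

// Hypothetical model (not Delta Lake source code) of how
// spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled changes
// the outcome of a CDF read whose bounds are later than the latest commit.
object OutOfRangeModel {
  sealed trait Outcome
  final case class Rejected(reason: String) extends Outcome
  case object EmptyResult extends Outcome
  // Inclusive timestamp window that the read would actually cover
  final case class Window(start: Instant, end: Instant) extends Outcome

  def resolve(start: Instant, end: Option[Instant], latestCommit: Instant,
              outOfRangeEnabled: Boolean): Outcome =
    if (start.isAfter(latestCommit)) {
      // Start after the last commit: error by default, empty result when enabled
      if (outOfRangeEnabled) EmptyResult
      else Rejected(s"start $start is after the latest commit $latestCommit")
    } else {
      end match {
        case Some(e) if e.isAfter(latestCommit) =>
          // End after the last commit: error by default,
          // read through the latest commit when enabled
          if (outOfRangeEnabled) Window(start, latestCommit)
          else Rejected(s"end $e is after the latest commit $latestCommit")
        case Some(e) => Window(start, e)
        case None    => Window(start, latestCommit) // No end bound: read to the latest commit
      }
    }
}
```

With the flag left at false, both out-of-range cases end in Rejected. With the flag set to true, a late start yields EmptyResult and a late end is clamped to the latest commit.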

CDC write parameters

Parameter Description
delta.enableChangeDataFeed Enables CDF for a table. false (default): CDF is disabled. true: CDF is enabled.

CDC read parameters

The following parameters apply when reading CDF data with DataFrame or Spark Streaming.

Parameter Description
readChangeFeed Set to true to read change data. Requires startingVersion or startingTimestamp.
startingVersion The table version at which to start reading change data. Requires readChangeFeed=true.
endingVersion The table version at which to stop reading change data. Requires readChangeFeed=true.
startingTimestamp The timestamp at which to start reading change data. Requires readChangeFeed=true.
endingTimestamp The timestamp at which to stop reading change data. Requires readChangeFeed=true.
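The parameters above depend on each other. readChangeFeed must be true, and a starting bound is required. One way to keep a batch read consistent is to build the options in a single place. The following Scala sketch does this. CdfReadOptions is a hypothetical helper, not part of Delta Lake or Spark. Keeping version bounds and timestamp bounds in separate methods is an assumption made to avoid mixing the two kinds of bound.

```scala
// Hypothetical helper (not part of Delta Lake or Spark): builds the option map
// for a batch CDF read so that readChangeFeed=true and a starting bound are
// always present, as the parameter table requires.
object CdfReadOptions {
  // Version bounds, equivalent to .option("startingVersion", ...).option("endingVersion", ...)
  def byVersion(start: Long, end: Option[Long] = None): Map[String, String] = {
    require(start >= 0, s"startingVersion must be non-negative, got $start")
    require(end.forall(_ >= start), s"endingVersion must not be less than startingVersion $start")
    Map("readChangeFeed" -> "true", "startingVersion" -> start.toString) ++
      end.map(e => "endingVersion" -> e.toString).toList
  }

  // Timestamp bounds in the form used in this topic, for example "2023-02-03 15:33:34"
  def byTimestamp(start: String, end: Option[String] = None): Map[String, String] =
    Map("readChangeFeed" -> "true", "startingTimestamp" -> start) ++
      end.map(e => "endingTimestamp" -> e).toList
}
```

A batch read could then pass the map through DataFrameReader.options, for example spark.read.format("delta").options(CdfReadOptions.byVersion(0, Some(4L))).table("cdf_table").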

Schema

The CDF result set includes all columns from the original table plus the following metadata columns:

Column Description
_change_type The operation that caused the change: insert, delete, update_preimage (row value before update), or update_postimage (row value after update).
_commit_version The Delta Lake table version in which the change was committed.
_commit_timestamp The timestamp when the change was committed.
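An update appears in the change data as two rows. The update_preimage row holds the old values and the update_postimage row holds the new ones, and both share a _commit_version. The following Scala sketch pairs those rows to show which columns an update changed. It is a hypothetical in-memory model: ChangeRow mirrors the cdf_tbl columns used later in this topic, _commit_timestamp is omitted, and id is assumed to identify a row.

```scala
// Hypothetical in-memory model of CDF rows for cdf_tbl (id, name, age) plus
// the _change_type and _commit_version metadata columns; _commit_timestamp is
// omitted for brevity. Assumes id identifies a row.
object UpdateDiff {
  final case class ChangeRow(id: Int, name: String, age: Int,
                             changeType: String, commitVersion: Long)

  // Pairs each update_preimage row with the update_postimage row that has the
  // same id and commit version, and reports the columns whose values differ.
  def changedColumns(rows: Seq[ChangeRow]): Map[(Long, Int), Map[String, (String, String)]] = {
    def byKey(changeType: String): Map[(Long, Int), ChangeRow] =
      rows.filter(_.changeType == changeType).map(r => ((r.commitVersion, r.id), r)).toMap
    val before = byKey("update_preimage")
    val after  = byKey("update_postimage")
    before.keySet.intersect(after.keySet).toSeq.map { key =>
      val (b, a) = (before(key), after(key))
      val diffs = Seq(
        ("name", (b.name, a.name)),
        ("age", (b.age.toString, a.age.toString))
      ).filter { case (_, (oldValue, newValue)) => oldValue != newValue }
      (key, diffs.toMap)
    }.toMap
  }
}
```

Take the UPDATE in version 3 of the Spark SQL end-to-end example. Each pair it produces would differ only in the age column.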

Enable CDF

CDF is disabled by default. Set the delta.enableChangeDataFeed property when creating the table:

CREATE TABLE cdf_tbl (id INT, name STRING, age INT)
USING delta
TBLPROPERTIES ("delta.enableChangeDataFeed" = "true");

Read change data

Query with Spark SQL

Important

Spark SQL CDF syntax is supported only in EMR Spark 2 with Delta Lake 0.6.1. Start the Spark SQL session with the following configuration:

spark-sql --conf spark.sql.externalTableValuedFunctions=table_changes

Use the table_changes function to query change data. The function takes a table name and a starting version or timestamp, with an optional ending version or timestamp. Both bounds are inclusive.

Query by version:

-- All changes from version 0 onwards
SELECT * FROM table_changes("cdf_tbl", 0);

-- Changes in version 4 only
SELECT * FROM table_changes("cdf_tbl", 4, 4);

Query by timestamp:

-- All changes from 2023-02-03 15:33:34 (the commit time of version 0) onwards
SELECT * FROM table_changes("cdf_tbl", '2023-02-03 15:33:34');

-- Changes committed at 2023-02-03 15:34:06 (version 4) only
SELECT * FROM table_changes("cdf_tbl", '2023-02-03 15:34:06', '2023-02-03 15:34:06');
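A timestamp window selects whole commits. The following Scala sketch models how a start and end timestamp map onto commit versions. The start picks the first commit at or after it, and the end picks the last commit at or before it. This is a hypothetical model, not Delta Lake code. The object TimestampToVersion, the Commit type, and the caller-supplied list of commit times are assumptions for illustration.

```scala
import java.time.LocalDateTime

// Hypothetical model (not Delta Lake code) of how a timestamp window maps to
// commit versions: the start selects the first commit at or after it, and the
// end selects the last commit at or before it. Commit times are supplied by the caller.
object TimestampToVersion {
  final case class Commit(version: Long, time: LocalDateTime)

  // Parses the "yyyy-MM-dd HH:mm:ss" strings used in the queries above
  def ts(s: String): LocalDateTime = LocalDateTime.parse(s.trim.replace(' ', 'T'))

  // Inclusive (first, last) version range, or None if no commit falls in the window
  def versionRange(commits: Seq[Commit], start: LocalDateTime,
                   end: Option[LocalDateTime]): Option[(Long, Long)] = {
    val sorted = commits.sortBy(_.version)
    val first = sorted.find(c => !c.time.isBefore(start))
    val last = end match {
      case Some(e) => sorted.filter(c => !c.time.isAfter(e)).lastOption
      case None    => sorted.lastOption
    }
    for {
      f <- first
      l <- last
      if f.version <= l.version
    } yield (f.version, l.version)
  }
}
```

Suppose version 4 is the only commit made at exactly 2023-02-03 15:34:06. In that case the window from '2023-02-03 15:34:06' to '2023-02-03 15:34:06' resolves to version 4 only, which matches the second timestamp query above.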

Query with DataFrame (batch)

// Query by version
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 4) // Optional: omit to read to the latest version
  .table("cdf_table")

// Query by timestamp
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2023-02-03 15:33:34")
  .option("endingTimestamp", "2023-02-03 15:34:06") // Optional
  .table("cdf_table")
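In an incremental pipeline, a batch read like the one above usually runs on a schedule and reads only the commits made since the previous run. The following Scala sketch computes the next startingVersion and endingVersion from a stored checkpoint. IncrementalPlanner is hypothetical, not a Delta Lake API. The sketch assumes the job can learn the table's latest version somehow, and it leaves out how the checkpoint is persisted.

```scala
// Hypothetical bookkeeping for a scheduled incremental job (not a Delta Lake
// API): the job remembers the last table version it fully processed and, on
// each run, reads only the versions committed after it.
object IncrementalPlanner {
  // Returns the inclusive (startingVersion, endingVersion) for the next batch
  // CDF read, or None when there are no new commits to process.
  def nextRange(lastProcessed: Option[Long], latestVersion: Long): Option[(Long, Long)] = {
    val start = lastProcessed.fold(0L)(_ + 1) // First run starts at version 0
    if (start > latestVersion) None else Some((start, latestVersion))
  }
}
```

Consider a run with a checkpoint of 3 on a table at version 5. It would read versions 4 through 5 and then store 5 as the new checkpoint. Pinning endingVersion to the version observed at planning time keeps each batch bounded even if new commits arrive during the run.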

Query with Spark Streaming

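// Streaming CDF query: use readStream, not read
val cdfStream = spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("cdf_table") // DataStreamReader.table requires Spark 3.1 or later

// A streaming query keeps picking up new commits, so no ending bound is set.
// Print each micro-batch of change rows to the console.
cdfStream.writeStream.format("console").start()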

End-to-end example: Spark SQL

The following example demonstrates a complete CDF workflow using Spark SQL: create a CDF-enabled table, perform write operations, and query the resulting change data.

-- Create a CDF-enabled table (version 0)
CREATE TABLE cdf_tbl (id INT, name STRING, age INT)
USING delta
TBLPROPERTIES ("delta.enableChangeDataFeed" = "true");

-- Insert rows (version 1)
INSERT INTO cdf_tbl VALUES (1, 'XUN', 32), (2, 'JING', 30);

-- Overwrite all rows (version 2)
INSERT OVERWRITE TABLE cdf_tbl VALUES (1, 'a1', 30), (2, 'a2', 32), (3, 'a3', 32);

-- Update all rows (version 3)
UPDATE cdf_tbl SET age = age + 1;

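-- Create and populate a source table (this does not change the version of cdf_tbl)
CREATE TABLE merge_source (id INT, name STRING, age INT) USING delta;
INSERT INTO merge_source VALUES (1, 'a1', 31), (2, 'a2_new', 33), (4, 'a4', 30);

-- Merge into cdf_tbl (version 4): delete matched odd ids, update the name of matched even ids, insert unmatched ids
MERGE INTO cdf_tbl target USING merge_source source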
ON target.id = source.id
WHEN MATCHED AND target.id % 2 == 0 THEN UPDATE SET name = source.name
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT *;

-- Delete rows (version 5)
DELETE FROM cdf_tbl WHERE age >= 32;

-- Query all changes from version 0
SELECT * FROM table_changes("cdf_tbl", 0);

Figure 1. Result for the first query

-- Query changes in version 4 only
SELECT * FROM table_changes("cdf_tbl", 4, 4);

Figure 2. Result for the second query
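The query results above appear only as figures. The following Scala sketch offers one way to reason about what the change feed of cdf_tbl implies. It replays change rows in commit-version order into an in-memory map keyed by id, which is also the basic idea behind replicating changes to a downstream table. ReplayChanges and its Change type are hypothetical, and id is assumed to be unique. Within a commit, the sketch applies rows from the old state before rows from the new state. That ordering is a modeling choice, because this topic does not specify row order inside one commit.

```scala
// Hypothetical downstream replica keyed by id (assumed unique). Replaying
// change rows in commit order reproduces the source table's contents.
object ReplayChanges {
  final case class Change(id: Int, name: String, age: Int, changeType: String, version: Long)

  // Within one commit, apply rows from the old state (delete, update_preimage)
  // before rows from the new state (insert, update_postimage), so an overwrite
  // that deletes and re-inserts the same id keeps the new row.
  private def phase(changeType: String): Int = changeType match {
    case "delete" | "update_preimage"  => 0
    case "insert" | "update_postimage" => 1
    case other => throw new IllegalArgumentException(s"unknown _change_type: $other")
  }

  def replay(changes: Seq[Change]): Map[Int, (String, Int)] =
    changes.sortBy(c => (c.version, phase(c.changeType)))
      .foldLeft(Map.empty[Int, (String, Int)]) { (state, c) =>
        c.changeType match {
          case "delete"          => state - c.id
          case "update_preimage" => state // Old value; the postimage row supplies the new one
          case _                 => state.updated(c.id, (c.name, c.age)) // insert or update_postimage
        }
      }
}
```

The following is our reading of the statements above, not captured output. The INSERT OVERWRITE in version 2 shows up as deletes of the two original rows plus inserts of the three new ones. Replaying versions 1 through 5 should then leave only the row (4, 'a4', 30), which is the final content of cdf_tbl after the DELETE.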

End-to-end example: DataFrame

The following example demonstrates a complete CDF workflow using the DataFrame API.

// Create a CDF-enabled table and write initial data (version 0)
import spark.implicits._ // Provides toDF; imported automatically in spark-shell
val df = Seq((1, "XUN", 32), (2, "JING", 30)).toDF("id", "name", "age")
df.write.format("delta")
  .mode("append")
  .option("delta.enableChangeDataFeed", "true") // Enable CDF on first write; not required for subsequent writes
  .saveAsTable("cdf_table")

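// Read change data from version 0 to version 4
// (versions 1 to 4 exist only after further writes to cdf_table)
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 4) // Optional
  .table("cdf_table")
  .show(false) // Print the change rows, including _change_type, _commit_version, and _commit_timestamp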