You can build an incremental data warehouse by using Delta Lake Change Data Capture (CDC). This topic describes the parameters, schemas, and usage examples of Delta Lake CDC.
Background information
CDC is a set of software design patterns that you can use to identify and capture data changes in a table of a database and deliver the changes to a downstream process or system. Delta Lake CDC can use a Delta Lake table as a source to obtain the data changes.
Delta Lake CDC is implemented by using Change Data Feed (CDF). CDF allows you to trace data changes at the row level in a Delta Lake table. After you enable CDF for a table, Delta Lake persists the change data to dedicated files in the directory that stores the table. You can use Delta Lake CDC to build incremental data warehouses in an efficient manner.
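CDF can be enabled when a table is created, as shown in the Examples section, or afterwards for an existing Delta Lake table by setting the corresponding table property. A minimal Spark SQL sketch, assuming an existing Delta Lake table named existing_tbl and a Delta Lake version whose SQL supports ALTER TABLE ... SET TBLPROPERTIES:
-- Enable CDF on an existing table. Changes are captured starting from the next commit.
ALTER TABLE existing_tbl SET TBLPROPERTIES (delta.enableChangeDataFeed = true);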
Limits
Delta Lake CDC is available only for clusters of EMR V3.41.0 or a later minor version (Delta Lake 0.6.1) and EMR V5.9.0 or a later minor version (Delta Lake 2.0).
Parameters
SparkConf parameters
Parameter | Description |
spark.sql.externalTableValuedFunctions | A custom EMR Spark parameter that extends the table-valued functions of Spark 2.4.X. If you use Spark SQL to perform a CDF query, set this parameter to table_changes. |
spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled | Specifies the behavior of a CDF query whose startingTimestamp or endingTimestamp is later than the timestamp of the last commit of the table. Valid values:
true: If startingTimestamp is later than the last commit, an empty result is returned. If endingTimestamp is later than the last commit, the change data between startingTimestamp and the last commit is returned.
false (default): An error is thrown in the preceding cases.
Note This parameter takes effect only in Delta Lake 2.X. |
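For example, the configuration can be passed to Spark in the same way as the other SparkConf settings; a minimal sketch:
spark-sql --conf spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled=true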
CDC write parameters
Parameter | Description |
delta.enableChangeDataFeed | Specifies whether to enable CDF for the table. Valid values:
true: enables CDF.
false (default): disables CDF. |
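If CDF should be enabled for every table created in a session, open source Delta Lake also derives session-level defaults for table properties from the property name. A minimal sketch, assuming the spark.databricks.delta.properties.defaults.enableChangeDataFeed configuration that this mechanism produces:
spark-sql --conf spark.databricks.delta.properties.defaults.enableChangeDataFeed=true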
CDC read parameters
The parameters that are described in the following table apply only when you build Delta Lake CDC by using DataFrame or Spark Streaming.
Parameter | Description |
readChangeFeed | Specifies whether to read the change data of a table. The value true indicates that the change data of a table is returned. If you set this parameter to true, you must configure the startingVersion or startingTimestamp parameter. |
startingVersion | Specifies the first table version (inclusive) from which the change data is read. This parameter takes effect only when you set the readChangeFeed parameter to true. |
endingVersion | Specifies the last table version (inclusive) up to which the change data is read. This parameter is optional and takes effect only when you set the readChangeFeed parameter to true. |
startingTimestamp | Specifies the timestamp (inclusive) at which the change data of a table starts to be read. This parameter takes effect only when you set the readChangeFeed parameter to true. |
endingTimestamp | Specifies the timestamp (inclusive) at which the change data of a table stops being read. This parameter is optional and takes effect only when you set the readChangeFeed parameter to true. |
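For example, the timestamp-based options are used in the same way as the version-based options shown in the examples below. A minimal Scala sketch, assuming the CDF-enabled cdf_table from the DataFrame example and placeholder timestamp values:
// Batch-read the change data committed between two timestamps (values are placeholders).
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingTimestamp", "2023-02-03 15:33:34")
.option("endingTimestamp", "2023-02-03 15:34:06") // Optional.
.table("cdf_table")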
Schema
The schema that is returned by Delta Lake CDF includes the schema of the original table and the following fields:
_change_type: the operation that causes a data change. Valid values:
insert: inserts data into a table.
delete: deletes data from a table.
update_preimage and update_postimage: update data in a table. update_preimage records the data before a change and update_postimage records the data after the change.
_commit_version: the version of the Delta Lake table in which the data change occurs.
_commit_timestamp: the time when the commit that contains the data change was created.
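These fields can be selected and filtered like ordinary columns. A minimal Scala sketch, assuming the CDF-enabled cdf_table from the examples below:
// Inspect the change type and commit metadata for every change row.
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.table("cdf_table")
.select("_change_type", "_commit_version", "_commit_timestamp")
.show()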
Examples
Build Delta Lake CDC by using Spark SQL
The Spark SQL syntax is supported only in EMR Spark 2 and Delta Lake 0.6.1.
To use the Spark SQL syntax in EMR Spark 2, you must configure the following parameter. Sample code:
spark-sql --conf spark.sql.externalTableValuedFunctions=table_changes
The following sample code provides an example of how to use the SQL syntax:
-- Create Delta CDF-enabled Table
CREATE TABLE cdf_tbl (id int, name string, age int) USING delta
TBLPROPERTIES ("delta.enableChangeDataFeed" = "true");
-- Insert Into
INSERT INTO cdf_tbl VALUES (1, 'XUN', 32), (2, 'JING', 30);
-- Insert Overwrite
INSERT OVERWRITE TABLE cdf_tbl VALUES (1, 'a1', 30), (2, 'a2', 32), (3, 'a3', 32);
-- Update
UPDATE cdf_tbl SET age = age + 1;
-- Merge Into
CREATE TABLE merge_source (id int, name string, age int) USING delta;
INSERT INTO merge_source VALUES (1, 'a1', 31), (2, 'a2_new', 33), (4, 'a4', 30);
MERGE INTO cdf_tbl target USING merge_source source
ON target.id = source.id
WHEN MATCHED AND target.id % 2 == 0 THEN UPDATE SET name = source.name
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT *;
-- Delete
DELETE FROM cdf_tbl WHERE age >= 32;
-- CDF Query
-- Query the change data from the table of all versions. The version of the table from which data starts to be queried is 0.
select * from table_changes("cdf_tbl", 0);
select * from table_changes("cdf_tbl", '2023-02-03 15:33:34'); -- 2023-02-03 15:33:34 is the time when the table of version 0 is committed.
-- Query the change data from the table of version 4.
select * from table_changes("cdf_tbl", 4, 4);
select * from table_changes("cdf_tbl", '2023-02-03 15:34:06', '2023-02-03 15:34:06'); -- 2023-02-03 15:34:06 is the time when the table of version 4 is committed. The following figures show the query results.
Figure 1. Result for the first query
Figure 2. Result for the second query
Build Delta Lake CDC by using DataFrame
// Create and Write to Delta CDF-enabled Table
val df = Seq((1, "XUN", 32), (2, "JING", 30)).toDF("id", "name", "age")
df.write.format("delta").mode("append")
.option("delta.enableChangeDataFeed", "true") // The first time you write data to the Delta Lake table, you need to enable CDF. In subsequent data write operations, you no longer need to enable CDF.
.saveAsTable("cdf_table")
// CDF Query Using DataFrame
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 4) // The endingVersion parameter is optional.
.table("cdf_table")Build Delta Lake CDC by using Spark Streaming
// Streaming CDF Query Using DataStreamReader
spark.readStream.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0) // The endingVersion and endingTimestamp parameters apply only to batch reads and are not supported in streaming queries.
.table("cdf_table")