
E-MapReduce:Build an incremental data warehouse by using Delta Lake CDC

Last Updated: May 11, 2024

You can build an incremental data warehouse by using Delta Lake Change Data Capture (CDC). This topic describes the parameters, schemas, and usage examples of Delta Lake CDC.

Background information

CDC is a set of software design patterns that you can use to identify and capture data changes in a table of a database and deliver the changes to a downstream process or system. Delta Lake CDC can use a Delta Lake table as a source to obtain the data changes.

Delta Lake CDC is implemented by using Change Data Feed (CDF). CDF allows you to trace data changes at the row level in a Delta Lake table. After you enable CDF for a table, Delta Lake persists the change records and writes them to files in the directory that stores the table. You can use Delta Lake CDC to build incremental data warehouses in an efficient manner.

Limits

Delta Lake CDC is available only for clusters of EMR V3.41.0 or a later minor version (Delta Lake 0.6.1) and EMR V5.9.0 or a later minor version (Delta Lake 2.0).

Parameters

SparkConf parameters

Parameter: spark.sql.externalTableValuedFunctions

Description: A custom EMR Spark parameter that extends Spark 2.4.X with table-valued functions. If you use Spark SQL to perform a CDF query, set this parameter to table_changes.

Parameter: spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled

Description: Specifies how timestamps that are out of range are handled in a CDF query. Valid values:

  • false: The start timestamp and end timestamp of the CDF query must be within the valid range. Otherwise, an error is returned. This is the default value.

  • true: If the start timestamp is out of range, empty data is returned. If the end timestamp is out of range, the data of the current snapshot is returned.

Note: This parameter takes effect only in Delta Lake 2.X.
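For example, the timestamp behavior can be switched at the session level before you run a CDF query. The following snippet is a minimal sketch, assuming a Spark session that runs Delta Lake 2.0:

// Return empty data or the current snapshot instead of an error
// when a CDF query timestamp is out of range. Delta Lake 2.X only.
spark.conf.set("spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled", "true")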

CDC write parameters

Parameter: delta.enableChangeDataFeed

Description: Specifies whether to enable CDF for a table. Valid values:

  • false: disables CDF. This is the default value.

  • true: enables CDF.
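You can enable CDF when you create a table, as shown in the examples later in this topic, or on an existing Delta table by setting the same table property. The following snippet is a minimal sketch; my_tbl is an illustrative table name:

// Enable CDF on an existing Delta table. Only changes that are made
// after the property is set are recorded in the change feed.
spark.sql("ALTER TABLE my_tbl SET TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')")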

CDC read parameters

The parameters that are described in the following table take effect only when you build Delta Lake CDC by using DataFrame or Spark Streaming.

Parameter: readChangeFeed

Description: Specifies whether to read the change data of a table. The value true indicates that the change data of the table is returned. If you set this parameter to true, you must also configure the startingVersion or startingTimestamp parameter.

Parameter: startingVersion

Description: The version of the table from which the change data starts to be read (inclusive). This parameter takes effect only when the readChangeFeed parameter is set to true.

Parameter: endingVersion

Description: The version of the table at which the change data stops being read (inclusive). This parameter takes effect only when the readChangeFeed parameter is set to true.

Parameter: startingTimestamp

Description: The timestamp from which the change data of the table starts to be read. This parameter takes effect only when the readChangeFeed parameter is set to true.

Parameter: endingTimestamp

Description: The timestamp at which the change data of the table stops being read. This parameter takes effect only when the readChangeFeed parameter is set to true.
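For example, a DataFrame read can specify a timestamp range instead of a version range. The following snippet is a minimal sketch; the timestamps are illustrative, and cdf_table is a CDF-enabled table such as the one created in the DataFrame example later in this topic:

// Read the change data between two commit timestamps.
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingTimestamp", "2023-02-03 15:33:34")
  .option("endingTimestamp", "2023-02-03 15:34:06")
  .table("cdf_table")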

Schema

The schema that is returned by Delta Lake CDF includes the schema of the original table and the following fields:

  • _change_type: the type of operation that changed the row. Valid values:

    • insert: the row was inserted into the table.

    • delete: the row was deleted from the table.

    • update_preimage and update_postimage: the row was updated. update_preimage records the row before the change, and update_postimage records the row after the change.

  • _commit_version: the version of the Delta Lake table in which the data change was committed.

  • _commit_timestamp: the time when the version that contains the data change was committed.
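You can filter on these fields to keep only the change records that you need. The following snippet is a minimal sketch; cdf_table is an illustrative CDF-enabled table:

// Keep only the rows as they look after each change by dropping
// delete records and pre-update images.
import org.apache.spark.sql.functions.col

spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("cdf_table")
  .where(col("_change_type").isin("insert", "update_postimage"))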

Examples

Build Delta Lake CDC by using Spark SQL

Important

The Spark SQL syntax is supported only in EMR Spark 2 and Delta Lake 0.6.1.

To use the Spark SQL syntax in EMR Spark 2, you must configure the following parameter. Sample code:

spark-sql --conf spark.sql.externalTableValuedFunctions=table_changes

The following sample code provides an example on how to use the SQL syntax:

-- Create Delta CDF-enabled Table
CREATE TABLE cdf_tbl (id int, name string, age int) USING delta
TBLPROPERTIES ("delta.enableChangeDataFeed" = "true");

-- Insert Into
INSERT INTO cdf_tbl VALUES (1, 'XUN', 32), (2, 'JING', 30);

-- Insert Overwrite
INSERT OVERWRITE TABLE cdf_tbl VALUES (1, 'a1', 30), (2, 'a2', 32), (3, 'a3', 32);

-- Update
UPDATE cdf_tbl set age = age + 1;

-- Merge Into
CREATE TABLE merge_source (id int, name string, age int) USING delta;
INSERT INTO merge_source VALUES (1, 'a1', 31), (2, 'a2_new', 33), (4, 'a4', 30);

MERGE INTO cdf_tbl target USING merge_source source
ON target.id = source.id
WHEN MATCHED AND target.id % 2 = 0 THEN UPDATE SET name = source.name
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT *;

-- Delete
DELETE FROM cdf_tbl WHERE age >= 32;

-- CDF Query
-- Query the change data of all versions, starting from version 0.
SELECT * FROM table_changes('cdf_tbl', 0);
SELECT * FROM table_changes('cdf_tbl', '2023-02-03 15:33:34'); -- 2023-02-03 15:33:34 is the time when version 0 of the table was committed.


-- Query the change data of version 4 only.
SELECT * FROM table_changes('cdf_tbl', 4, 4);
SELECT * FROM table_changes('cdf_tbl', '2023-02-03 15:34:06', '2023-02-03 15:34:06'); -- 2023-02-03 15:34:06 is the time when version 4 of the table was committed.

The following figures show the query results.

Figure 1. Result for the first query

Figure 2. Result for the second query

Build Delta Lake CDC by using DataFrame

// Create and Write to Delta CDF-enabled Table
val df = Seq((1, "XUN", 32), (2, "JING", 30)).toDF("id", "name", "age")
df.write.format("delta").mode("append")
  .option("delta.enableChangeDataFeed", "true") // The first time you write data to the Delta Lake table, you need to enable CDF. In subsequent data write operations, you no longer need to enable CDF. 
  .saveAsTable("cdf_table")

// CDF Query Using DataFrame
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 4) // The endingVersion parameter is optional. 
  .table("cdf_table")
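A typical next step in an incremental data warehouse is to apply the change data to a downstream table. The following snippet is a minimal sketch, not part of the original example. It assumes Delta Lake 2.0 (EMR V5.9.0 or later), an existing downstream Delta table named downstream_tbl with the same id, name, and age columns, and at most one change record per id in the selected version range; real pipelines usually deduplicate the change data to the latest record per key first. The starting version is illustrative.

import io.delta.tables.DeltaTable

// Read the change data and drop pre-update images so that each
// update is represented by a single record.
val changes = spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 5)
  .table("cdf_table")
  .where("_change_type != 'update_preimage'")

// Apply inserts, updates, and deletes to the downstream table.
DeltaTable.forName("downstream_tbl").as("t")
  .merge(changes.as("s"), "t.id = s.id")
  .whenMatched("s._change_type = 'delete'").delete()
  .whenMatched().updateExpr(Map("name" -> "s.name", "age" -> "s.age"))
  .whenNotMatched("s._change_type != 'delete'")
  .insertExpr(Map("id" -> "s.id", "name" -> "s.name", "age" -> "s.age"))
  .execute()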

Build Delta Lake CDC by using Spark Streaming

// Streaming CDF Query Using DataStreamReader
spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0) // Streaming reads are unbounded, so the ending parameters do not apply.
  .table("cdf_table")
  .writeStream
  .format("console")
  .start()
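To apply the streamed change data to a downstream table instead of printing it, a common pattern is to process each micro-batch with foreachBatch. The following snippet is a minimal sketch; downstream_log and the checkpoint path are illustrative names:

import org.apache.spark.sql.DataFrame

// Append the post-change records of each micro-batch to a downstream table.
val appendChanges = (batch: DataFrame, batchId: Long) => {
  batch.where("_change_type != 'update_preimage'")
    .write.format("delta").mode("append").saveAsTable("downstream_log")
}

spark.readStream.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("cdf_table")
  .writeStream
  .option("checkpointLocation", "/tmp/cdf_ckpt")
  .foreachBatch(appendChanges)
  .start()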