Apache Hudi's Change Data Capture (CDC) feature streams row-level changes out of a Hudi table so downstream pipelines can consume only what changed, not the full dataset. Common use cases include:
- Incremental ETL: Process only the rows that changed since the last pipeline run.
- Audit trails: Track inserts, updates, and deletes for compliance and governance.
- Data replication: Synchronize changes to downstream tables, caches, or external systems in near real time.
Supported EMR versions
Hudi CDC requires an E-MapReduce (EMR) cluster running V3.45.0 or a later minor version, or V5.11.0 or a later minor version. These versions use Apache Hudi 0.12.2.
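If a deployment script needs to confirm the cluster version before it enables CDC, the rule above can be written as a small check. This is a sketch under stated assumptions. `EmrVersionCheck` and `supportsHudiCdc` are hypothetical names, not part of EMR or Hudi. The version string is assumed to look like `V3.45.0` or `5.11.2`. Release lines other than 3.x and 5.x are treated as unsupported only because the text does not mention them.

```scala
object EmrVersionCheck {
  // Hypothetical helper: parses "V3.45.0" or "5.11.2" into numeric segments and
  // applies the minimum versions stated in the text.
  // A non-numeric segment throws NumberFormatException.
  def supportsHudiCdc(version: String): Boolean =
    version.stripPrefix("V").split('.').toList.map(_.toInt) match {
      case 3 :: minor :: _ => minor >= 45 // V3.45.0 or a later minor version
      case 5 :: minor :: _ => minor >= 11 // V5.11.0 or a later minor version
      case _               => false       // release lines not covered by the text
    }
}
```

For example, `EmrVersionCheck.supportsHudiCdc("V3.46.1")` is `true` and `EmrVersionCheck.supportsHudiCdc("V5.10.2")` is `false`.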
Parameters
Write parameters
| Parameter | Description |
|---|---|
| `hoodie.table.cdc.enabled` | Enables CDC. Set to `true` to enable. Default: `false`. |
| `hoodie.table.cdc.supplemental.logging.mode` | Controls what CDC log files store. See the values below. Default: `data_before_after`. |
hoodie.table.cdc.supplemental.logging.mode accepts the following values:
| Value | What is stored |
|---|---|
| `op_key_only` | Primary key and operation type |
| `data_before` | Primary key, operation type, and the record before the change |
| `data_before_after` | Primary key, operation type, and the record before and after the change (default) |
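The two write parameters can be bundled into a single options map. The sketch below is illustrative only. `CdcWriteOptions` and `cdcWriteOptions` are hypothetical names, and the only facts it encodes are the parameter keys, the three accepted mode values, and the `data_before_after` default listed above. Rejecting an unknown mode locally catches a typo before a write job starts.

```scala
object CdcWriteOptions {
  // The values accepted by hoodie.table.cdc.supplemental.logging.mode.
  val LoggingModes: Set[String] = Set("op_key_only", "data_before", "data_before_after")

  // Hypothetical helper: returns the CDC-related write options.
  // The mode defaults to data_before_after, matching the documented default.
  def cdcWriteOptions(mode: String = "data_before_after"): Map[String, String] = {
    require(LoggingModes.contains(mode), s"unsupported CDC logging mode: $mode")
    Map(
      "hoodie.table.cdc.enabled" -> "true",
      "hoodie.table.cdc.supplemental.logging.mode" -> mode
    )
  }
}
```

Merge the result with the table's other write options, for example `baseOpts ++ CdcWriteOptions.cdcWriteOptions("op_key_only")`, where `baseOpts` stands for an existing options map.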
Read parameters
To perform a CDC query, set both hoodie.datasource.query.type and hoodie.datasource.query.incremental.format as shown below.
| Parameter | Value for CDC queries | Default |
|---|---|---|
| `hoodie.datasource.query.type` | `incremental` (required) | `snapshot` |
| `hoodie.datasource.query.incremental.format` | `cdc` (required) | `latest_state` |
| `hoodie.datasource.read.begin.instanttime` | Start instant time of the query range | N/A |
| `hoodie.datasource.read.end.instanttime` | End instant time of the query range (optional) | N/A |
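A CDC read needs the two fixed values from the table plus a begin instant, and optionally an end instant. The sketch below assembles that map in plain Scala so it can be passed to `spark.read.format("hudi").options(...)`. `CdcReadOptions` and `cdcReadOptions` are hypothetical helper names; when no end instant is given, the sketch simply leaves that parameter unset.

```scala
object CdcReadOptions {
  // Hypothetical helper: builds the options for a CDC query over (begin, end].
  def cdcReadOptions(beginInstant: String, endInstant: Option[String] = None): Map[String, String] = {
    val base = Map(
      "hoodie.datasource.query.type" -> "incremental",
      "hoodie.datasource.query.incremental.format" -> "cdc",
      "hoodie.datasource.read.begin.instanttime" -> beginInstant
    )
    // Add the end bound only when one is supplied.
    endInstant.fold(base)(end => base + ("hoodie.datasource.read.end.instanttime" -> end))
  }
}
```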
Time range semantics: The query range is the left-open, right-closed interval (begin, end]. A commit whose timestamp equals begin is excluded. To include a specific commit, subtract 1 from its timestamp and use the result as `hoodie.datasource.read.begin.instanttime`.
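The interval rule and the subtract-one convention can be checked with plain arithmetic on instant strings. In this sketch, `CdcRange` is a hypothetical object, and instants are compared as numbers. For same-length instants, such as the 17-digit ones in the examples on this page, numeric order matches string order.

```scala
object CdcRange {
  // begin value that makes the left-open range (begin, end] include `commit`.
  def beginToInclude(commit: String): String = (commit.toLong - 1).toString

  // True if `instant` lies in (begin, end]; end = None is modeled as unbounded.
  def covers(begin: String, end: Option[String], instant: String): Boolean = {
    val t = instant.toLong
    t > begin.toLong && end.forall(e => t <= e.toLong)
  }
}
```

Using the commit timestamp itself as begin, for example `CdcRange.covers("20230129220605215", None, "20230129220605215")`, returns `false`, which is why the examples subtract 1.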
CDC output columns
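A CDC query returns the following columns. Your table's data columns are not returned as separate columns. They appear, together with the Hudi metadata fields, inside the `before` and `after` JSON values.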
| Column | Type | Description |
|---|---|---|
| `op` | String | Operation type: `i` (insert), `u` (update), `d` (delete) |
| `ts_ms` | String | The commit instant time associated with the change |
| `before` | JSON | The record's state before the change. `null` for inserts. |
| `after` | JSON | The record's state after the change. `null` for deletes. |
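Downstream consumers typically branch on `op`. As a sketch of the data replication use case, the code below folds CDC rows into an in-memory key-value replica. Inserts and updates write the `after` image, and deletes remove the key. The `Change` type, the `ApplyChanges` object, and the reduction of each row to a key plus an `after` string are assumptions for illustration. The `d` code is Hudi's delete operation, which carries a null `after` image; it does not appear in the examples on this page. Rows must be applied in commit order (`ts_ms`).

```scala
object ApplyChanges {
  // Hypothetical, simplified view of one CDC row: the op code, the record key,
  // and the `after` image as a string (None when `after` is null).
  final case class Change(op: String, key: String, after: Option[String])

  // Applies changes, in commit order, to a key -> value replica.
  def applyChanges(replica: Map[String, String], changes: Seq[Change]): Map[String, String] =
    changes.foldLeft(replica) { (acc, c) =>
      c.op match {
        case "i" | "u" =>
          val image = c.after.getOrElse(
            throw new IllegalArgumentException(s"missing after image for key ${c.key}"))
          acc + (c.key -> image)
        case "d"   => acc - c.key
        case other => throw new IllegalArgumentException(s"unknown op: $other")
      }
    }
}
```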
Examples
Spark SQL
Prerequisites
Before you begin, ensure that you have:
- An EMR cluster running V3.45.0 or a later minor version, or V5.11.0 or a later minor version
- The `spark.serializer` configuration item set to `org.apache.spark.serializer.KryoSerializer`
To add the `spark.serializer` configuration item, go to the Configure tab of the Spark service page, click spark-defaults.conf, click Add Configuration item, and add `spark.serializer` with the value `org.apache.spark.serializer.KryoSerializer`. For details, see the Add configuration items section of "Manage configuration items".
Create a table with CDC enabled
```sql
create table hudi_cdc_test (
  id bigint,
  name string,
  ts bigint
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts',
  'hoodie.table.cdc.enabled' = 'true',
  'hoodie.table.cdc.supplemental.logging.mode' = 'data_before_after'
);
```
Write and query data
- Insert two rows and verify the table contents.

  ```sql
  insert into hudi_cdc_test values (1, 'a1', 1000), (2, 'a2', 1001);
  select * from hudi_cdc_test;
  ```

  Expected output:

  ```
  20230129220605215 20230129220605215_0_0 1 0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet 1 a1 1000
  20230129220605215 20230129220605215_0_1 2 0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet 2 a2 1001
  ```

- Get the commit timestamp from the `.hoodie` directory.

  ```
  -rw-r--r-- 1 zxy staff 1.2K 1 29 22:06 20230129220605215.commit
  -rw-r--r-- 1 zxy staff 0B 1 29 22:06 20230129220605215.commit.requested
  -rw-r--r-- 1 zxy staff 798B 1 29 22:06 20230129220605215.inflight
  ```

  The commit timestamp is `20230129220605215`. Because the query range is left-open, subtract 1 and use `20230129220605214` as the start instant time.

- Run a CDC query to see the inserted rows.

  ```sql
  select * from hudi_table_changes("hudi_cdc_test", "20230129220605214");
  ```

  Expected output (two insert records, `op = i`, `before = NULL`):

  ```
  i 20230129220605215 NULL {"_hoodie_record_key":"1","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet","_hoodie_commit_seqno":"20230129220605215_0_0","name":"a1","_hoodie_commit_time":"20230129220605215","ts":1000,"id":1}
  i 20230129220605215 NULL {"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet","_hoodie_commit_seqno":"20230129220605215_0_1","name":"a2","_hoodie_commit_time":"20230129220605215","ts":1001,"id":2}
  ```

- Update a row and verify the table contents. Because `id` is the primary key, inserting a row with an existing `id` updates that row here.

  ```sql
  insert into hudi_cdc_test values (2, 'a2', 1002);
  select * from hudi_cdc_test;
  ```

  Expected output (row 2 is updated to `ts = 1002`):

  ```
  20230129220605215 20230129220605215_0_0 1 0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet 1 a1 1000
  20230129221304930 20230129221304930_0_1 2 0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet 2 a2 1002
  ```

- Get the new commit timestamp (`20230129221304930`) and subtract 1. Run another CDC query to see only the update.

  ```sql
  select * from hudi_table_changes("hudi_cdc_test", "20230129221304929");
  ```

  Expected output (one update record, `op = u`, `before` contains the old state):

  ```
  u 20230129221304930 {"_hoodie_commit_time": "20230129220605215", "_hoodie_commit_seqno": "20230129220605215_0_1", "_hoodie_record_key": "2", "_hoodie_partition_path": "", "_hoodie_file_name": "0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet", "id": 2, "name": "a2", "ts": 1001} {"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet","_hoodie_commit_seqno":"20230129221304930_0_1","name":"a2","_hoodie_commit_time":"20230129221304930","ts":1002,"id":2}
  ```
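The second step above reads the commit timestamp from the `.hoodie` directory by hand. The same rule can be sketched in code: keep only completed instants (files named `<instant>.commit`, or `<instant>.deltacommit` on merge-on-read tables), take the newest one, and subtract 1. `LatestCommit` is a hypothetical helper that works on a plain list of file names. How the listing is obtained, for example through a Hadoop `FileSystem` listing of the table's `.hoodie` directory, is left out. Other action types, such as `replacecommit`, are ignored by this sketch.

```scala
object LatestCommit {
  // Completed instants: "<instant>.commit" (COW) or "<instant>.deltacommit" (MOR).
  // ".commit.requested" and ".inflight" files do not match because the regex
  // must match the whole file name.
  private val Completed = """(\d+)\.(?:commit|deltacommit)""".r

  // Newest completed instant in the listing, if any.
  def latestCompletedCommit(fileNames: Seq[String]): Option[String] =
    fileNames.collect { case Completed(instant) => instant }.sortBy(_.toLong).lastOption

  // Start instant for a left-open CDC query that includes `instant`.
  def beginFor(instant: String): String = (instant.toLong - 1).toString
}
```

Applied to the listing in the second step, `latestCompletedCommit` picks `20230129220605215`, and `beginFor` turns it into `20230129220605214`.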
DataFrame (Scala)
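All DataFrame examples set `hoodie.datasource.query.type` to `incremental` and `hoodie.datasource.query.incremental.format` to `cdc` to enable CDC reads. The write side uses `op_key_only` mode, which stores only the primary key and operation type in the CDC log files. The reader reconstructs the `before` and `after` images at query time, so the query output still contains both, as the expected output shows.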
Prerequisites
Before you begin, ensure that you have:
- An EMR cluster running V3.45.0 or a later minor version, or V5.11.0 or a later minor version
- Apache Spark configured with `KryoSerializer`
Set up the session and options
```scala
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension

// Local session with the Hudi SQL extension and the Kryo serializer.
val spark: SparkSession = SparkSession.builder()
  .master("local[4]")
  .withExtensions(new HoodieSparkSessionExtension)
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

import spark.implicits._ // enables Seq(...).toDF

val basePath = "/tmp/test/hudi_cdc_test"

// Write options: record key, precombine field, and CDC in op_key_only mode.
val writeOpts = Map(
  "hoodie.table.name" -> "hudi_cdc_test",
  "hoodie.datasource.write.recordkey.field" -> "id",
  "hoodie.datasource.write.precombine.field" -> "ts",
  "hoodie.table.cdc.enabled" -> "true",
  "hoodie.table.cdc.supplemental.logging.mode" -> "op_key_only"
)

// Read options: the incremental query type plus the cdc format selects a CDC query.
val readOpts = Map(
  "hoodie.datasource.query.type" -> "incremental",
  "hoodie.datasource.query.incremental.format" -> "cdc"
)
```
Write the first batch and read CDC changes
- Write two rows.

  ```scala
  val df1 = Seq((1, "a1", 1000), (2, "a2", 1001)).toDF("id", "name", "ts")
  df1.write.format("hudi")
    .options(writeOpts)
    .mode(SaveMode.Append)
    .save(basePath)
  df1.show(false)
  ```

  Expected output:

  ```
  +---+----+----+
  |id |name|ts  |
  +---+----+----+
  |1  |a1  |1000|
  |2  |a2  |1001|
  +---+----+----+
  ```

- Read CDC changes into `cdc1`. The start instant time is the last commit timestamp minus 1.

  ```scala
  // Load the table's timeline to find the commit that was just written.
  val metaClient = HoodieTableMetaClient.builder()
    .setBasePath(basePath)
    .setConf(spark.sessionState.newHadoopConf())
    .build()
  val timestamp1 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp

  // The range is (begin, end], so begin = commit - 1 includes that commit.
  val cdc1 = spark.read.format("hudi")
    .options(readOpts)
    .option("hoodie.datasource.read.begin.instanttime", (timestamp1.toLong - 1).toString)
    .load(basePath)
  cdc1.show(false)
  ```

  Expected output (two insert records, `op = i`, `before = null`):

  ```
  +---+-----------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  |op |ts_ms |before|after |
  +---+-----------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  |i |20230128030951890|null |{"_hoodie_record_key":"1","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet","_hoodie_commit_seqno":"20230128030951890_0_0","name":"a1","_hoodie_commit_time":"20230128030951890","ts":1000,"id":1}|
  |i |20230128030951890|null |{"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet","_hoodie_commit_seqno":"20230128030951890_0_1","name":"a2","_hoodie_commit_time":"20230128030951890","ts":1001,"id":2}|
  +---+-----------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  ```
Write a second batch and read CDC changes
- Update row 2 by writing a new version with `ts = 1002`.

  ```scala
  val df2 = Seq((2, "a2", 1002)).toDF("id", "name", "ts")
  df2.write.format("hudi")
    .options(writeOpts)
    .mode(SaveMode.Append)
    .save(basePath)
  df2.show(false)
  ```

  Expected output:

  ```
  +---+----+----+
  |id |name|ts  |
  +---+----+----+
  |2  |a2  |1002|
  +---+----+----+
  ```

- Read CDC changes into `cdc2`. The `before` column contains the previous state of row 2.

  ```scala
  // Reload the timeline so the new commit is visible, then query from commit - 1.
  val timestamp2 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
  val cdc2 = spark.read.format("hudi")
    .options(readOpts)
    .option("hoodie.datasource.read.begin.instanttime", (timestamp2.toLong - 1).toString)
    .load(basePath)
  cdc2.show(false)
  ```

  Expected output (one update record, `op = u`, `before` shows the old state):

  ```
  +---+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  |op |ts_ms |before |after |
  +---+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  |u |20230128031235363|{"_hoodie_commit_time": "20230128030951890", "_hoodie_commit_seqno": "20230128030951890_0_1", "_hoodie_record_key": "2", "_hoodie_partition_path": "", "_hoodie_file_name": "6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet", "id": 2, "name": "a2", "ts": 1001}|{"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-60-52_20230128031235363.parquet","_hoodie_commit_seqno":"20230128031235363_0_1","name":"a2","_hoodie_commit_time":"20230128031235363","ts":1002,"id":2}|
  +---+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  ```
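Each read in this walkthrough recomputes the begin instant from the latest commit. For the incremental ETL use case, a pipeline can instead persist the last commit it consumed and pass that instant unchanged as `hoodie.datasource.read.begin.instanttime`. Because the range is left-open, the already-processed commit is excluded. The sketch below simulates that bookkeeping over a list of commit instants. `IncrementalCheckpoint` and `nextBatch` are hypothetical names. In a real job the commit list would come from the table timeline and the checkpoint from durable storage.

```scala
object IncrementalCheckpoint {
  // Given completed commit instants (in any order) and the last instant already
  // consumed, returns the commits in (checkpoint, newest] and the new checkpoint.
  // The old checkpoint is used as-is for begin: the left-open range excludes it.
  def nextBatch(commits: Seq[String], checkpoint: String): (Seq[String], String) = {
    val pending = commits.filter(_.toLong > checkpoint.toLong).sortBy(_.toLong)
    (pending, pending.lastOption.getOrElse(checkpoint))
  }
}
```

Using the old checkpoint as begin and the new checkpoint as `hoodie.datasource.read.end.instanttime` pins each batch to a fixed range, so a commit that lands while the batch runs is left for the next batch.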