
E-MapReduce: Build an incremental data warehouse by using Hudi CDC

Last Updated: Mar 26, 2026

The Change Data Capture (CDC) feature in Apache Hudi records row-level changes to a Hudi table and exposes them through CDC queries. Downstream pipelines can then consume only the rows that changed instead of rescanning the full dataset. Common use cases include:

  • Incremental ETL: Process only the rows that changed since the last pipeline run.

  • Audit trails: Track inserts, updates, and deletes for compliance and governance.

  • Data replication: Synchronize changes to downstream tables, caches, or external systems in near real time.

Supported EMR versions

Hudi CDC requires an E-MapReduce (EMR) cluster running V3.45.0 or a later minor version, or V5.11.0 or a later minor version. These versions use Apache Hudi 0.12.2.

Parameters

Write parameters

| Parameter | Description |
| --- | --- |
| hoodie.table.cdc.enabled | Enables CDC. Set to true to enable. Default: false. |
| hoodie.table.cdc.supplemental.logging.mode | Controls what the CDC log files store. See the values below. Default: data_before_after. |

hoodie.table.cdc.supplemental.logging.mode accepts the following values:

| Value | What is stored |
| --- | --- |
| op_key_only | Primary key and operation type |
| data_before | Primary key, operation type, and the record before the change |
| data_before_after | Primary key, operation type, and the record before and after the change (default) |
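The write parameters above are plain string options. A mistyped mode name is therefore easy to miss. The following is a minimal Scala sketch that builds the two CDC write options and rejects any mode outside the three documented values before a write runs. CdcWriteOptions is a hypothetical helper for illustration and is not part of the Hudi API.

```scala
// Hypothetical helper (not part of Hudi): builds the CDC write options
// documented above and validates the supplemental logging mode.
object CdcWriteOptions {
  // The three documented values of hoodie.table.cdc.supplemental.logging.mode.
  val SupportedModes: Set[String] = Set("op_key_only", "data_before", "data_before_after")

  def apply(mode: String = "data_before_after"): Map[String, String] = {
    // Fail fast on an unknown mode instead of passing it to the writer.
    require(SupportedModes.contains(mode), s"Unsupported CDC logging mode: $mode")
    Map(
      "hoodie.table.cdc.enabled" -> "true",
      "hoodie.table.cdc.supplemental.logging.mode" -> mode
    )
  }
}
```

You can merge the result into the rest of your write options, for example `baseOpts ++ CdcWriteOptions("op_key_only")`, where baseOpts holds the table name, record key, and precombine field.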

Read parameters

To run a CDC query, set hoodie.datasource.query.type and hoodie.datasource.query.incremental.format to the values in the following table. Then specify the query range with the instant time parameters.

| Parameter | Value for a CDC query | Default |
| --- | --- | --- |
| hoodie.datasource.query.type | incremental | snapshot |
| hoodie.datasource.query.incremental.format | cdc | latest_state |
| hoodie.datasource.read.begin.instanttime | Start instant time of the query range (required) | None |
| hoodie.datasource.read.end.instanttime | End instant time of the query range (optional) | Latest instant |

Time range semantics: The query range is a left-open, right-closed interval, (begin, end]. A commit whose instant time equals begin is excluded from the results. To include a specific commit, subtract 1 from its instant time and use the result as begin.instanttime.
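The following is a minimal Scala sketch of the (begin, end] rule. It assumes 17-digit instant times in yyyyMMddHHmmssSSS format, like the ones in this topic. For fixed-width digit strings of this kind, string order matches time order. CdcRange and its methods are hypothetical names for illustration.

```scala
// Sketch of the left-open, right-closed range (begin, end].
// Assumes fixed-width 17-digit instant times (yyyyMMddHHmmssSSS).
object CdcRange {
  // Start instant time that makes a query include `commitInstant` itself.
  def beginToInclude(commitInstant: String): String =
    (commitInstant.toLong - 1).toString

  // True if `instant` lies in (begin, end]. end = None means no upper bound.
  def contains(instant: String, begin: String, end: Option[String]): Boolean =
    instant.compareTo(begin) > 0 && end.forall(e => instant.compareTo(e) <= 0)
}
```

For example, `CdcRange.beginToInclude("20230129220605215")` yields 20230129220605214, which is the start instant time used in the Spark SQL example below. Passing 20230129220605215 itself would exclude that commit.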

CDC output columns

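A CDC query returns the following four columns. The table's data columns and Hudi metadata fields, such as _hoodie_commit_time and _hoodie_record_key, are embedded as JSON strings in before and after.

| Column | Type | Description |
| --- | --- | --- |
| op | String | Operation type: i (insert), u (update), or d (delete) |
| ts_ms | String | Instant time of the commit that made the change, in yyyyMMddHHmmssSSS format (not epoch milliseconds) |
| before | String (JSON) | The record's state before the change. null for inserts. |
| after | String (JSON) | The record's state after the change. null for deletes. |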
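Despite its name, ts_ms holds a Hudi instant time string such as 20230129220605215, not an epoch-millisecond value. A downstream system that expects a real timestamp can convert it. The following Scala sketch uses only java.time. The milliseconds are appended as a separate fixed-width field because the single pattern "yyyyMMddHHmmssSSS" can fail to parse on Java 8. InstantTime is a hypothetical name. The result carries no time zone.

```scala
import java.time.LocalDateTime
import java.time.format.DateTimeFormatterBuilder
import java.time.temporal.ChronoField

// Sketch: parse a Hudi instant time (yyyyMMddHHmmssSSS) into a LocalDateTime.
object InstantTime {
  // Seconds-level pattern plus a fixed-width, 3-digit millisecond field.
  private val Format = new DateTimeFormatterBuilder()
    .appendPattern("yyyyMMddHHmmss")
    .appendValue(ChronoField.MILLI_OF_SECOND, 3)
    .toFormatter()

  def toLocalDateTime(instant: String): LocalDateTime =
    LocalDateTime.parse(instant, Format)
}
```

For example, the instant 20230129220605215 from the Spark SQL example converts to 2023-01-29T22:06:05.215. This matches the 22:06 modification time of the commit files in the .hoodie directory listing.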

Examples

Spark SQL

Prerequisites

Before you begin, ensure that you have:

  • An EMR cluster running V3.45.0 or a later minor version, or V5.11.0 or a later minor version

  • The spark.serializer configuration item set to org.apache.spark.serializer.KryoSerializer

To add the spark.serializer configuration item, go to the Configure tab of the Spark service page and click spark-defaults.conf. Click Add Configuration Item, then add an item named spark.serializer with the value org.apache.spark.serializer.KryoSerializer. For details, see the Add configuration items section of "Manage configuration items".

Create a table with CDC enabled

create table hudi_cdc_test (
  id bigint,
  name string,
  ts bigint
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts',
  'hoodie.table.cdc.enabled' = 'true',
  'hoodie.table.cdc.supplemental.logging.mode' = 'data_before_after'
);

Write and query data

  1. Insert two rows and verify the table contents.

    insert into hudi_cdc_test values (1, 'a1', 1000), (2, 'a2', 1001);
    select * from hudi_cdc_test;

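    Expected output. The first five columns are the Hudi metadata fields _hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key, _hoodie_partition_path, and _hoodie_file_name. _hoodie_partition_path is empty because the table is not partitioned.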

    20230129220605215    20230129220605215_0_0    1        0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet    1    a1    1000
    20230129220605215    20230129220605215_0_1    2        0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet    2    a2    1001
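  2. Get the commit timestamp by listing the .hoodie directory under the table's storage path. The same value also appears in the _hoodie_commit_time column, the first column of the output in step 1. The directory contains files similar to the following: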

    -rw-r--r--  1 zxy  staff   1.2K  1 29 22:06 20230129220605215.commit
    -rw-r--r--  1 zxy  staff     0B  1 29 22:06 20230129220605215.commit.requested
    -rw-r--r--  1 zxy  staff   798B  1 29 22:06 20230129220605215.inflight

    The commit timestamp is 20230129220605215. Because the query range is left-open, subtract 1 and use 20230129220605214 as the start instant time.

  3. Run a CDC query to see the inserted rows. hudi_table_changes takes the table name and the start instant time.

    select * from hudi_table_changes("hudi_cdc_test", "20230129220605214");

    Expected output (two insert records, op = i, before = NULL):

    i    20230129220605215    NULL    {"_hoodie_record_key":"1","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet","_hoodie_commit_seqno":"20230129220605215_0_0","name":"a1","_hoodie_commit_time":"20230129220605215","ts":1000,"id":1}
    i    20230129220605215    NULL    {"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet","_hoodie_commit_seqno":"20230129220605215_0_1","name":"a2","_hoodie_commit_time":"20230129220605215","ts":1001,"id":2}
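  4. Update row 2 and verify the table contents. Because the table has a primary key, inserting a row with an existing id updates that row instead of adding a duplicate.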

    insert into hudi_cdc_test values (2, 'a2', 1002);
    select * from hudi_cdc_test;

    Expected output (row 2 is updated to ts = 1002):

    20230129220605215    20230129220605215_0_0    1        0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet    1    a1    1000
    20230129221304930    20230129221304930_0_1    2        0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet    2    a2    1002
  5. Get the new commit timestamp (20230129221304930) from the .hoodie directory and subtract 1. Then run another CDC query. Only the update is returned.

    select * from hudi_table_changes("hudi_cdc_test", "20230129221304929");

    Expected output (one update record, op = u, before contains the old state):

    u    20230129221304930    {"_hoodie_commit_time": "20230129220605215", "_hoodie_commit_seqno": "20230129220605215_0_1", "_hoodie_record_key": "2", "_hoodie_partition_path": "", "_hoodie_file_name": "0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet", "id": 2, "name": "a2", "ts": 1001}    {"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet","_hoodie_commit_seqno":"20230129221304930_0_1","name":"a2","_hoodie_commit_time":"20230129221304930","ts":1002,"id":2}
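In steps 2 and 5, the commit timestamp is read from a directory listing by hand. The following Scala sketch does the same from a list of file names. It keeps only completed commits, which are files named <instant>.commit as in the listing in step 2. It then takes the latest one and subtracts 1. CommitTimeline is a hypothetical helper. The sketch assumes a copy-on-write table with 17-digit instant times. Merge-on-read tables complete writes as .deltacommit files, which this sketch does not handle.

```scala
// Hypothetical helper: derive the CDC start instant time from the file
// names in a copy-on-write table's .hoodie directory.
object CommitTimeline {
  // Matches only completed commits such as "20230129220605215.commit".
  // Regex patterns in Scala match the whole string, so ".commit.requested"
  // and ".inflight" files (unfinished commits) are skipped.
  private val CompletedCommit = """(\d{17})\.commit""".r

  def latestCompletedCommit(fileNames: Seq[String]): Option[String] =
    fileNames.collect { case CompletedCommit(instant) => instant }.sorted.lastOption

  // Start instant time that includes the latest completed commit.
  def beginForLatestCommit(fileNames: Seq[String]): Option[String] =
    latestCompletedCommit(fileNames).map(instant => (instant.toLong - 1).toString)
}
```

After the update in step 4, the listing would also contain 20230129221304930.commit. In that case, beginForLatestCommit returns 20230129221304929, which is the value passed to hudi_table_changes in step 5.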

DataFrame (Scala)

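All DataFrame examples set hoodie.datasource.query.type to incremental and hoodie.datasource.query.incremental.format to cdc to run CDC reads. The writes use op_key_only mode, so the CDC log files store only the primary key and operation type. The CDC query still returns full before and after images, which Hudi reconstructs from the table's data files at read time.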

Prerequisites

Before you begin, ensure that you have:

  • An EMR cluster running V3.45.0 or a later minor version, or V5.11.0 or a later minor version

  • Apache Spark configured with KryoSerializer

Set up the session and options

import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension

// Local session for testing. When you submit the job to the EMR cluster,
// remove .master(...) so that the cluster's master setting is used.
val spark: SparkSession = SparkSession.builder()
  .master("local[4]")
  .withExtensions(new HoodieSparkSessionExtension)
  // Kryo serialization is a prerequisite for Hudi (see Prerequisites).
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()
import spark.implicits._

val basePath = "/tmp/test/hudi_cdc_test"

// Write options: table name, record key, precombine field, and CDC settings.
val writeOpts = Map(
  "hoodie.table.name" -> "hudi_cdc_test",
  "hoodie.datasource.write.recordkey.field" -> "id",
  "hoodie.datasource.write.precombine.field" -> "ts",
  "hoodie.table.cdc.enabled" -> "true",
  // CDC log files store only the primary key and operation type.
  "hoodie.table.cdc.supplemental.logging.mode" -> "op_key_only"
)

// Read options that turn an incremental query into a CDC query.
val readOpts = Map(
  "hoodie.datasource.query.type" -> "incremental",
  "hoodie.datasource.query.incremental.format" -> "cdc"
)
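In the steps below, readOpts fixes the query type and format, and each query adds its own start instant time. The following is a minimal sketch that puts both range bounds in one place. It uses only the option keys from the Read parameters table. CdcReadOptions is a hypothetical helper, not a Hudi API.

```scala
// Hypothetical helper: CDC read options for the range (begin, end].
object CdcReadOptions {
  def apply(begin: String, end: Option[String] = None): Map[String, String] = {
    val base = Map(
      "hoodie.datasource.query.type" -> "incremental",
      "hoodie.datasource.query.incremental.format" -> "cdc",
      "hoodie.datasource.read.begin.instanttime" -> begin
    )
    // Omit the end bound to read up to the latest instant.
    end.fold(base)(e => base + ("hoodie.datasource.read.end.instanttime" -> e))
  }
}
```

With this helper, the first read below could be written as `spark.read.format("hudi").options(CdcReadOptions((timestamp1.toLong - 1).toString)).load(basePath)`.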

Write the first batch and read CDC changes

  1. Write two rows. df1.show(false) prints the rows that were written.

    val df1 = Seq((1, "a1", 1000), (2, "a2", 1001)).toDF("id", "name", "ts")
    df1.write.format("hudi")
      .options(writeOpts)
      .mode(SaveMode.Append)
      .save(basePath)
    df1.show(false)

    Expected output:

    +---+----+----+
    |id |name|ts  |
    +---+----+----+
    |1  |a1  |1000|
    |2  |a2  |1001|
    +---+----+----+
  2. Read the CDC changes into cdc1. The start instant time is the latest commit's instant time minus 1, so the query includes that commit.

    // Build a meta client to read the table's timeline.
    val metaClient = HoodieTableMetaClient.builder()
      .setBasePath(basePath)
      .setConf(spark.sessionState.newHadoopConf())
      .build()
    
    // Latest instant on the timeline, here the commit that was just written.
    val timestamp1 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
    // The range is (begin, end], so begin = timestamp1 - 1 includes that commit.
    val cdc1 = spark.read.format("hudi")
      .options(readOpts)
      .option("hoodie.datasource.read.begin.instanttime", (timestamp1.toLong - 1).toString)
      .load(basePath)
    cdc1.show(false)

    Expected output (two insert records, op = i, before = null):

    +---+-----------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |op |ts_ms            |before|after                                                                                                                                                                                                                                                                      |
    +---+-----------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |i  |20230128030951890|null  |{"_hoodie_record_key":"1","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet","_hoodie_commit_seqno":"20230128030951890_0_0","name":"a1","_hoodie_commit_time":"20230128030951890","ts":1000,"id":1}|
    |i  |20230128030951890|null  |{"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet","_hoodie_commit_seqno":"20230128030951890_0_1","name":"a2","_hoodie_commit_time":"20230128030951890","ts":1001,"id":2}|
    +---+-----------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
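An incremental ETL pipeline does not need to subtract 1 on every run. After it processes a batch, it can record the largest ts_ms it consumed and use that value directly as the next begin. Because the range is left-open, the already-processed commit is then excluded. The following is a minimal in-memory sketch. InstantCheckpoint is a hypothetical class, and a real pipeline would persist the value durably.

```scala
// Hypothetical in-memory checkpoint for an incremental CDC pipeline.
// A production pipeline would persist lastConsumed, for example in a table.
final class InstantCheckpoint(initialBegin: String) {
  private var lastConsumed: String = initialBegin

  // begin.instanttime for the next CDC read. Because the range is
  // (begin, end], changes at exactly this instant (already processed) are excluded.
  def nextBegin: String = lastConsumed

  // Call after a batch is processed, passing the ts_ms values of its rows.
  def advance(batchTsMs: Seq[String]): Unit =
    if (batchTsMs.nonEmpty) lastConsumed = (lastConsumed +: batchTsMs).max
}
```

For example, the first run could start from timestamp1 minus 1 (20230128030951889). After it processes the two rows above, nextBegin becomes 20230128030951890, so the next run returns only the second batch.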

Write a second batch and read CDC changes

  1. Update row 2 by writing a new version with ts = 1002. df2.show(false) prints the row that was written.

    val df2 = Seq((2, "a2", 1002)).toDF("id", "name", "ts")
    df2.write.format("hudi")
      .options(writeOpts)
      .mode(SaveMode.Append)
      .save(basePath)
    df2.show(false)

    Expected output:

    +---+----+----+
    |id |name|ts  |
    +---+----+----+
    |2  |a2  |1002|
    +---+----+----+
  2. Read the CDC changes into cdc2. Only row 2 is returned, and its before column contains the row's previous state.

    // Reload the timeline to pick up the second commit.
    val timestamp2 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
    val cdc2 = spark.read.format("hudi")
      .options(readOpts)
      .option("hoodie.datasource.read.begin.instanttime", (timestamp2.toLong - 1).toString)
      .load(basePath)
    cdc2.show(false)

    Expected output (one update record, op = u, before shows the old state):

    +---+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |op |ts_ms            |before                                                                                                                                                                                                                                                                                    |after                                                                                                                                                                                                                                                                      |
    +---+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |u  |20230128031235363|{"_hoodie_commit_time": "20230128030951890", "_hoodie_commit_seqno": "20230128030951890_0_1", "_hoodie_record_key": "2", "_hoodie_partition_path": "", "_hoodie_file_name": "6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet", "id": 2, "name": "a2", "ts": 1001}|{"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-60-52_20230128031235363.parquet","_hoodie_commit_seqno":"20230128031235363_0_1","name":"a2","_hoodie_commit_time":"20230128031235363","ts":1002,"id":2}|
    +---+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
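To replicate changes, a consumer can apply each CDC row to a downstream copy keyed by the primary key. In this scheme, i and u rows upsert the after image, and d rows remove the key. The following Scala sketch replays the two batches from this example. It models CDC rows as a hypothetical CdcRow case class whose key and after image are already extracted. A real consumer would first parse them from the before and after JSON strings.

```scala
// Hypothetical model of a CDC row after its JSON has been parsed.
// For op "d", after is None and the key would come from the before image.
final case class CdcRow(op: String, tsMs: String, key: Long, after: Option[String])

object Replicator {
  // Apply CDC rows in commit order to a key -> latest-record map.
  def applyChanges(state: Map[Long, String], rows: Seq[CdcRow]): Map[Long, String] =
    rows.sortBy(_.tsMs).foldLeft(state) { (acc, row) =>
      row.op match {
        case "i" | "u" =>
          val image = row.after.getOrElse(
            throw new IllegalArgumentException(s"'${row.op}' row has no after image"))
          acc + (row.key -> image)
        case "d" => acc - row.key
        case other => throw new IllegalArgumentException(s"Unknown op: $other")
      }
    }
}
```

Applying cdc1 (two i rows) and then cdc2 (one u row for id 2) leaves row 1 unchanged and row 2 at ts = 1002. This matches the table contents after the second write.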