E-MapReduce: Build an incremental data warehouse by using Hudi CDC

Last Updated:Jul 17, 2024

You can build an incremental data warehouse by using Hudi Change Data Capture (CDC). This topic describes the Hudi CDC parameters and provides usage examples for Spark SQL and the DataFrame API.

Background information

CDC is a set of software design patterns that identify and capture data changes in a database table and deliver the changes to a downstream process or system. Hudi CDC uses a Hudi table as the source from which the data changes are obtained.

Limits

Hudi CDC is available only for E-MapReduce (EMR) clusters of V3.45.0 or a later minor version and clusters of V5.11.0 or a later minor version. Hudi 0.12.2 is used in these EMR versions.

Parameters

Hudi CDC write parameters

Parameter

Description

hoodie.table.cdc.enabled

Specifies whether to enable the CDC feature. Valid values:

  • true: enables the CDC feature.

  • false: disables the CDC feature. This is the default value.

hoodie.table.cdc.supplemental.logging.mode

The storage mode of CDC files. Valid values:

  • op_key_only: stores the primary key and operation type.

  • data_before: stores the primary key, operation type, and data before a change.

  • data_before_after: stores the primary key, operation type, data before a change, and data after a change. This is the default value.
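As a rough illustration of how the two write parameters fit together, the following Scala sketch assembles them into an options map and rejects an unsupported storage mode before any write is attempted. The object CdcWriteOptions and its build method are hypothetical helpers introduced for this example and are not part of Hudi.

```scala
// Hypothetical helper (not part of Hudi): builds the CDC write options
// described in the preceding table and validates the storage mode.
object CdcWriteOptions {
  // The three storage modes listed for hoodie.table.cdc.supplemental.logging.mode.
  val validModes: Set[String] = Set("op_key_only", "data_before", "data_before_after")

  def build(loggingMode: String): Map[String, String] = {
    // Fail fast on a mode that the table above does not list.
    require(validModes.contains(loggingMode),
      s"Unsupported CDC logging mode: $loggingMode")
    Map(
      "hoodie.table.cdc.enabled" -> "true",
      "hoodie.table.cdc.supplemental.logging.mode" -> loggingMode
    )
  }
}
```

The resulting map could be merged with the other write options, such as the table name and record key field, before the options are passed to a Hudi write.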

Hudi CDC read parameters

Parameter

Description

hoodie.datasource.query.type

The type of query. To use the CDC feature, you must set this parameter to incremental.

Default value: snapshot.

hoodie.datasource.query.incremental.format

The type of the incremental query. To use the CDC feature, you must set this parameter to cdc.

Default value: latest_state.

hoodie.datasource.read.begin.instanttime

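The start time of the incremental query. The query range is a left-open interval, which means that changes committed exactly at the start time are not returned.

hoodie.datasource.read.end.instanttime

The end time of the incremental query. The query range is a right-closed interval, which means that changes committed exactly at the end time are returned. This parameter is optional.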
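The read parameters can be combined in the same way. The following sketch, which assumes a hypothetical helper object named CdcReadOptions, always sets the two values required for a CDC query and adds the end time only when one is supplied.

```scala
// Hypothetical helper (not part of Hudi): builds the read options
// that the preceding table requires for a CDC query.
object CdcReadOptions {
  def build(beginInstant: String, endInstant: Option[String] = None): Map[String, String] = {
    val required = Map(
      "hoodie.datasource.query.type" -> "incremental",
      "hoodie.datasource.query.incremental.format" -> "cdc",
      "hoodie.datasource.read.begin.instanttime" -> beginInstant
    )
    // The end time is optional, so it is added only when the caller provides one.
    required ++ endInstant.map(t => "hoodie.datasource.read.end.instanttime" -> t).toList
  }
}
```

For example, CdcReadOptions.build("20230129220605214") yields three entries, and passing Some("20230129221304930") as the second argument adds the end time as a fourth entry.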

Examples

Spark SQL

  1. On the Configure tab of the Spark service page, click spark-defaults.conf. Then, click Add Configuration item to add the spark.serializer configuration item. Set the value to org.apache.spark.serializer.KryoSerializer. For information about how to add a configuration item, see the Add configuration items section of the "Manage configuration items" topic.

  2. Run the following command to create a table:

    create table hudi_cdc_test (
      id bigint,
      name string,
      ts bigint
    ) using hudi
    tblproperties (
      type = 'cow',
      primaryKey = 'id',
      preCombineField = 'ts',
      'hoodie.table.cdc.enabled' = 'true',
      'hoodie.table.cdc.supplemental.logging.mode' = 'data_before_after'
    );
  3. Run the following commands to write data to the table and query the table data:

    insert into hudi_cdc_test values (1, 'a1', 1000), (2, 'a2', 1001);
    select * from hudi_cdc_test;

    The following output is returned:

    20230129220605215    20230129220605215_0_0    1        0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet    1    a1    1000
    20230129220605215    20230129220605215_0_1    2        0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet    2    a2    1001
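  4. Obtain the timestamp of the latest commit from the .hoodie directory of the table and perform a CDC query.

    1. List the files in the .hoodie directory. The timestamp of the latest commit is the file name prefix of the most recent .commit file. In this example, the timestamp is 20230129220605215.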

      -rw-r--r--  1 zxy  staff   1.2K  1 29 22:06 20230129220605215.commit
      -rw-r--r--  1 zxy  staff     0B  1 29 22:06 20230129220605215.commit.requested
      -rw-r--r--  1 zxy  staff   798B  1 29 22:06 20230129220605215.inflight
    2. Run the following command to perform a CDC query.

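      The query time range is a left-open, right-closed interval, so changes committed exactly at the start timestamp are not returned. To include the latest commit, subtract 1 from its timestamp and use the result as the start timestamp: 20230129220605215 - 1 = 20230129220605214.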

      select * from hudi_table_changes("hudi_cdc_test", "20230129220605214");

      The following output is returned:

      i    20230129220605215    NULL    {"_hoodie_record_key":"1","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet","_hoodie_commit_seqno":"20230129220605215_0_0","name":"a1","_hoodie_commit_time":"20230129220605215","ts":1000,"id":1}
      i    20230129220605215    NULL    {"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet","_hoodie_commit_seqno":"20230129220605215_0_1","name":"a2","_hoodie_commit_time":"20230129220605215","ts":1001,"id":2}
  5. Run the following commands to write data to the table and query the table data again:

    insert into hudi_cdc_test values (2, 'a2', 1002);
    select * from hudi_cdc_test;

    The following output is returned:

    20230129220605215    20230129220605215_0_0    1        0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet    1    a1    1000
    20230129221304930    20230129221304930_0_1    2        0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet    2    a2    1002
  6. Obtain the timestamp of the latest commit by referring to step 4 and subtract 1 from the obtained timestamp. Then, perform a CDC query.

    For example, the obtained timestamp is 20230129221304930. The start timestamp is 20230129221304929. Run the following command to perform a CDC query.

    select * from hudi_table_changes("hudi_cdc_test", "20230129221304929");

    The following output is returned:

    u    20230129221304930    {"_hoodie_commit_time": "20230129220605215", "_hoodie_commit_seqno": "20230129220605215_0_1", "_hoodie_record_key": "2", "_hoodie_partition_path": "", "_hoodie_file_name": "0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet", "id": 2, "name": "a2", "ts": 1001}    {"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet","_hoodie_commit_seqno":"20230129221304930_0_1","name":"a2","_hoodie_commit_time":"20230129221304930","ts":1002,"id":2}
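The timestamp handling in steps 4 and 6 can be sketched in Scala as follows. The object CommitTimestamps and its methods are hypothetical names used only for illustration. The sketch assumes that completed commits appear as files with the .commit suffix, as in the directory listing in step 4, and that all timestamps have the same number of digits so that string ordering matches time ordering.

```scala
// Hypothetical helper (not part of Hudi): derives the start timestamp
// for a CDC query from the file names in the .hoodie directory.
object CommitTimestamps {
  // Returns the timestamp of the newest completed commit, if any.
  // Files such as *.commit.requested and *.inflight are ignored.
  def latestCommit(fileNames: Seq[String]): Option[String] =
    fileNames
      .filter(_.endsWith(".commit"))
      .map(_.stripSuffix(".commit"))
      .sorted
      .lastOption

  // The query range is left-open, so subtract 1 to include the commit itself.
  def exclusiveStart(commitTime: String): String =
    (commitTime.toLong - 1).toString
}
```

With the file names from step 4 plus a later 20230129221304930.commit file, latestCommit returns 20230129221304930 and exclusiveStart turns that value into 20230129221304929, which matches the start timestamp used in step 6.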

DataFrame

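  1. Make preparations: import the required classes, create a SparkSession, and define the table path, write options, and read options.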

    import org.apache.hudi.common.table.HoodieTableMetaClient
    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
    
    val spark: SparkSession = SparkSession.builder()
      .master("local[4]")
      .withExtensions(new HoodieSparkSessionExtension)
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    import spark.implicits._
    
    val basePath = "/tmp/test/hudi_cdc_test"
    
    val writeOpts = Map(
      "hoodie.table.name" -> "hudi_cdc_test",
      "hoodie.datasource.write.recordkey.field" -> "id",
      "hoodie.datasource.write.precombine.field" -> "ts",
      "hoodie.table.cdc.enabled" -> "true",
      "hoodie.table.cdc.supplemental.logging.mode" -> "op_key_only"
    )
    
    val readOpts = Map(
      "hoodie.datasource.query.type" -> "incremental",
      "hoodie.datasource.query.incremental.format" -> "cdc"
    )
  2. Write the first batch of data by using df1.

    val df1 = Seq((1, "a1", 1000), (2, "a2", 1001)).toDF("id", "name", "ts")
    df1.write.format("hudi")
      .options(writeOpts)
      .mode(SaveMode.Append)
      .save(basePath)
    df1.show(false)

    The following output is returned:

    +---+----+----+
    |id |name|ts  |
    +---+----+----+
    |1  |a1  |1000|
    |2  |a2  |1001|
    +---+----+----+
  3. Obtain the timestamp of the latest commit, perform a CDC query, and store the result in cdc1.

    val metaClient = HoodieTableMetaClient.builder()
      .setBasePath(basePath)
      .setConf(spark.sessionState.newHadoopConf())
      .build()
    
    val timestamp1 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
    val cdc1 = spark.read.format("hudi")
      .options(readOpts)
      .option("hoodie.datasource.read.begin.instanttime", (timestamp1.toLong - 1).toString)
      .load(basePath)
    cdc1.show(false)

    The following output is returned:

    +---+-----------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |op |ts_ms            |before|after                                                                                                                                                                                                                                                                      |
    +---+-----------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    |i  |20230128030951890|null  |{"_hoodie_record_key":"1","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet","_hoodie_commit_seqno":"20230128030951890_0_0","name":"a1","_hoodie_commit_time":"20230128030951890","ts":1000,"id":1}|
    |i  |20230128030951890|null  |{"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet","_hoodie_commit_seqno":"20230128030951890_0_1","name":"a2","_hoodie_commit_time":"20230128030951890","ts":1001,"id":2}|
    +---+-----------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  4. Write a second batch of data that updates the record whose id is 2 by using df2.

    val df2 = Seq((2, "a2", 1002)).toDF("id", "name", "ts")
    df2.write.format("hudi")
      .options(writeOpts)
      .mode(SaveMode.Append)
      .save(basePath)
    df2.show(false)

    The following output is returned:

    +---+----+----+
    |id |name|ts  |
    +---+----+----+
    |2  |a2  |1002|
    +---+----+----+
  5. Obtain the timestamp of the latest commit again, perform a CDC query, and store the result in cdc2.

    val timestamp2 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
    val cdc2 = spark.read.format("hudi")
      .options(readOpts)
      .option("hoodie.datasource.read.begin.instanttime", (timestamp2.toLong - 1).toString)
      .load(basePath)
    cdc2.show(false)

    The following output is returned:


    |op |ts_ms            |before                                                                                                                                                                                                                                                                                    |after                                                                                                                                                                                                                                                                      |

    |u  |20230128031235363|{"_hoodie_commit_time": "20230128030951890", "_hoodie_commit_seqno": "20230128030951890_0_1", "_hoodie_record_key": "2", "_hoodie_partition_path": "", "_hoodie_file_name": "6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet", "id": 2, "name": "a2", "ts": 1001}|{"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-60-52_20230128031235363.parquet","_hoodie_commit_seqno":"20230128031235363_0_1","name":"a2","_hoodie_commit_time":"20230128031235363","ts":1002,"id":2}|
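To illustrate how a downstream process might consume the op, before, and after columns shown above, the following sketch folds a batch of change records into an in-memory key-value state. The Change case class and the applyChanges method are hypothetical. The sketch assumes that the record key has already been extracted from the JSON payload and that deletions are reported with the op value d, which the examples in this topic do not show.

```scala
// Hypothetical sketch (not part of Hudi): applies CDC records to a
// downstream state that maps each record key to its latest row image.
object ApplyCdc {
  // op is "i" (insert), "u" (update), or "d" (delete, assumed);
  // after holds the new row image and is empty for deletes.
  final case class Change(op: String, key: String, after: Option[String])

  def applyChanges(state: Map[String, String], changes: Seq[Change]): Map[String, String] =
    changes.foldLeft(state) { (acc, c) =>
      c.op match {
        // Inserts and updates both overwrite the row stored for the key.
        case "i" | "u" => c.after.fold(acc)(row => acc + (c.key -> row))
        // Deletes remove the key from the state.
        case "d"       => acc - c.key
        // Unknown operation types leave the state unchanged.
        case _         => acc
      }
    }
}
```

Applying the two inserts from cdc1 and then the update from cdc2 leaves the state with two keys, where key 2 holds the row image whose ts value is 1002.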
