You can build an incremental data warehouse by using Hudi Change Data Capture (CDC). This topic describes the parameters and usage examples of Hudi CDC.
Background information
CDC is a set of software design patterns that you can use to identify and capture data changes in a table of a database and deliver the changes to a downstream process or system. Hudi CDC can use a Hudi table as a source to obtain the data changes.
Limits
Hudi CDC is available only for E-MapReduce (EMR) clusters of V3.45.0 or a later minor version and clusters of V5.11.0 or a later minor version. Hudi 0.12.2 is used in these EMR versions.
Parameters
Hudi CDC write parameters
| Parameter | Description |
| --- | --- |
| hoodie.table.cdc.enabled | Specifies whether to enable the CDC feature. Valid values: true and false. Default value: false. |
| hoodie.table.cdc.supplemental.logging.mode | The storage mode of CDC files. Valid values: op_key_only (only the operation type and the record key are stored), data_before (the operation type, the record key, and the record values before the change are stored), and data_before_after (the operation type, the record key, and the record values both before and after the change are stored). |
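Both parameters are write-side settings. The following minimal sketch shows how they can be passed as Spark write options; the option map name is illustrative, and the trade-off noted in the comments reflects the general design of Hudi CDC rather than a statement from this topic:

```scala
// Minimal sketch (assumption: a SparkSession configured as in the Examples section).
// data_before_after stores both the before and after images in CDC files, which keeps
// CDC reads cheap at the cost of extra storage; op_key_only stores only the operation
// type and record key, so CDC files stay small but the images must be reconstructed
// at query time.
val cdcWriteOpts = Map(
  "hoodie.table.cdc.enabled" -> "true",
  "hoodie.table.cdc.supplemental.logging.mode" -> "data_before_after"
)
```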
Hudi CDC read parameters
| Parameter | Description |
| --- | --- |
| hoodie.datasource.query.type | The type of the query. To use the CDC feature, you must set this parameter to incremental. Default value: snapshot. |
| hoodie.datasource.query.incremental.format | The format of the incremental query. To use the CDC feature, you must set this parameter to cdc. Default value: latest_state. |
| hoodie.datasource.read.begin.instanttime | The start time of the incremental query. The start time is exclusive. |
| hoodie.datasource.read.end.instanttime | The end time of the incremental query. The end time is inclusive. This parameter is optional. |
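The read parameters can be combined to bound a CDC query on both sides. The following is a minimal sketch; the instant timestamps are placeholders taken from a table's .hoodie timeline, and a SparkSession such as the one prepared in the Examples section is assumed:

```scala
// Minimal sketch: a CDC read bounded on both sides.
// The begin instant is exclusive and the end instant is inclusive.
// The timestamp values and the base path below are placeholders.
val boundedCdc = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.query.incremental.format", "cdc")
  .option("hoodie.datasource.read.begin.instanttime", "20230129220605214")
  .option("hoodie.datasource.read.end.instanttime", "20230129221304930")
  .load("/tmp/test/hudi_cdc_test")
boundedCdc.show(false)
```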
Examples
Spark SQL
On the Configure tab of the Spark service page, click spark-defaults.conf. Then, click Add Configuration item to add the spark.serializer configuration item. Set the value to org.apache.spark.serializer.KryoSerializer. For information about how to add a configuration item, see the Add configuration items section of the "Manage configuration items" topic.
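For reference, the added configuration item corresponds to the following line in spark-defaults.conf:

```
spark.serializer org.apache.spark.serializer.KryoSerializer
```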
Run the following command to create a table:
```sql
create table hudi_cdc_test (
  id bigint,
  name string,
  ts bigint
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts',
  'hoodie.table.cdc.enabled' = 'true',
  'hoodie.table.cdc.supplemental.logging.mode' = 'data_before_after'
);
```

Run the following commands to write data to the table and query the table data:
```sql
insert into hudi_cdc_test values (1, 'a1', 1000), (2, 'a2', 1001);
select * from hudi_cdc_test;
```

The following output is returned:
```
20230129220605215  20230129220605215_0_0  1  0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet  1  a1  1000
20230129220605215  20230129220605215_0_1  2  0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet  2  a2  1001
```

Obtain the timestamp when data is last committed from the .hoodie directory and perform a CDC query.
Obtain the timestamp when data is last committed.
```
-rw-r--r--  1 zxy  staff  1.2K  1 29 22:06 20230129220605215.commit
-rw-r--r--  1 zxy  staff    0B  1 29 22:06 20230129220605215.commit.requested
-rw-r--r--  1 zxy  staff  798B  1 29 22:06 20230129220605215.inflight
```

Run the following command to perform a CDC query.
The time range of the query is a left-open, right-closed interval. Therefore, the start timestamp is calculated by using the following formula: Obtained commit timestamp - 1.
```sql
select * from hudi_table_changes("hudi_cdc_test", "20230129220605214");
```

The following output is returned:
```
i  20230129220605215  NULL  {"_hoodie_record_key":"1","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet","_hoodie_commit_seqno":"20230129220605215_0_0","name":"a1","_hoodie_commit_time":"20230129220605215","ts":1000,"id":1}
i  20230129220605215  NULL  {"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet","_hoodie_commit_seqno":"20230129220605215_0_1","name":"a2","_hoodie_commit_time":"20230129220605215","ts":1001,"id":2}
```
Run the following commands to write data to the table and query the table data again:
```sql
insert into hudi_cdc_test values (2, 'a2', 1002);
select * from hudi_cdc_test;
```

The following output is returned:
```
20230129220605215  20230129220605215_0_0  1  0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet  1  a1  1000
20230129221304930  20230129221304930_0_1  2  0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet  2  a2  1002
```

Obtain the timestamp when data is last committed from the .hoodie directory as described above and subtract 1 from the obtained timestamp. Then, perform a CDC query.
For example, if the obtained timestamp is 20230129221304930, the start timestamp is 20230129221304929. Run the following command to perform a CDC query:

```sql
select * from hudi_table_changes("hudi_cdc_test", "20230129221304929");
```

The following output is returned:
```
u  20230129221304930  {"_hoodie_commit_time": "20230129220605215", "_hoodie_commit_seqno": "20230129220605215_0_1", "_hoodie_record_key": "2", "_hoodie_partition_path": "", "_hoodie_file_name": "0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-14-17_20230129220605215.parquet", "id": 2, "name": "a2", "ts": 1001}  {"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"0524a2c6-a461-4ad1-8258-807a3a22f9a7-0_0-40-38_20230129221304930.parquet","_hoodie_commit_seqno":"20230129221304930_0_1","name":"a2","_hoodie_commit_time":"20230129221304930","ts":1002,"id":2}
```
DataFrame
Make preparations.
```scala
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension

val spark: SparkSession = SparkSession.builder()
  .master("local[4]")
  .withExtensions(new HoodieSparkSessionExtension)
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

import spark.implicits._

val basePath = "/tmp/test/hudi_cdc_test"
// Write options: enable CDC and store only the operation type and record key in CDC files.
val writeOpts = Map(
  "hoodie.table.name" -> "hudi_cdc_test",
  "hoodie.datasource.write.recordkey.field" -> "id",
  "hoodie.datasource.write.precombine.field" -> "ts",
  "hoodie.table.cdc.enabled" -> "true",
  "hoodie.table.cdc.supplemental.logging.mode" -> "op_key_only"
)
// Read options: run incremental queries in the CDC format.
val readOpts = Map(
  "hoodie.datasource.query.type" -> "incremental",
  "hoodie.datasource.query.incremental.format" -> "cdc"
)
```

Use df1 to write data.
```scala
val df1 = Seq((1, "a1", 1000), (2, "a2", 1001)).toDF("id", "name", "ts")
df1.write.format("hudi")
  .options(writeOpts)
  .mode(SaveMode.Append)
  .save(basePath)
df1.show(false)
```

The following output is returned:
```
+---+----+----+
|id |name|ts  |
+---+----+----+
|1  |a1  |1000|
|2  |a2  |1001|
+---+----+----+
```

Read data from cdc1.
```scala
val metaClient = HoodieTableMetaClient.builder()
  .setBasePath(basePath)
  .setConf(spark.sessionState.newHadoopConf())
  .build()
// Obtain the timestamp of the latest commit on the active timeline.
val timestamp1 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
val cdc1 = spark.read.format("hudi")
  .options(readOpts)
  .option("hoodie.datasource.read.begin.instanttime", (timestamp1.toLong - 1).toString)
  .load(basePath)
cdc1.show(false)
```

The following output is returned:
```
+---+-----------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|op |ts_ms            |before|after                                                                                                                                                                                                                                                                        |
+---+-----------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|i  |20230128030951890|null  |{"_hoodie_record_key":"1","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet","_hoodie_commit_seqno":"20230128030951890_0_0","name":"a1","_hoodie_commit_time":"20230128030951890","ts":1000,"id":1}|
|i  |20230128030951890|null  |{"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet","_hoodie_commit_seqno":"20230128030951890_0_1","name":"a2","_hoodie_commit_time":"20230128030951890","ts":1001,"id":2}|
+---+-----------------+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```

Use df2 to write data.
```scala
val df2 = Seq((2, "a2", 1002)).toDF("id", "name", "ts")
df2.write.format("hudi")
  .options(writeOpts)
  .mode(SaveMode.Append)
  .save(basePath)
df2.show(false)
```

The following output is returned:
```
+---+----+----+
|id |name|ts  |
+---+----+----+
|2  |a2  |1002|
+---+----+----+
```

Read data from cdc2.
```scala
val timestamp2 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
val cdc2 = spark.read.format("hudi")
  .options(readOpts)
  .option("hoodie.datasource.read.begin.instanttime", (timestamp2.toLong - 1).toString)
  .load(basePath)
cdc2.show(false)
```

The following output is returned:
```
+---+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|op |ts_ms            |before                                                                                                                                                                                                                                                                                    |after                                                                                                                                                                                                                                                                        |
+---+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|u  |20230128031235363|{"_hoodie_commit_time": "20230128030951890", "_hoodie_commit_seqno": "20230128030951890_0_1", "_hoodie_record_key": "2", "_hoodie_partition_path": "", "_hoodie_file_name": "6b253d50-1efb-400d-95e6-b67380219441-0_0-27-28_20230128030951890.parquet", "id": 2, "name": "a2", "ts": 1001}|{"_hoodie_record_key":"2","_hoodie_partition_path":"","_hoodie_file_name":"6b253d50-1efb-400d-95e6-b67380219441-0_0-60-52_20230128031235363.parquet","_hoodie_commit_seqno":"20230128031235363_0_1","name":"a2","_hoodie_commit_time":"20230128031235363","ts":1002,"id":2}|
+---+-----------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
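Deletes can be captured in the same way. The following minimal sketch is not part of the original walkthrough; it assumes the session, table, writeOpts, and readOpts from the preceding steps and uses the standard hoodie.datasource.write.operation option. The deletion of the record with id = 2 is expected to come back as a CDC row whose op column is d:

```scala
// Minimal sketch (assumption: same session, table, writeOpts, and readOpts as above).
// Delete the record with id = 2, then read the change back in CDC format.
val df3 = Seq((2, "a2", 1002)).toDF("id", "name", "ts")
df3.write.format("hudi")
  .options(writeOpts)
  .option("hoodie.datasource.write.operation", "delete")
  .mode(SaveMode.Append)
  .save(basePath)

val timestamp3 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
val cdc3 = spark.read.format("hudi")
  .options(readOpts)
  .option("hoodie.datasource.read.begin.instanttime", (timestamp3.toLong - 1).toString)
  .load(basePath)
cdc3.show(false) // the deleted key is expected to appear with op "d" and a null after image
```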