Hudi is a data lake framework that enables you to update and delete data in Hadoop-compatible file systems. It also allows you to consume changed data.

Scenarios

  • Near real-time ingestion

    Hudi can be used to insert, update, and delete data. You can ingest log data from Kafka and Log Service into Hudi in real time. You can also synchronize data updates recorded in the binary logs of databases to Hudi in real time.

    Hudi optimizes the files that are generated during writes to mitigate the small file problem of HDFS. In this regard, Hudi is friendlier to HDFS than traditional data lake solutions.

  • Near real-time analytics

    Hudi supports various data analytics engines, such as Hive, Spark, Presto, and Impala. Hudi is lightweight and does not rely on additional service processes.

  • Incremental data processing

    Hudi supports incremental queries. You can run Spark jobs to query only the data that was written after a specific commit. Hudi also lets you consume changed data in HDFS, which you can use to optimize your existing system architecture.
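
    For example, the following is a minimal sketch of how changed data could be consumed with an incremental query. It is based on the option keys in org.apache.hudi.DataSourceReadOptions of Hudi 0.6.0; the table path /tmp/hudi/h0 and the commit timestamp 20210101000000 are placeholders.

      import org.apache.hudi.DataSourceReadOptions._
      import org.apache.spark.sql.SparkSession

      val spark = SparkSession
        .builder()
        .master("local[*]")
        .appName("hudi incremental query")
        .getOrCreate()

      // Read only the records that were committed after the given instant time (exclusive).
      val incDf = spark.read.format("hudi")
        .option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL)
        .option(BEGIN_INSTANTTIME_OPT_KEY, "20210101000000") // Placeholder commit timestamp.
        .load("/tmp/hudi/h0")
      incDf.show()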

Table types

Hudi supports the following two table types:
  • Copy on Write

    Data is stored exclusively in a columnar storage format, such as Parquet. Each update creates a new version of the files during a write operation.

  • Merge on Read

    Data is stored as a combination of a columnar storage format, such as Parquet, and a row-based storage format, such as Avro. Base data is stored in the columnar format, while incremental data is logged to the row-based files and compacted as needed to create new versions of the columnar files.
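
The table type is selected when data is written. The following is a minimal sketch that creates a Merge on Read table instead of the default Copy on Write table. It assumes the option keys in org.apache.hudi.DataSourceWriteOptions and org.apache.hudi.config.HoodieWriteConfig of Hudi 0.6.0; the table name hudi_mor_test and the path /tmp/hudi/h_mor are placeholders.

    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.spark.sql.SaveMode.Overwrite
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("hudi table type")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()
    import spark.implicits._

    // A single sample row: record key (id), precombine field (version), and partition field (dt).
    val df = Seq((1, "a1", 30.0, 10000L, "p0")).toDF("id", "name", "price", "version", "dt")

    // TABLE_TYPE_OPT_KEY defaults to Copy on Write; set it to MOR_TABLE_TYPE_OPT_VAL for Merge on Read.
    df.write.format("hudi")
      .option(TABLE_NAME, "hudi_mor_test")
      .option(TABLE_TYPE_OPT_KEY, MOR_TABLE_TYPE_OPT_VAL)
      .option(RECORDKEY_FIELD_OPT_KEY, "id")
      .option(PRECOMBINE_FIELD_OPT_KEY, "version")
      .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
      .mode(Overwrite)
      .save("/tmp/hudi/h_mor")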

Query types

Hudi supports the following three query types:
  • Snapshot queries

    Queries the latest snapshot of the table as of a specific commit. For Merge on Read tables, Hudi merges the columnar base data with the real-time log data on the fly during snapshot queries. For Copy on Write tables, Hudi queries the latest version of the data in the Parquet format.

    Both Copy on Write and Merge on Read tables support snapshot queries.

  • Incremental queries

    Queries the latest data that is written after a specific commit.

    Only Copy on Write tables support incremental queries.

  • Read-optimized queries

    Queries only the data in the latest base (columnar) files as of a specific commit or compaction. Read-optimized queries are optimized snapshot queries on Merge on Read tables.

    Both Copy on Write and Merge on Read tables support read-optimized queries.
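
The query type is selected when a table is read with Spark. The following is a minimal sketch, assuming the option keys in org.apache.hudi.DataSourceReadOptions of Hudi 0.6.0 and a Hudi table with a single partition level under the placeholder path /tmp/hudi/h0; the path glob used for the reads may need to be adjusted to the actual partition layout. An incremental query is sketched in the Incremental data processing scenario above.

    import org.apache.hudi.DataSourceReadOptions._
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("hudi query types")
      .getOrCreate()

    // Snapshot query (the default query type): reads the latest snapshot of the table.
    val snapshotDf = spark.read.format("hudi")
      .option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_SNAPSHOT_OPT_VAL)
      .load("/tmp/hudi/h0/*/*")

    // Read-optimized query: reads only the compacted base files in columnar format.
    val readOptimizedDf = spark.read.format("hudi")
      .option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL)
      .load("/tmp/hudi/h0/*/*")

    snapshotDf.show()
    readOptimizedDf.show()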

Insert data

In EMR V3.32.0 and later, Hudi is provided as a built-in service. In these versions, you only need to add the Hudi dependency to the POM file of your project to use Hudi.
<dependency>
   <groupId>org.apache.hudi</groupId>
   <artifactId>hudi-spark_2.11</artifactId>
   <version>0.6.0</version>
   <scope>provided</scope>
</dependency>
Examples:
  • Insert or update data
    // The following imports are shared by the insert, update, and delete examples.
    // The import paths assume the package layout of Hudi 0.6.0.
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.hudi.hive.MultiPartKeysValueExtractor
    import org.apache.hudi.keygen.SimpleKeyGenerator
    import org.apache.spark.sql.SaveMode.{Append, Overwrite}
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("hudi test")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    import spark.implicits._
    // Sample data: 10 rows with a record key (id), a precombine field (version), and a partition field (dt).
    val df = (for (i <- 0 until 10) yield (i, s"a$i", 30 + i * 0.2, 100 * i + 10000, s"p${i % 5}"))
      .toDF("id", "name", "price", "version", "dt")

    df.write.format("hudi")
      .option(TABLE_NAME, "hudi_test_0")
      // .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL) // Update data.
      .option(OPERATION_OPT_KEY, INSERT_OPERATION_OPT_VAL) // Insert data.
      .option(RECORDKEY_FIELD_OPT_KEY, "id")
      .option(PRECOMBINE_FIELD_OPT_KEY, "version")
      .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName)
      .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName)
      .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
      .option(HIVE_PARTITION_FIELDS_OPT_KEY, "dt") // Name of the partition column in the synced Hive table.
      .option(META_SYNC_ENABLED_OPT_KEY, "true") // Enable metadata synchronization.
      .option(HIVE_USE_JDBC_OPT_KEY, "false")
      .option(HIVE_DATABASE_OPT_KEY, "default")
      .option(HIVE_TABLE_OPT_KEY, "hudi_test_0")
      .option(INSERT_PARALLELISM, "8")
      .option(UPSERT_PARALLELISM, "8")
      .mode(Overwrite)
      .save("/tmp/hudi/h0")
  • Delete data
    // Delete the records from the table whose record keys (id) appear in df. Uses the imports above.
    df.write.format("hudi")
      .option(TABLE_NAME, "hudi_test_0")
      .option(OPERATION_OPT_KEY, DELETE_OPERATION_OPT_VAL) // Delete data.
      .option(RECORDKEY_FIELD_OPT_KEY, "id")
      .option(PRECOMBINE_FIELD_OPT_KEY, "version")
      .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName)
      .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
      .option(DELETE_PARALLELISM, "8")
      .mode(Append)
      .save("/tmp/hudi/h0")

Query data

If you use Hive or Presto to query Hudi tables, you must enable metadata synchronization at the write stage by setting META_SYNC_ENABLED_OPT_KEY to true.

If you use open source Hudi, you must set hive.input.format to org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat for both Copy on Write and Merge on Read tables. If you use EMR Hudi, you do not need to set this parameter for Copy on Write tables.
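
As a quick check that metadata synchronization works, you can also query the synced table through Spark SQL. The following is a minimal sketch, assuming the hudi_test_0 table from the example above was written with META_SYNC_ENABLED_OPT_KEY set to true and that the Spark session has access to the same Hive metastore.

    import org.apache.spark.sql.SparkSession

    // enableHiveSupport makes Spark SQL use the Hive metastore that Hudi synced the table metadata to.
    val spark = SparkSession
      .builder()
      .appName("query synced hudi table")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("SELECT id, name, price, version, dt FROM default.hudi_test_0").show()

Hive and Presto query the same synced table, default.hudi_test_0, after the hive.input.format setting described above is applied where required.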

For more information about Hudi, visit the Apache Hudi official website.