This topic describes how to write data to and query data from E-MapReduce (EMR) Hudi.

Write data

Environment configuration

In EMR V3.32.0 and later, Hudi dependencies are integrated into various open source components, such as Spark, Hive, and Presto. When you run a job to write data to Hudi, you do not need to introduce these dependencies again. You only need to add the following Hudi dependency to the pom.xml file of your project. The Hudi version that you specify varies based on the EMR version. The following table describes the mapping between Hudi versions and EMR versions.
Hudi version | EMR version
0.6          | EMR V3.32 to V3.35, or EMR V4.5 to V4.9
0.8          | EMR V3.36 to V3.37, or EMR V5.2
0.9          | EMR V3.38, or EMR V5.4
0.10         | EMR V3.39, EMR V4.10, or EMR V5.5.0
<dependency>
   <groupId>org.apache.hudi</groupId>
   <artifactId>hudi-spark_2.11</artifactId>
   <!-- for spark3 <artifactId>hudi-spark_2.12</artifactId> -->
   <version>${hudi_version}</version>
   <scope>provided</scope>
</dependency>

Insert or update data

Sample code:
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig.TABLE_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.spark.sql.SaveMode.Overwrite
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("hudi test")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

import spark.implicits._
val df = (for (i <- 0 until 10) yield (i, s"a$i", 30 + i * 0.2, 100 * i + 10000, s"p${i % 5}"))
  .toDF("id", "name", "price", "version", "dt")

df.write.format("hudi")
  .option(TABLE_NAME, "hudi_test_0")
  // .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL) for update
  .option(OPERATION_OPT_KEY, INSERT_OPERATION_OPT_VAL) // for insert
  .option(RECORDKEY_FIELD_OPT_KEY, "id")
  .option(PRECOMBINE_FIELD_OPT_KEY, "version")
  .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName)
  .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName)
  .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
  .option(HIVE_PARTITION_FIELDS_OPT_KEY, "dt") // must match the partition path field
  .option(META_SYNC_ENABLED_OPT_KEY, "true")
  .option(HIVE_USE_JDBC_OPT_KEY, "false")
  .option(HIVE_DATABASE_OPT_KEY, "default")
  .option(HIVE_TABLE_OPT_KEY, "hudi_test_0")
  .option(INSERT_PARALLELISM, "8")
  .option(UPSERT_PARALLELISM, "8")
  .mode(Overwrite)
  .save("/tmp/hudi/h0")

Delete data

Sample code:
import org.apache.spark.sql.SaveMode.Append

// df holds the records to delete. Only the record key and partition path
// fields are used to locate the records; the other columns are ignored.
df.write.format("hudi")
  .option(TABLE_NAME, "hudi_test_0")
  .option(OPERATION_OPT_KEY, DELETE_OPERATION_OPT_VAL) // for delete
  .option(RECORDKEY_FIELD_OPT_KEY, "id")
  .option(PRECOMBINE_FIELD_OPT_KEY, "version")
  .option(KEYGENERATOR_CLASS_OPT_KEY, classOf[SimpleKeyGenerator].getName)
  .option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getCanonicalName)
  .option(PARTITIONPATH_FIELD_OPT_KEY, "dt")
  .option(DELETE_PARALLELISM, "8")
  .mode(Append)
  .save("/tmp/hudi/h0")

Query data

Software packages related to Hudi are integrated into various open source components in EMR, such as Spark, Presto, and Hive. You can query data in a Hudi table without the need to introduce additional dependencies.
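For example, a snapshot query from Spark needs no extra options. The following sketch assumes the table written to /tmp/hudi/h0 in the preceding examples; the path and column names are taken from that example, not from a fixed API:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("hudi query test")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

// Snapshot query: read the Hudi table from its base path.
// Note: on older Hudi versions (such as 0.6), you may need to pass a
// partition glob instead, for example "/tmp/hudi/h0/*".
val df = spark.read.format("hudi").load("/tmp/hudi/h0")

df.createOrReplaceTempView("hudi_test_0")
spark.sql("SELECT id, name, price, version, dt FROM hudi_test_0 WHERE dt = 'p0'").show()
```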

If you use Hive or Presto to query a Hudi table, you must enable metadata synchronization when you write data by setting META_SYNC_ENABLED_OPT_KEY to true, so that the table and its partitions are registered in the Hive metastore.

If you use open source Hudi, you must set hive.input.format to org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat for both Copy on Write and Merge on Read tables. If you use EMR Hudi, you do not need to specify an input format for Copy on Write tables, because EMR automatically selects the correct Hudi input format for them.
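As a sketch of the Hive setting described above, you can set the input format at the session level before querying. The table name below assumes the hudi_test_0 table synchronized to the default database in the write example:

```sql
-- Required for open source Hudi (both table types) and for Merge on Read tables.
SET hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;

SELECT id, name, price FROM default.hudi_test_0 WHERE dt = 'p0';
```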