This topic describes how to write data to and query data from Apache Hudi tables on E-MapReduce (EMR).
Prerequisites
Before you begin, ensure that you have:
- An EMR cluster running EMR V3.32.0 or later
Set up the environment
In EMR V3.32.0 and later, Hudi dependencies are built into Spark, Hive, and Presto. No additional runtime dependencies are required. Add the following Maven dependency to your pom.xml:
<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-spark_2.11</artifactId>
    <version>${hudi_version}</version>
    <scope>provided</scope>
</dependency>
For Spark 3, use `hudi-spark_2.12` as the `artifactId` instead of `hudi-spark_2.11`.
The Hudi version bundled with your cluster depends on the EMR version. Use the following table to find the correct ${hudi_version} value:
| Hudi version | EMR version |
|---|---|
| 0.6.0 | EMR V3.32.0–V3.35.0; EMR V4.5.0–V4.9.0; EMR V5.1.0 |
| 0.8.0 | EMR V3.36.1–V3.37.1; EMR V5.2.1–V5.3.1 |
| 0.9.0 | EMR V3.38.0–V3.38.3; EMR V5.4.0–V5.4.3 |
| 0.10.0 | EMR V3.39.1–V3.40.0; EMR V4.10.0; EMR V5.5.0–V5.6.0 |
| 0.11.0 | EMR V3.42.0; EMR V5.8.0 |
| 0.12.0 | EMR V3.43.0–V3.44.1; EMR V5.9.0–V5.10.1 |
| 0.12.2 | EMR V3.45.0–V3.46.1; EMR V5.11.0–V5.12.1 |
| 0.13.1 | EMR V3.47.0–V3.48.0; EMR V5.13.0–V5.14.0 |
Write data
All write examples share a common set of Hudi options. Define them once and pass them to each write operation:
// Imports assumed by the examples below (Hudi 0.6.0 / Spark 2 APIs)
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("hudi test")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

import spark.implicits._

// Sample dataset: 10 records spread across 5 dt partitions
val df = (for (i <- 0 until 10) yield (i, s"a$i", 30 + i * 0.2, 100 * i + 10000, s"p${i % 5}"))
  .toDF("id", "name", "price", "version", "dt")

// Common options shared across all write operations
val hudiOptions = Map(
  TABLE_NAME -> "hudi_test_0",
  RECORDKEY_FIELD_OPT_KEY -> "id",
  PRECOMBINE_FIELD_OPT_KEY -> "version",
  KEYGENERATOR_CLASS_OPT_KEY -> classOf[SimpleKeyGenerator].getName,
  HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
  PARTITIONPATH_FIELD_OPT_KEY -> "dt",
  HIVE_PARTITION_FIELDS_OPT_KEY -> "dt",
  META_SYNC_ENABLED_OPT_KEY -> "true",
  HIVE_USE_JDBC_OPT_KEY -> "false",
  HIVE_DATABASE_OPT_KEY -> "default",
  HIVE_TABLE_OPT_KEY -> "hudi_test_0"
)
Key parameters:
| Parameter | Description |
|---|---|
| TABLE_NAME | The Hudi table name. |
| RECORDKEY_FIELD_OPT_KEY | The field used as the record key. Records with the same key are deduplicated or updated, depending on the operation type. |
| PRECOMBINE_FIELD_OPT_KEY | The field used for pre-combining before a write. When two records share the same key, Hudi keeps the one with the larger value in this field. |
| KEYGENERATOR_CLASS_OPT_KEY | The key generator class. SimpleKeyGenerator builds the record key and the partition path each from a single field. |
| PARTITIONPATH_FIELD_OPT_KEY | The DataFrame field used as the Hudi partition path. |
| HIVE_PARTITION_FIELDS_OPT_KEY | The Hive partition column name for metadata synchronization. |
| META_SYNC_ENABLED_OPT_KEY | Enables Hive metadata synchronization. Required when querying the table with Hive or Presto. |
| HIVE_DATABASE_OPT_KEY | The target Hive database for metadata synchronization. |
| HIVE_TABLE_OPT_KEY | The target Hive table name for metadata synchronization. |
Insert data
df.write.format("hudi")
  .options(hudiOptions)
  .option(OPERATION_OPT_KEY, INSERT_OPERATION_OPT_VAL)
  .option(INSERT_PARALLELISM, "8")
  .option(UPSERT_PARALLELISM, "8")
  .mode(Overwrite)
  .save("/tmp/hudi/h0")
To update existing records instead, replace INSERT_OPERATION_OPT_VAL with UPSERT_OPERATION_OPT_VAL.
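As a sketch of an upsert, the following writes a second batch with the same ids but new price and version values, so the existing records are updated in place (the `df2` name and sample values are illustrative):

```scala
// Updated records: same ids as df, larger version values, so upsert replaces them
val df2 = (for (i <- 0 until 10) yield (i, s"a$i", 40 + i * 0.2, 200 * i + 20000, s"p${i % 5}"))
  .toDF("id", "name", "price", "version", "dt")

df2.write.format("hudi")
  .options(hudiOptions)
  .option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
  .option(UPSERT_PARALLELISM, "8")
  .mode(Append) // Append, not Overwrite: the table already exists
  .save("/tmp/hudi/h0")
```

Note that the save mode is Append: Overwrite would recreate the table instead of updating it.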
Delete data
df.write.format("hudi")
  .options(hudiOptions)
  .option(OPERATION_OPT_KEY, DELETE_OPERATION_OPT_VAL)
  .option(DELETE_PARALLELISM, "8")
  .mode(Append)
  .save("/tmp/hudi/h0")
The delete operation matches records by the key defined in RECORDKEY_FIELD_OPT_KEY and removes them from the table.
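Because only the keys of the written rows matter, you can delete a subset of the table by writing just those rows. A sketch (the `id < 3` predicate and the `toDelete` name are illustrative):

```scala
// Delete only the records whose id is less than 3; all other rows are untouched
val toDelete = df.filter($"id" < 3)

toDelete.write.format("hudi")
  .options(hudiOptions)
  .option(OPERATION_OPT_KEY, DELETE_OPERATION_OPT_VAL)
  .option(DELETE_PARALLELISM, "8")
  .mode(Append)
  .save("/tmp/hudi/h0")
```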
Query data
Hudi is integrated into Spark, Hive, and Presto in EMR. No additional dependencies are needed to run queries.
To query a Hudi table with Hive or Presto, enable metadata synchronization at write time by setting META_SYNC_ENABLED_OPT_KEY to "true". See Write data.
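In Spark, a table written as in the examples above can be read back directly from its base path. A minimal sketch (the path matches the write examples; on older Hudi versions such as 0.6.0, the load path may need a partition glob like "/tmp/hudi/h0/*"):

```scala
// Snapshot query: load the latest view of the Hudi table from its base path
val readDf = spark.read.format("hudi").load("/tmp/hudi/h0")

// Hudi adds meta columns (_hoodie_commit_time, ...); select only the data columns
readDf.select("id", "name", "price", "version", "dt").show()
```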
Input format behavior
EMR Hudi and open source Hudi handle input formats differently:
- Open source Hudi: You must set hive.input.format to org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat for both Copy on Write and Merge on Read tables.
- EMR Hudi: You do not need to specify an input format for Copy on Write tables. Copy on Write tables automatically adapt to the Hudi input formats.