E-MapReduce: Basic usage

Last Updated: Mar 26, 2026

This topic describes how to write data to and query data from Apache Hudi tables on E-MapReduce (EMR).

Prerequisites

Before you begin, ensure that you have:

  • An EMR cluster running EMR V3.32.0 or later

Set up the environment

In EMR V3.32.0 and later, Hudi dependencies are built into Spark, Hive, and Presto, so no additional runtime dependencies are required on the cluster. To compile your own jobs, add the following Maven dependency to your pom.xml; the provided scope is used because the cluster already supplies Hudi at runtime:

<dependency>
  <groupId>org.apache.hudi</groupId>
  <artifactId>hudi-spark_2.11</artifactId>
  <version>${hudi_version}</version>
  <scope>provided</scope>
</dependency>
For Spark 3, use hudi-spark_2.12 as the artifactId instead of hudi-spark_2.11.
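Following that note, the Spark 3 variant of the dependency differs only in the artifactId:

<dependency>
  <groupId>org.apache.hudi</groupId>
  <artifactId>hudi-spark_2.12</artifactId>
  <version>${hudi_version}</version>
  <scope>provided</scope>
</dependency>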

The Hudi version bundled with your cluster depends on the EMR version. Use the following table to find the correct ${hudi_version} value:

Hudi version   EMR versions
0.6.0          EMR V3.32.0–V3.35.0; EMR V4.5.0–V4.9.0; EMR V5.1.0
0.8.0          EMR V3.36.1–V3.37.1; EMR V5.2.1–V5.3.1
0.9.0          EMR V3.38.0–V3.38.3; EMR V5.4.0–V5.4.3
0.10.0         EMR V3.39.1–V3.40.0; EMR V4.10.0; EMR V5.5.0–V5.6.0
0.11.0         EMR V3.42.0; EMR V5.8.0
0.12.0         EMR V3.43.0–V3.44.1; EMR V5.9.0–V5.10.1
0.12.2         EMR V3.45.0–V3.46.1; EMR V5.11.0–V5.12.1
0.13.1         EMR V3.47.0–V3.48.0; EMR V5.13.0–V5.14.0

Write data

All write examples share a common set of Hudi options. The option constants come from Hudi's DataSourceWriteOptions and HoodieWriteConfig classes; import them, define the options once, and pass them to each write operation:

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.keygen.SimpleKeyGenerator
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .master("local[*]")
  .appName("hudi test")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

import spark.implicits._

// Sample dataset: ten rows spread across five partitions (dt = p0..p4)
val df = (for (i <- 0 until 10) yield (i, s"a$i", 30 + i * 0.2, 100 * i + 10000, s"p${i % 5}"))
  .toDF("id", "name", "price", "version", "dt")

// Common options shared across all write operations
val hudiOptions = Map(
  TABLE_NAME                             -> "hudi_test_0",
  RECORDKEY_FIELD_OPT_KEY                -> "id",
  PRECOMBINE_FIELD_OPT_KEY               -> "version",
  KEYGENERATOR_CLASS_OPT_KEY             -> classOf[SimpleKeyGenerator].getName,
  HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getCanonicalName,
  PARTITIONPATH_FIELD_OPT_KEY            -> "dt",
  HIVE_PARTITION_FIELDS_OPT_KEY          -> "dt",
  META_SYNC_ENABLED_OPT_KEY              -> "true",
  HIVE_USE_JDBC_OPT_KEY                  -> "false",
  HIVE_DATABASE_OPT_KEY                  -> "default",
  HIVE_TABLE_OPT_KEY                     -> "hudi_test_0"
)

Key parameters:

  • TABLE_NAME: The Hudi table name.

  • RECORDKEY_FIELD_OPT_KEY: The field used as the record key. Records with the same key are deduplicated or updated based on the operation type.

  • PRECOMBINE_FIELD_OPT_KEY: The field used for pre-combining before a write. When two records share the same key, Hudi keeps the one with the larger value in this field.

  • KEYGENERATOR_CLASS_OPT_KEY: The key generator class. SimpleKeyGenerator generates keys from a single partition field.

  • HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY: The class that extracts Hive partition values from the partition path during metadata synchronization.

  • PARTITIONPATH_FIELD_OPT_KEY: The DataFrame field used as the Hudi partition path.

  • HIVE_PARTITION_FIELDS_OPT_KEY: The Hive partition column name for metadata synchronization.

  • META_SYNC_ENABLED_OPT_KEY: Enables Hive metadata synchronization. Required when querying the table with Hive or Presto.

  • HIVE_USE_JDBC_OPT_KEY: Specifies whether metadata synchronization connects to Hive over JDBC. Set this to "false" to use the Hive metastore client directly.

  • HIVE_DATABASE_OPT_KEY: The target Hive database for metadata synchronization.

  • HIVE_TABLE_OPT_KEY: The target Hive table name for metadata synchronization.

Insert data

df.write.format("hudi")
  .options(hudiOptions)
  .option(OPERATION_OPT_KEY,  INSERT_OPERATION_OPT_VAL)
  .option(INSERT_PARALLELISM, "8")
  .option(UPSERT_PARALLELISM, "8")
  .mode(Overwrite)
  .save("/tmp/hudi/h0")

To update existing records instead, replace INSERT_OPERATION_OPT_VAL with UPSERT_OPERATION_OPT_VAL.
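For example, the following sketch upserts new versions of the same records; because the new rows carry larger values in the version field, pre-combining keeps them over the originals:

// Updated rows for the same record keys, with larger "version" values
val updates = (for (i <- 0 until 10) yield (i, s"a$i", 40 + i * 0.2, 100 * i + 20000, s"p${i % 5}"))
  .toDF("id", "name", "price", "version", "dt")

updates.write.format("hudi")
  .options(hudiOptions)
  .option(OPERATION_OPT_KEY,  UPSERT_OPERATION_OPT_VAL)
  .option(UPSERT_PARALLELISM, "8")
  .mode(Append)
  .save("/tmp/hudi/h0")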

Delete data

df.write.format("hudi")
  .options(hudiOptions)
  .option(OPERATION_OPT_KEY,  DELETE_OPERATION_OPT_VAL)
  .option(DELETE_PARALLELISM, "8")
  .mode(Append)
  .save("/tmp/hudi/h0")

The delete operation matches records by the key defined in RECORDKEY_FIELD_OPT_KEY and removes them from the table.
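To verify the result, you can read the table back with Spark. This is a minimal check under the assumptions of the examples above: the glob in the path covers the single dt partition level, and the count drops to zero because the delete removes the same ten records the insert wrote.

// Read the Hudi table back; one glob level per partition column
val remaining = spark.read.format("hudi").load("/tmp/hudi/h0/*")
println(remaining.count()) // 0 after the delete above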

Query data

Hudi is integrated into Spark, Hive, and Presto in EMR. No additional dependencies are needed to run queries.

To query a Hudi table with Hive or Presto, enable metadata synchronization at write time by setting META_SYNC_ENABLED_OPT_KEY to "true". See Write data.
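For example, after metadata synchronization has run, the table from the write examples can be queried by the name set in HIVE_TABLE_OPT_KEY. This is an illustrative query; the dt partition values come from the sample dataset:

SELECT id, name, price, version
FROM default.hudi_test_0
WHERE dt = 'p0';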

Input format behavior

EMR Hudi and open source Hudi handle input formats differently:

  • Open source Hudi: You must set hive.input.format to org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat for both Copy on Write and Merge on Read tables, as shown in the snippet after this list.

  • EMR Hudi: You do not need to set an input format for Copy on Write tables; EMR automatically selects the appropriate Hudi input format for them.
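With open source Hudi, for example, the setting can be applied in the Hive session before the query runs. An illustrative snippet, where hudi_test_0 is the table synced in the write examples:

SET hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
SELECT COUNT(*) FROM default.hudi_test_0;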