E-MapReduce: Process Delta Lake and Hudi data with Spark

Last Updated: Mar 26, 2026

This topic shows how to configure Spark and use Spark SQL or the Spark Dataset API to read and write Delta Lake and Apache Hudi tables on an EMR cluster.

For background on Delta Lake, see the Delta Lake documentation. For background on Apache Hudi, see the Apache Hudi documentation.

Prerequisites

Before you begin, ensure that you have:

  • An EMR cluster with Spark installed

  • The Project Object Model (POM) dependencies for Delta Lake or Apache Hudi added to your project

Configure Spark parameters

Pass the following parameters when starting a Spark session. Add them as --conf flags in your spark-shell or spark-submit command, or set them in your SparkConf.

Delta Lake

spark-shell \
  --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
The spark.sql.catalog.spark_catalog parameter is required only when your cluster runs Spark 3.

Apache Hudi

spark-shell \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog
The spark.sql.catalog.spark_catalog parameter is required only when your cluster runs Spark 3.
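
The same settings can also be applied programmatically instead of on the command line. The following is a minimal PySpark sketch, assuming the pyspark package and the Delta Lake or Hudi JARs are available on the cluster. SESSION_CONFS, to_conf_flags, build_session, and the application name lake-demo are hypothetical names used for illustration, not part of any EMR or Spark API.

```python
# Settings copied from the spark-shell commands in this section, keyed by table format.
# The spark.sql.catalog.spark_catalog entries apply to Spark 3 clusters only.
SESSION_CONFS = {
    "delta": {
        "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    },
    "hudi": {
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
    },
}


def to_conf_flags(table_format):
    """Render the settings as --conf arguments for spark-shell or spark-submit."""
    flags = []
    for key, value in SESSION_CONFS[table_format].items():
        flags += ["--conf", f"{key}={value}"]
    return flags


def build_session(table_format, app_name="lake-demo"):
    """Create a SparkSession with the settings applied (requires pyspark)."""
    # Imported lazily so that the helpers above can be used without Spark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in SESSION_CONFS[table_format].items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

For example, build_session("hudi") would return a session configured for Hudi, while to_conf_flags("delta") produces the argument list for a spark-submit command.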

Read and write Delta Lake tables

Spark SQL

-- Create a table.
create table delta_tbl (id int, name string) using delta;

-- Insert data into the table.
insert into delta_tbl values (1, 'a1'), (2, 'a2');

-- Update data in the table.
update delta_tbl set name = 'a1_new' where id = 1;

-- Delete data from the table.
delete from delta_tbl where id = 1;

-- Query data from the table.
select * from delta_tbl;
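
The same statements can be issued from a PySpark session through spark.sql. This is a minimal sketch, assuming spark is a session started with the Delta Lake settings; DELTA_STATEMENTS, run_statements, and query_table are hypothetical names.

```python
# The Delta Lake statements from the Spark SQL example, in execution order.
DELTA_STATEMENTS = [
    "create table delta_tbl (id int, name string) using delta",
    "insert into delta_tbl values (1, 'a1'), (2, 'a2')",
    "update delta_tbl set name = 'a1_new' where id = 1",
    "delete from delta_tbl where id = 1",
]


def run_statements(spark, statements):
    """Execute each statement with spark.sql and return how many were run."""
    for statement in statements:
        spark.sql(statement)
    return len(statements)


def query_table(spark, table="delta_tbl"):
    """Return the current contents of the table as a DataFrame."""
    return spark.sql(f"select * from {table}")
```

After run_statements(spark, DELTA_STATEMENTS), calling query_table(spark).show() should list only the row with id 2, because the row with id 1 is updated and then deleted.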

Spark Dataset API (Scala)

// Write data to the table.
val df = Seq((1, "a1"), (2, "a2")).toDF("id", "name")
df.write.format("delta").save("/tmp/delta_tbl")

// Read data from the table.
spark.read.format("delta").load("/tmp/delta_tbl")

Spark Dataset API (PySpark)

# Write data to the table.
df = spark.createDataFrame([(1, "a1"), (2, "a2")], ["id", "name"])
df.write.format("delta").save("/tmp/delta_tbl")

# Read data from the table.
spark.read.format("delta").load("/tmp/delta_tbl")
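
Spark's default save mode is errorifexists, so rerunning the write against a path that already holds a table is expected to fail. The sketch below selects a save mode explicitly. write_delta and VALID_MODES are hypothetical names; the mode strings are the standard ones accepted by DataFrameWriter.mode().

```python
# Save modes accepted by Spark's DataFrameWriter.mode().
VALID_MODES = {"append", "overwrite", "ignore", "error", "errorifexists"}


def write_delta(df, path, mode="append"):
    """Write a DataFrame to a Delta Lake path using an explicit save mode."""
    if mode not in VALID_MODES:
        raise ValueError(f"unsupported save mode: {mode}")
    df.write.format("delta").mode(mode).save(path)
    return mode
```

For example, write_delta(df, "/tmp/delta_tbl", "overwrite") replaces the table contents, while the default "append" adds the rows to the existing table.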

Read and write Apache Hudi tables

Apache Hudi tables require two key properties at table creation:

Property | Description
primaryKey | The field that uniquely identifies each record, equivalent to a primary key.
preCombineField | The field used to resolve duplicate records. Hudi keeps the record with the highest value in this field. Typically a timestamp or version column.
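
To make the role of the two properties concrete, the following plain-Python sketch models the rule described above: among records that share a primary key, the one with the highest preCombineField value is kept. It is a conceptual illustration only, not Hudi's implementation; the precombine function is hypothetical, and its handling of ties (the first record seen is kept) is an arbitrary choice that makes no claim about Hudi's behavior.

```python
def precombine(records, primary_key="id", precombine_field="ts"):
    """Keep one record per primary key: the one with the highest precombine value."""
    latest = {}
    for record in records:
        key = record[primary_key]
        current = latest.get(key)
        # Replace the stored record only when the incoming one has a strictly higher value.
        if current is None or record[precombine_field] > current[precombine_field]:
            latest[key] = record
    return list(latest.values())


rows = [
    {"id": 1, "name": "a1", "ts": 1000},
    {"id": 1, "name": "a1_new", "ts": 2000},
    {"id": 2, "name": "a2", "ts": 1000},
]
deduplicated = precombine(rows)
```

Here the two records with id 1 collapse into the one with ts 2000, and the record with id 2 is kept unchanged.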

Spark SQL

-- Create a table.
create table hudi_tbl (
  id bigint,
  name string,
  price double,
  ts long
) using hudi
tblproperties (
  primaryKey="id",
  preCombineField="ts"
);

-- Insert data into the table.
insert into hudi_tbl values (1, 'a1', 10.0, 1000), (2, 'a2', 11.0, 1000);

-- Update data in the table.
update hudi_tbl set name = 'a1_new' where id = 1;

-- Delete data from the table.
delete from hudi_tbl where id = 1;

-- Query data from the table.
select * from hudi_tbl;

Spark Dataset API (Scala)

// Write data to the table.
import org.apache.hudi.DataSourceWriteOptions._

val df = Seq((1, "a1", 10.0, 1000), (2, "a2", 11.0, 1000)).toDF("id", "name", "price", "ts")

df.write.format("hudi").
option(PRECOMBINE_FIELD.key(), "ts").
option(RECORDKEY_FIELD.key(), "id").
option(PARTITIONPATH_FIELD.key(), "").
option("hoodie.table.name", "hudi_tbl").
mode("append").
save("/tmp/hudi_tbl")

// Read data from the table.
spark.read.format("hudi").load("/tmp/hudi_tbl")

Write options

Option | Description
PRECOMBINE_FIELD | The field used to resolve duplicate records. Hudi keeps the record with the highest value in this field. Typically a timestamp or version column.
RECORDKEY_FIELD | The field that uniquely identifies each record in the table, equivalent to a primary key.
PARTITIONPATH_FIELD | The field used to partition the table. Leave blank ("") for non-partitioned tables.
hoodie.table.name | The table name used to register the dataset.

Spark Dataset API (PySpark)

# Assumes that spark is the active SparkSession, as in the pyspark shell.
from pyspark.sql import Row

# Write data to the table.
hudi_options = {
    'hoodie.table.name': 'hudi_tbl',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.datasource.write.partitionpath.field': '',
}

df = spark.createDataFrame([
    Row(id=1, name="a1", price=10.0, ts=1000),
    Row(id=2, name="a2", price=11.0, ts=1000),
])

(df.write.format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save("/tmp/hudi_tbl"))

# Read data from the table.
spark.read.format("hudi").load("/tmp/hudi_tbl")
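
When several tables are written with the same pattern, the option dictionary can be built by a small helper. This is a sketch under the assumption that only the four options shown above are needed; hudi_write_options and write_hudi are hypothetical names, and the option keys are the ones used in the PySpark example above.

```python
def hudi_write_options(table_name, record_key, precombine_field, partition_path=""):
    """Build the option dictionary passed to df.write.format("hudi").options(**...)."""
    return {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.recordkey.field": record_key,
        "hoodie.datasource.write.precombine.field": precombine_field,
        # An empty string means the table is not partitioned.
        "hoodie.datasource.write.partitionpath.field": partition_path,
    }


def write_hudi(df, path, options, mode="append"):
    """Write a DataFrame to a Hudi path with the given options and save mode."""
    df.write.format("hudi").options(**options).mode(mode).save(path)


options = hudi_write_options("hudi_tbl", record_key="id", precombine_field="ts")
```

Calling write_hudi(df, "/tmp/hudi_tbl", options) would then perform the same write as the example above.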