This topic shows how to configure Spark and use Spark SQL or the Spark Dataset API to read and write Delta Lake and Apache Hudi tables on an EMR cluster.
For background on Delta Lake, see the Delta Lake documentation. For background on Apache Hudi, see the Apache Hudi documentation.
Prerequisites
Before you begin, ensure that you have:
- An EMR cluster with Spark installed
- The Project Object Model (POM) dependencies for Delta Lake or Apache Hudi added to your project
Configure Spark parameters
Pass the following parameters when starting a Spark session. Add them as --conf flags in your spark-shell or spark-submit command, or set them in your SparkConf.
Delta Lake
spark-shell \
--conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
The spark.sql.catalog.spark_catalog parameter is required only when your cluster runs Spark 3.
Apache Hudi
spark-shell \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog
The spark.sql.catalog.spark_catalog parameter is required only when your cluster runs Spark 3.
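If you create the session from a PySpark application instead of the shell, you can apply the same parameters through `SparkSession.builder`. The following is a minimal sketch. It keeps the parameters in dictionaries so you can render them as `--conf` flags or apply them to a builder. The names `DELTA_CONF`, `HUDI_CONF`, `to_conf_flags`, and `build_session` are hypothetical. pyspark is imported only when a session is actually built. `spark.sql.extensions` and `spark.serializer` are read when the session and its SparkContext start. For that reason, `getOrCreate()` does not apply them to a session that is already running, such as the one a shell creates at startup.

```python
# Parameters from this section, kept as dictionaries so they can be applied
# programmatically or rendered as --conf flags. Names are hypothetical.
DELTA_CONF = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    # Required only when the cluster runs Spark 3.
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}

HUDI_CONF = {
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.sql.extensions": "org.apache.spark.sql.hudi.HoodieSparkSessionExtension",
    # Required only when the cluster runs Spark 3.
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
}


def to_conf_flags(conf):
    """Render a property dict as the --conf arguments for spark-shell or spark-submit."""
    flags = []
    for key, value in conf.items():
        flags.extend(["--conf", f"{key}={value}"])
    return flags


def build_session(app_name, conf):
    """Create a SparkSession with the given properties, or reuse an existing one.

    pyspark is imported lazily so the dictionaries above can be inspected
    without a Spark installation. Static settings such as spark.sql.extensions
    only take effect if this call creates a new session.
    """
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

For example, `build_session("delta-demo", DELTA_CONF)` would start a Delta-enabled session in a fresh Python process. `["spark-submit", *to_conf_flags(HUDI_CONF), "job.py"]` would build a spark-submit command line for a hypothetical `job.py`.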
Read and write Delta Lake tables
Spark SQL
-- Create a table.
create table delta_tbl (id int, name string) using delta;
-- Insert data into the table.
insert into delta_tbl values (1, 'a1'), (2, 'a2');
-- Update data in the table.
update delta_tbl set name = 'a1_new' where id = 1;
-- Delete data from the table.
delete from delta_tbl where id = 1;
-- Query data from the table.
select * from delta_tbl;
Spark Dataset API (Scala)
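// spark-shell imports spark.implicits._ automatically; add it explicitly in an application.
import spark.implicits._
// Write data to the table.
val df = Seq((1, "a1"), (2, "a2")).toDF("id", "name")
df.write.format("delta").save("/tmp/delta_tbl")
// Read data from the table.
spark.read.format("delta").load("/tmp/delta_tbl").show()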
Spark Dataset API (PySpark)
# Write data to the table.
df = spark.createDataFrame([(1, "a1"), (2, "a2")], ["id", "name"])
df.write.format("delta").save("/tmp/delta_tbl")
# Read data from the table.
spark.read.format("delta").load("/tmp/delta_tbl").show()
Read and write Apache Hudi tables
Apache Hudi tables require two key properties at table creation:
| Property | Description |
|---|---|
| primaryKey | The field that uniquely identifies each record, equivalent to a primary key. |
| preCombineField | The field used to resolve duplicate records. Hudi keeps the record with the highest value in this field. Typically a timestamp or version column. |
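The following pure-Python sketch makes the preCombineField rule concrete by applying it to a batch of records stored as dictionaries. It illustrates the rule described in the table and is not Hudi's implementation. The function name `precombine` is hypothetical. The sketch also assumes that a tie goes to the record that appears later in the input.

```python
def precombine(records, record_key, precombine_field):
    """Keep one record per key: the one with the highest precombine value.

    Illustrates the preCombineField rule only; this is not Hudi's code.
    Assumption: on a tie, the record that appears later in the input wins.
    """
    latest = {}
    for record in records:
        key = record[record_key]
        current = latest.get(key)
        # Replace the stored record when the incoming one is at least as new.
        if current is None or record[precombine_field] >= current[precombine_field]:
            latest[key] = record
    # Output order follows the first appearance of each key.
    return list(latest.values())
```

For example, given two records with `id` 1 and `ts` values 1000 and 2000, only the record with `ts` 2000 is kept.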
Spark SQL
-- Create a table.
create table hudi_tbl (
id bigint,
name string,
price double,
ts long
) using hudi
tblproperties (
primaryKey="id",
preCombineField="ts"
);
-- Insert data into the table.
insert into hudi_tbl values (1, 'a1', 10.0, 1000), (2, 'a2', 11.0, 1000);
-- Update data in the table.
update hudi_tbl set name = 'a1_new' where id = 1;
-- Delete data from the table.
delete from hudi_tbl where id = 1;
-- Query data from the table.
select * from hudi_tbl;
Spark Dataset API (Scala)
import org.apache.hudi.DataSourceWriteOptions._
// spark-shell imports spark.implicits._ automatically; add it explicitly in an application.
import spark.implicits._
// Write data to the table.
val df = Seq((1, "a1", 10.0, 1000), (2, "a2", 11.0, 1000)).toDF("id", "name", "price", "ts")
df.write.format("hudi").
  option(PRECOMBINE_FIELD.key(), "ts").
  option(RECORDKEY_FIELD.key(), "id").
  option(PARTITIONPATH_FIELD.key(), "").
  option("hoodie.table.name", "hudi_tbl").
  mode("append").
  save("/tmp/hudi_tbl")
// Read data from the table.
spark.read.format("hudi").load("/tmp/hudi_tbl").show()
Write options
| Option | Description |
|---|---|
| PRECOMBINE_FIELD | The field used to resolve duplicate records. Hudi keeps the record with the highest value in this field. Typically a timestamp or version column. |
| RECORDKEY_FIELD | The field that uniquely identifies each record in the table, equivalent to a primary key. |
| PARTITIONPATH_FIELD | The field used to partition the table. Leave blank ("") for non-partitioned tables. |
| hoodie.table.name | The table name used to register the dataset. |
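The options in this table take two forms. In Scala they are `DataSourceWriteOptions` constants, and in PySpark they are string keys. A small helper that builds the PySpark option dictionary makes the mapping explicit and rejects a missing required field before a write starts. The following is a sketch. `hudi_write_options` and the Python constant names are hypothetical. The string keys are the ones used in this topic's PySpark example, and the Scala constants are expected to resolve to the same keys through `.key()`.

```python
# String keys used in the PySpark example. The Python constant names are
# hypothetical and only mirror the Scala DataSourceWriteOptions names.
PRECOMBINE_FIELD = "hoodie.datasource.write.precombine.field"
RECORDKEY_FIELD = "hoodie.datasource.write.recordkey.field"
PARTITIONPATH_FIELD = "hoodie.datasource.write.partitionpath.field"
TABLE_NAME = "hoodie.table.name"


def hudi_write_options(table_name, record_key, precombine_field, partition_path=""):
    """Build the dict passed to df.write.format("hudi").options(**...).

    Raises ValueError if any required value is empty. An empty
    partition_path means the table is not partitioned.
    """
    if not table_name or not record_key or not precombine_field:
        raise ValueError("table_name, record_key and precombine_field are required")
    return {
        TABLE_NAME: table_name,
        RECORDKEY_FIELD: record_key,
        PRECOMBINE_FIELD: precombine_field,
        PARTITIONPATH_FIELD: partition_path,
    }
```

With this helper, the write in the PySpark example could be expressed as `df.write.format("hudi").options(**hudi_write_options("hudi_tbl", "id", "ts")).mode("append").save("/tmp/hudi_tbl")`.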
Spark Dataset API (PySpark)
from pyspark.sql import Row
# `spark` is the SparkSession provided by the pyspark shell, started with the Hudi parameters above.
# Write data to the table.
hudi_options = {
    'hoodie.table.name': 'hudi_tbl',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.datasource.write.partitionpath.field': '',
}
df = spark.createDataFrame([
    Row(id=1, name="a1", price=10.0, ts=1000),
    Row(id=2, name="a2", price=11.0, ts=1000),
])
(df.write.format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save("/tmp/hudi_tbl"))
# Read data from the table.
spark.read.format("hudi").load("/tmp/hudi_tbl").show()