Use the Spark DataFrame API to write data to an Iceberg table and read data from the table in batch mode on an E-MapReduce (EMR) Hadoop cluster. This topic uses Spark 3.x.
Prerequisites
Before you begin, ensure that you have:
An EMR Hadoop cluster running EMR V3.38.0, EMR V5.4.0, or a later minor version. For more information, see Create a cluster.
Step 1: Add Maven dependencies
Create a Maven project and add Spark and Iceberg dependencies to the project object model (POM) file.
The following example adds dependencies for Spark 3.1.1 and Iceberg 0.12.0 with provided scope, so the dependencies are available at compile time but are supplied by the cluster at runtime:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.1.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.1.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>0.12.0</version>
<scope>provided</scope>
</dependency>

The Iceberg package bundled in your EMR cluster differs from the open-source Iceberg dependency — for example, Data Lake Formation (DLF) catalog integration is built in. Use the open-source Iceberg dependency with provided scope to compile and package the code locally, then let the cluster's bundled package take over at runtime.

Step 2: Configure a catalog
Before calling the Spark DataFrame API on an Iceberg table, configure a catalog in the SparkConf object. The catalog name and required parameters differ by cluster version. All examples below use DLF to manage metadata. For parameter details, see Configuration of DLF metadata.
Select the configuration for your cluster version:
EMR V3.40 or later, EMR V5.6.0 or later — catalog name: iceberg

sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
sparkConf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
sparkConf.set("spark.sql.catalog.iceberg.catalog-impl", "org.apache.iceberg.aliyun.dlf.hive.DlfCatalog")

EMR V3.39.X, EMR V5.5.X — catalog name: dlf

sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
sparkConf.set("spark.sql.catalog.dlf", "org.apache.iceberg.spark.SparkCatalog")
sparkConf.set("spark.sql.catalog.dlf.catalog-impl", "org.apache.iceberg.aliyun.dlf.hive.DlfCatalog")
sparkConf.set("spark.sql.catalog.dlf.warehouse", "<yourOSSWarehousePath>")

EMR V3.38.X, EMR V5.3.X, EMR V5.4.X — catalog name: dlf_catalog

Set the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables before running the code. For instructions, see the Configure environment variables section.

sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")
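Because the EMR V3.38.X, EMR V5.3.X, and EMR V5.4.X configuration reads credentials from environment variables, passing a missing variable to SparkConf silently sets null and fails later with an opaque authentication error. A small defensive check (a sketch of our own, not part of the EMR documentation) can fail fast instead:

```scala
// Sketch: fail fast with a clear message if a required credential
// environment variable is missing, instead of passing null to SparkConf.
def requiredEnv(name: String): String = {
  val value = System.getenv(name)
  require(value != null && value.nonEmpty, s"environment variable $name is not set")
  value
}
```

Call it as sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", requiredEnv("ALIBABA_CLOUD_ACCESS_KEY_ID")) so a missing credential surfaces at startup.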
Step 3: Write data to an Iceberg table
Use the DataFrameWriterV2 API (not the DataFrameWriterV1 API) to write data from Spark 3.x to an Iceberg table. The v2 API is preferred because each operation maps directly to a SQL statement and overwrite behavior is explicit:
| DataFrame operation | Equivalent SQL | Description |
|---|---|---|
| df.writeTo(t).create() | CREATE TABLE AS SELECT | Creates a new table from the DataFrame |
| df.writeTo(t).replace() | REPLACE TABLE AS SELECT | Replaces an existing table |
| df.writeTo(t).createOrReplace() | CREATE OR REPLACE TABLE AS SELECT | Creates the table if it does not exist, or replaces it |
| df.writeTo(t).append() | INSERT INTO | Appends rows to an existing table |
| df.writeTo(t).overwritePartitions() | Dynamic INSERT OVERWRITE | Overwrites matching partitions |
In the following examples, replace <yourCatalogName> with your actual catalog name.
Create a table:
val df: DataFrame = ...
df.writeTo("<yourCatalogName>.iceberg_db.sample").create()

Use tableProperty() to set table properties and partitionedBy() to define partition fields when creating the table.
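For example, the following sketch creates a day-partitioned table with an explicit default file format. It assumes a SparkSession with the catalog configured and a source DataFrame that has a ts timestamp column (the column name is hypothetical):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, days}

// Sketch: partition the new table by day on an assumed "ts" column and set
// an Iceberg table property at creation time.
val df: DataFrame = ...
df.writeTo("<yourCatalogName>.iceberg_db.sample")
  .partitionedBy(days(col("ts")))
  .tableProperty("write.format.default", "parquet")
  .create()
```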
Append data:
val df: DataFrame = ...
df.writeTo("<yourCatalogName>.iceberg_db.sample").append()

Overwrite partitions:
val df: DataFrame = ...
df.writeTo("<yourCatalogName>.iceberg_db.sample").overwritePartitions()

Step 4: Read data from an Iceberg table
Select the read method based on your Spark version:
Spark 3.x (recommended)
val df = spark.table("<yourCatalogName>.iceberg_db.sample")

Spark 2.4
val df = spark.read.format("iceberg").load("<yourCatalogName>.iceberg_db.sample")
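Filters and column projections on a batch read are pushed down by Iceberg when the scan is planned, so a selective query skips data files whose column statistics cannot match. A sketch, assuming the table and catalog from the examples above:

```scala
// Sketch: read with a filter and projection; Iceberg pushes the predicate
// down so only data files that can contain matching rows are scanned.
val filtered = spark.table("<yourCatalogName>.iceberg_db.sample")
  .where("id > 2")
  .select("id", "data")
filtered.show()
```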
Example: end-to-end batch read/write
This example uses an EMR V5.3.0 cluster with the dlf_catalog catalog. The catalog name and parameters vary by cluster version — for details, see Configuration of DLF metadata.
Step 1: Create an Iceberg database
Use Spark SQL to create a database named iceberg_db. For instructions, see Use Iceberg.
Step 2: Write the Spark application
The following Scala code configures the dlf_catalog catalog, creates an Iceberg table, appends data, and reads the result:
def main(args: Array[String]): Unit = {
// Configure the DLF catalog
val sparkConf = new SparkConf()
sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")
val spark = SparkSession
.builder()
.config(sparkConf)
.appName("IcebergReadWriteTest")
.getOrCreate()
// Create the Iceberg table and write the first batch of rows
val firstDF = spark.createDataFrame(Seq(
(1, "a"), (2, "b"), (3, "c")
)).toDF("id", "data")
firstDF.writeTo("dlf_catalog.iceberg_db.sample").createOrReplace()
// Append a second batch of rows
val secondDF = spark.createDataFrame(Seq(
(4, "d"), (5, "e"), (6, "f")
)).toDF("id", "data")
secondDF.writeTo("dlf_catalog.iceberg_db.sample").append()
// Read all rows from the Iceberg table
val icebergTable = spark.table("dlf_catalog.iceberg_db.sample")
icebergTable.show()
}

Replace the following placeholders with actual values:
| Placeholder | Description |
|---|---|
| <yourOSSEndpoint> | OSS endpoint, for example, oss-cn-hangzhou.aliyuncs.com |
| <yourOSSWarehousePath> | OSS path used as the Iceberg warehouse root |
| <yourCatalogId> | DLF catalog ID |
| <yourDLFEndpoint> | DLF endpoint |
| <yourDLFRegionId> | Region ID where DLF is deployed, for example, cn-hangzhou |
Step 3: Build and deploy the application
Add the Scala Maven plugin to pom.xml to compile Scala source files:
<build>
  <plugins>
    <plugin>
      <groupId>net.alchim31.maven</groupId>
      <artifactId>scala-maven-plugin</artifactId>
      <version>3.2.2</version>
      <executions>
        <execution>
          <goals>
            <goal>compile</goal>
            <goal>testCompile</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>

Debug the code locally, then run the following command to package it:

mvn clean install

Log on to your EMR cluster over SSH. For instructions, see Log on to a cluster.
Upload the JAR package to the EMR cluster. This example places the JAR package in the root directory of the cluster.
Step 4: Submit the Spark job
Run the following spark-submit command to submit the job:
spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 1g \
--executor-cores 1 \
--executor-memory 1g \
--num-executors 1 \
--class com.aliyun.iceberg.IcebergTest \
iceberg-demos.jar

This example uses a JAR package named iceberg-demos.jar with the main class com.aliyun.iceberg.IcebergTest. Update --class and the JAR filename to match your project.
The following output is returned after the job completes:
+---+----+
| id|data|
+---+----+
| 4| d|
| 1| a|
| 5| e|
| 6| f|
| 2| b|
| 3| c|
+---+----+

What's next
Use Iceberg — manage Iceberg tables with Spark SQL
Configuration of DLF metadata — catalog configuration reference for all EMR versions