E-MapReduce: Use Spark to write data to an Iceberg table and read data from the table in batch mode

Last Updated: Mar 26, 2026

Use the Spark DataFrame API to write data to an Iceberg table and read data from the table in batch mode on an E-MapReduce (EMR) Hadoop cluster. This topic uses Spark 3.x.

Prerequisites

Before you begin, ensure that you have:

  • An EMR Hadoop cluster running EMR V3.38.0, EMR V5.4.0, or a later minor version. For more information, see Create a cluster.

Step 1: Add Maven dependencies

Create a Maven project and add Spark and Iceberg dependencies to the project object model (POM) file.

The following example adds dependencies for Spark 3.1.1 and Iceberg 0.12.0 with the provided scope. With this scope, the dependencies are on the classpath when you compile locally but are not packaged into your JAR, because the cluster supplies them at runtime:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.1.1</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.1.1</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.iceberg</groupId>
    <artifactId>iceberg-core</artifactId>
    <version>0.12.0</version>
    <scope>provided</scope>
</dependency>

The Iceberg package bundled in your EMR cluster differs from the open-source Iceberg dependency. For example, Data Lake Formation (DLF) catalog integration is built in. Compile and package your code locally against the open-source Iceberg dependency with the provided scope. At runtime, the job uses the package bundled in the cluster.

Step 2: Configure a catalog

Before calling the Spark DataFrame API on an Iceberg table, configure a catalog in the SparkConf object. The catalog name and required parameters differ by cluster version. All examples below use DLF to manage metadata. For parameter details, see Configuration of DLF metadata.

Select the configuration for your cluster version:

  • EMR V3.40 or later, EMR V5.6.0 or later (catalog name: iceberg)

    sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    sparkConf.set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    sparkConf.set("spark.sql.catalog.iceberg.catalog-impl", "org.apache.iceberg.aliyun.dlf.hive.DlfCatalog")

  • EMR V3.39.X, EMR V5.5.X (catalog name: dlf)

    sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    sparkConf.set("spark.sql.catalog.dlf", "org.apache.iceberg.spark.SparkCatalog")
    sparkConf.set("spark.sql.catalog.dlf.catalog-impl", "org.apache.iceberg.aliyun.dlf.hive.DlfCatalog")
    sparkConf.set("spark.sql.catalog.dlf.warehouse", "<yourOSSWarehousePath>")

  • EMR V3.38.X, EMR V5.3.X, EMR V5.4.X (catalog name: dlf_catalog)

    Set the ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables before running the code. For instructions, see the Configure environment variables section.

    sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
    sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
    sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
    sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
    sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
    sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")
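
The version-specific settings above can also be kept in one place as plain key-value maps. The following is a minimal sketch that covers the iceberg and dlf variants only. The object name CatalogSettings and its method names are hypothetical and are not part of EMR, Spark, or Iceberg. The keys and values are copied from the list above.

```scala
// Hypothetical helper: the object and method names are illustrative only.
object CatalogSettings {
  // Setting shared by every variant listed above.
  private val extensions: (String, String) =
    "spark.sql.extensions" ->
      "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"

  // EMR V3.40 or later, EMR V5.6.0 or later (catalog name: iceberg).
  def icebergCatalog: Map[String, String] = Map(
    extensions,
    "spark.sql.catalog.iceberg" -> "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.iceberg.catalog-impl" ->
      "org.apache.iceberg.aliyun.dlf.hive.DlfCatalog"
  )

  // EMR V3.39.X, EMR V5.5.X (catalog name: dlf). This variant also sets a warehouse path.
  def dlfCatalog(warehousePath: String): Map[String, String] = Map(
    extensions,
    "spark.sql.catalog.dlf" -> "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.dlf.catalog-impl" ->
      "org.apache.iceberg.aliyun.dlf.hive.DlfCatalog",
    "spark.sql.catalog.dlf.warehouse" -> warehousePath
  )
}
```

With a SparkConf in scope, a call such as sparkConf.setAll(CatalogSettings.icebergCatalog) would apply all entries at once. Keeping the settings as a map also makes them easy to inspect before the session starts.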

Step 3: Write data to an Iceberg table

Use the DataFrameWriterV2 API (not the DataFrameWriterV1 API) to write data from Spark 3.x to an Iceberg table. The v2 API is preferred because each operation maps directly to a SQL statement and overwrite behavior is explicit:

| DataFrame operation | Equivalent SQL | Description |
| --- | --- | --- |
| df.writeTo(t).create() | CREATE TABLE AS SELECT | Creates a new table from the DataFrame |
| df.writeTo(t).replace() | REPLACE TABLE AS SELECT | Replaces an existing table |
| df.writeTo(t).createOrReplace() | CREATE OR REPLACE TABLE AS SELECT | Creates the table if it does not exist, or replaces it |
| df.writeTo(t).append() | INSERT INTO | Appends rows to an existing table |
| df.writeTo(t).overwritePartitions() | Dynamic INSERT OVERWRITE | Overwrites matching partitions |

In the following examples, replace <yourCatalogName> with your actual catalog name.

Create a table:

val df: DataFrame = ...
df.writeTo("<yourCatalogName>.iceberg_db.sample").create()

Use tableProperty() to set table properties and partitionedBy() to define partition fields when creating the table.
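
As a rough illustration of tableProperty() and partitionedBy(), the sketch below creates the sample table partitioned by the data column and sets the Iceberg property write.format.default. The object name, the method name, the sample rows, and the choice of partition column are assumptions made for this example. It needs a SparkSession that already has a catalog configured as described in Step 2.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical wrapper: the object and method names are illustrative only.
object CreatePartitionedTable {
  // catalogName is the catalog that you configured in Step 2.
  def createSample(spark: SparkSession, catalogName: String): Unit = {
    val df: DataFrame = spark.createDataFrame(Seq(
      (1, "a"), (2, "b"), (3, "c")
    )).toDF("id", "data")

    df.writeTo(s"$catalogName.iceberg_db.sample")
      // Iceberg table property: default file format for newly written data files.
      .tableProperty("write.format.default", "parquet")
      // Partition the new table by the values of the data column.
      .partitionedBy(col("data"))
      .create()
  }
}
```

Because create() fails if the table already exists, createOrReplace() may be the more convenient call while you iterate on a test table.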

Append data:

val df: DataFrame = ...
df.writeTo("<yourCatalogName>.iceberg_db.sample").append()

Overwrite partitions:

val df: DataFrame = ...
df.writeTo("<yourCatalogName>.iceberg_db.sample").overwritePartitions()
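
To make "overwrites matching partitions" concrete, the following toy model treats a table as a map from partition value to rows. It is only a sketch of the semantics under the assumption that the table is partitioned by the data column. It is not how Iceberg implements the operation, and every name in it is hypothetical.

```scala
// Toy model only: a "table" is a map from partition value to the rows in that partition.
object DynamicOverwriteModel {
  type Row = (Int, String)            // (id, data); the table is partitioned by data
  type Table = Map[String, Seq[Row]]

  // Each partition that appears in the incoming rows is replaced as a whole.
  // Partitions that the incoming rows do not touch are kept unchanged.
  def overwritePartitions(table: Table, incoming: Seq[Row]): Table =
    table ++ incoming.groupBy(_._2)
}
```

For a table that holds row (1, "a") in partition a and row (2, "b") in partition b, overwriting with the single row (9, "a") leaves partition a containing only (9, "a"), while partition b still contains (2, "b").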

Step 4: Read data from an Iceberg table

Select the read method based on your Spark version:

  • Spark 3.x (recommended)

    val df = spark.table("<yourCatalogName>.iceberg_db.sample")
  • Spark 2.4

    val df = spark.read.format("iceberg").load("<yourCatalogName>.iceberg_db.sample")

Example: end-to-end batch read/write

This example uses an EMR V5.3.0 cluster with the dlf_catalog catalog. The catalog name and parameters vary by cluster version. For details, see Configuration of DLF metadata.

Step 1: Create an Iceberg database

Use Spark SQL to create a database named iceberg_db. For instructions, see Use Iceberg.

Step 2: Write the Spark application

The following Scala code configures the dlf_catalog catalog, creates an Iceberg table, appends data, and reads the result:

package com.aliyun.iceberg

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// The package and object name match the --class value passed to spark-submit.
object IcebergTest {

  def main(args: Array[String]): Unit = {

    // Configure the DLF catalog
    val sparkConf = new SparkConf()
    sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
    sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
    sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
    sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"))
    sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET"))
    sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")

    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .appName("IcebergReadWriteTest")
      .getOrCreate()

    // Create the Iceberg table (or replace it if it exists) and write the first batch of rows
    val firstDF = spark.createDataFrame(Seq(
      (1, "a"), (2, "b"), (3, "c")
    )).toDF("id", "data")

    firstDF.writeTo("dlf_catalog.iceberg_db.sample").createOrReplace()

    // Append a second batch of rows
    val secondDF = spark.createDataFrame(Seq(
      (4, "d"), (5, "e"), (6, "f")
    )).toDF("id", "data")

    secondDF.writeTo("dlf_catalog.iceberg_db.sample").append()

    // Read all rows from the Iceberg table and print them
    val icebergTable = spark.table("dlf_catalog.iceberg_db.sample")

    icebergTable.show()
  }
}

Replace the following placeholders with actual values:

| Placeholder | Description |
| --- | --- |
| <yourOSSEndpoint> | OSS endpoint, for example, oss-cn-hangzhou.aliyuncs.com |
| <yourOSSWarehousePath> | OSS path used as the Iceberg warehouse root |
| <yourCatalogId> | DLF catalog ID |
| <yourDLFEndpoint> | DLF endpoint |
| <yourDLFRegionId> | Region ID where DLF is deployed, for example, cn-hangzhou |
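
The application reads the AccessKey pair with System.getenv, which returns null when a variable is not set, and SparkConf.set rejects a null value. One way to fail with a clearer message is a small lookup helper such as the sketch below. The object name Env and the method name requireEnv are hypothetical. The env parameter exists only so that the helper can be exercised without touching the real process environment.

```scala
// Hypothetical helper: the names Env and requireEnv are illustrative only.
object Env {
  // Looks the variable up in the given environment (the process environment by default)
  // and fails with a descriptive message when it is missing or empty.
  def requireEnv(name: String, env: Map[String, String] = sys.env): String =
    env.get(name).filter(_.nonEmpty).getOrElse(
      throw new IllegalStateException(s"Environment variable $name is not set")
    )
}
```

In the application, the call System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID") could then be replaced with Env.requireEnv("ALIBABA_CLOUD_ACCESS_KEY_ID"), and likewise for the secret.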

Step 3: Build and deploy the application

  1. Add the Scala Maven plugin to pom.xml to compile Scala source files:

    <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

  2. Debug the code locally, then run the following command to package it:

    mvn clean install

  3. Log on to your EMR cluster over SSH. For instructions, see Log on to a cluster.

  4. Upload the JAR package to the EMR cluster. This example places the JAR package in the root directory of the cluster.

Step 4: Submit the Spark job

Run the following spark-submit command to submit the job:

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 1g \
  --executor-cores 1 \
  --executor-memory 1g \
  --num-executors 1 \
  --class com.aliyun.iceberg.IcebergTest \
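  iceberg-demos.jar

This example uses a JAR package named iceberg-demos.jar with the main class com.aliyun.iceberg.IcebergTest. Update --class and the JAR filename to match your project.

Because the job runs in cluster deploy mode, the output of show() is written to the driver's stdout log on YARN rather than to the terminal that ran spark-submit. After the job completes, the output is similar to the following. Row order can vary because the query does not sort the result: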

+---+----+
| id|data|
+---+----+
|  4|   d|
|  1|   a|
|  5|   e|
|  6|   f|
|  2|   b|
|  3|   c|
+---+----+
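
The rows in this output are not in id order because the example reads the table without sorting. If a stable order matters, for example when you compare runs, sorting before show() is one option. The following is a minimal sketch that assumes the same dlf_catalog.iceberg_db.sample table. The object and method names are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical wrapper: the object and method names are illustrative only.
object ShowSorted {
  def showSample(spark: SparkSession): Unit =
    spark.table("dlf_catalog.iceberg_db.sample")
      .orderBy("id") // sort by id so the rows print in a stable order
      .show()
}
```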
