
E-MapReduce:Use Spark to write data to an Iceberg table in streaming mode

Last Updated: Mar 26, 2026

Use Apache Spark Structured Streaming to continuously write data from Apache Kafka into an Apache Iceberg table on EMR. All writes go through the DataStreamWriter API.

Prerequisites

Before you begin, make sure you have:

  • A DataLake cluster or custom cluster. For more information, see Create a cluster.

  • A Dataflow cluster with the Kafka service enabled. For more information, see Create a cluster.

  • Both clusters deployed in the same vSwitch of the same virtual private cloud (VPC).

How it works

Submit a streaming write job by calling DataStreamWriter from Spark Structured Streaming:

val tableIdentifier: String = ...
data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("path", tableIdentifier)
    .option("checkpointLocation", checkpointPath)
    .start()

tableIdentifier is either the name or the path of the target table. Two output modes are supported:

  • append: writes each micro-batch as new rows to the table. Equivalent to INSERT INTO.

  • complete: replaces the entire table with the contents of the latest micro-batch. Equivalent to INSERT OVERWRITE.
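
complete mode is typically used with aggregated results, because the entire result table is rewritten on each trigger. The following is a minimal sketch along the lines of the snippet above; the category column and the count aggregation are assumptions for illustration only:

// Hypothetical aggregation over an assumed 'category' column of the input stream.
val aggDF = data.groupBy("category").count()

// In complete mode, every trigger rewrites the whole table (INSERT OVERWRITE semantics).
aggDF.writeStream
    .format("iceberg")
    .outputMode("complete")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("path", tableIdentifier)
    .option("checkpointLocation", checkpointPath)
    .start()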

Stream data from Kafka to an Iceberg table

This example reads data from a Dataflow cluster (Kafka) and writes it to an Iceberg table. After packaging the code, submit it as a Spark job using spark-submit.

Step 1: Set up a Kafka topic with test data

  1. Log on to the Dataflow cluster in SSH mode. For more information, see Log on to a cluster.

  2. Create a topic named iceberg_test:

    kafka-topics.sh --bootstrap-server core-1-1:9092,core-1-2:9092,core-1-3:9092 \
      --topic iceberg_test \
      --partitions 3 \
      --replication-factor 2 \
      --create
  3. Produce test data to the topic:

    kafka-console-producer.sh --broker-list core-1-1:9092,core-1-2:9092,core-1-3:9092 \
      --topic iceberg_test
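
The streaming job in Step 4 expects each record's key to be a Base64-encoded numeric ID and its value to be a plain string. For example, if you start the console producer with the additional properties parse.key=true and key.separator=: so that it accepts keyed records, you could type test records such as the following (the sample IDs and values are illustrative only):

MQ==:first test record
Mg==:second test record
Mw==:third test record

Here MQ==, Mg==, and Mw== are the Base64 encodings of 1, 2, and 3. Records produced without a key result in NULL values in the id column of the Iceberg table.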

Step 2: Create a database and table in Iceberg

Use Spark SQL to create a database named iceberg_db and a table named iceberg_table. For more information, see Use Iceberg.
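
If you prefer to create them programmatically from a Spark shell or application instead of the spark-sql CLI, a minimal sketch might look like the following. The two-column schema (id BIGINT, data STRING) is an assumption chosen to match the streaming job in Step 4; adjust it to your data.

// Assumes a SparkSession whose dlf_catalog Iceberg catalog is configured as in Step 4.
spark.sql("CREATE DATABASE IF NOT EXISTS dlf_catalog.iceberg_db")
spark.sql(
  """CREATE TABLE IF NOT EXISTS dlf_catalog.iceberg_db.iceberg_table (
    |  id   BIGINT,
    |  data STRING
    |) USING iceberg""".stripMargin)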

Step 3: Set up the Maven project

Create a Maven project, add Spark dependencies, and include the Scala Maven plugin for compilation. Add the following to your pom.xml:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.1.2</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- the Maven Scala plugin will compile Scala source files -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Step 4: Write the Spark application

The following Scala code reads from Kafka and writes to an Iceberg table in streaming mode.

Important

Catalog parameters and default catalog names vary by cluster version. This example uses an EMR V5.3.0 cluster with DLF (Data Lake Formation) managing metadata under a catalog named dlf_catalog. For DLF configuration details, see Configuration of DLF metadata.

package com.aliyun.iceberg

import java.util.concurrent.TimeUnit

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.LongType

object StructuredSinkIceberg {

  def main(args: Array[String]): Unit = {

    // Configure the Iceberg catalog backed by DLF.
    val sparkConf = new SparkConf()
    sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
    sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
    sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
    sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", "<yourAccessKeyId>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", "<yourAccessKeySecret>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")

    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .appName("StructuredSinkIceberg")
      .getOrCreate()

    val checkpointPath = "oss://mybucket/tmp/iceberg_table_checkpoint"
    val bootstrapServers = "192.168.XX.XX:9092"
    val topic = "iceberg_test"

    // Read from the Kafka topic on the Dataflow cluster.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .load()

    val resDF = df.selectExpr(
        "CAST(unbase64(CAST(key AS STRING)) AS STRING) AS strKey", // Decode the Base64-encoded key to a string.
        "CAST(value AS STRING) AS data")
      .select(
        col("strKey").cast(LongType).alias("id"), // Cast the string key to LONG.
        col("data")
      )

    // Write to the Iceberg table every minute.
    val query = resDF.writeStream
      .format("iceberg")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
      .option("path", "dlf_catalog.iceberg_db.iceberg_table")
      .option("checkpointLocation", checkpointPath)
      .start()

    query.awaitTermination()
  }
}

You can change the values of the following variables based on your business requirements:

  • checkpointPath: the OSS path in which Spark Structured Streaming stores checkpoint data.

  • bootstrapServers: the private IP address of a Kafka broker in the Dataflow cluster.

  • topic: the name of the Kafka topic to subscribe to.

Step 5: Package and deploy to EMR

  1. After testing locally, package the project:

    mvn clean install
  2. Log on to your EMR cluster in SSH mode. For more information, see Log on to a cluster.

  3. Upload the JAR package to the root directory of the EMR cluster.

Step 6: Submit the Spark job

Run spark-submit to start the streaming job:

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 1g \
  --executor-cores 2 \
  --executor-memory 3g \
  --num-executors 1 \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<version> \
  --class com.aliyun.iceberg.StructuredSinkIceberg \
  iceberg-demos.jar
Note
  • Replace <version> with the actual version. The spark-sql-kafka-0-10 package must be compatible with both your Spark version and your Kafka version.

  • This example uses a JAR package named iceberg-demos.jar. Adjust --class and the JAR name to match your project.

To verify that data is being written, use Spark SQL to query changes. For more information, see Basic usage.
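
For example, from a Spark shell or application that uses the same catalog configuration as Step 4, a quick check might look like the following; a row count is only one of several ways to confirm that new micro-batches are arriving:

// Assumes the dlf_catalog Iceberg catalog is configured as in Step 4.
spark.sql("SELECT COUNT(*) FROM dlf_catalog.iceberg_db.iceberg_table").show()
spark.sql("SELECT id, data FROM dlf_catalog.iceberg_db.iceberg_table LIMIT 10").show()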
