This topic describes how to write data to an Iceberg table by using Spark Structured Streaming.

Prerequisites

  • A Hadoop cluster is created in the EMR console. For more information, see Create a cluster.
    Note This topic applies only to Hadoop clusters of EMR V3.38.0 or a later minor version, or of EMR V5.4.0 or a later minor version.
  • A Kafka cluster is created in the EMR console. For more information, see Create a cluster.

Limits

The Hadoop and Kafka clusters must be deployed in the same vSwitch of the same virtual private cloud (VPC).

Write data to an Iceberg table in streaming mode

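Call the DataStreamWriter API of Spark Structured Streaming to write data to an Iceberg table. In the following snippet, data is a streaming DataFrame and checkpointPath is the checkpoint directory of the query.
import java.util.concurrent.TimeUnit
import org.apache.spark.sql.streaming.Trigger

val tableIdentifier: String = ...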
data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("path", tableIdentifier)
    .option("checkpointLocation", checkpointPath)
    .start()
Note The tableIdentifier parameter in the code specifies either the name of a table that is tracked by a catalog, such as dlf_catalog.iceberg_db.iceberg_table, or the path of a table.

You can use one of the following output modes to write data to an Iceberg table in streaming mode:
  • append: Appends the data of each batch to the Iceberg table. This mode is equivalent to the INSERT INTO operation.
  • complete: Overwrites the data in the Iceberg table with the data of the latest batch. This mode is equivalent to the INSERT OVERWRITE operation.
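
The append and complete modes above can be wrapped in one small helper. The following is a minimal sketch, not part of the original sample: the object name IcebergStreamWriter and its start method are hypothetical, and it assumes that the Spark and Iceberg runtime libraries are on the classpath. Note that Spark accepts the complete mode only for streaming queries that contain an aggregation, so a plain pass-through query would use append.

```scala
import java.util.concurrent.TimeUnit

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

// Hypothetical helper for illustration; not an EMR or Iceberg API.
object IcebergStreamWriter {

  // The two output modes described in this topic.
  val SupportedModes: Set[String] = Set("append", "complete")

  def start(
      data: DataFrame,
      tableIdentifier: String,
      checkpointPath: String,
      mode: String): StreamingQuery = {
    // Reject any other mode before the query is built.
    require(SupportedModes.contains(mode), s"Unsupported output mode: $mode")

    data.writeStream
      .format("iceberg")
      .outputMode(mode)
      .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
      .option("path", tableIdentifier)
      .option("checkpointLocation", checkpointPath)
      .start()
  }
}
```

For example, IcebergStreamWriter.start(events, tableIdentifier, checkpointPath, "append") appends every batch, whereas IcebergStreamWriter.start(events.groupBy("id").count(), countsTable, countsCheckpoint, "complete") rewrites a table of per-id counts on every trigger. Here events, countsTable, and countsCheckpoint are placeholder names, and each query needs its own checkpoint path.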

Example

This section provides an example of how to read data from a Kafka cluster and write the data to an Iceberg table. After you package the code and upload the package to your EMR cluster, you can run the spark-submit command to submit the Spark job that reads and writes the data.

  1. Use Kafka scripts to create a test topic and prepare test data.
    1. Log on to the Kafka cluster in SSH mode. For more information, see Log on to a cluster.
    2. Run the following command to create a topic named iceberg_test:
      kafka-topics.sh --zookeeper emr-header-1:2181,emr-worker-1:2181,emr-worker-2:2181 --topic iceberg_test --partitions 3 --replication-factor 2 --create
    3. Run the following command to start a console producer, and then enter the test data, one record per line:
      kafka-console-producer.sh --broker-list emr-header-1:9092,emr-worker-1:9092,emr-worker-2:9092 --topic iceberg_test
  2. Use Spark SQL to create a database named iceberg_db and a table named iceberg_table for the test. For more information, see Use Iceberg.
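    The statements for this step might look like the following sketch. It is an illustration under stated assumptions rather than the procedure from Use Iceberg: the object name CreateIcebergTable is hypothetical, the two STRING columns are assumed because they match the id and data columns that the streaming job in this example produces, and the dlf_catalog settings are assumed to be supplied through spark-defaults.conf or --conf options.

    ```scala
    import org.apache.spark.sql.SparkSession

    // Hypothetical helper for illustration only.
    object CreateIcebergTable {

      val createDatabaseSql: String =
        "CREATE DATABASE IF NOT EXISTS dlf_catalog.iceberg_db"

      // Assumed schema: both columns are strings, like the Kafka key and value
      // after they are cast in the streaming job.
      val createTableSql: String =
        """CREATE TABLE IF NOT EXISTS dlf_catalog.iceberg_db.iceberg_table (
          |  id STRING,
          |  data STRING
          |) USING iceberg""".stripMargin

      def main(args: Array[String]): Unit = {
        // Assumes that the catalog named dlf_catalog is already configured.
        val spark = SparkSession.builder().appName("CreateIcebergTable").getOrCreate()
        spark.sql(createDatabaseSql)
        spark.sql(createTableSql)
        spark.stop()
      }
    }
    ```

    The same two statements can also be entered directly in the spark-sql command-line tool.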
  3. Write Spark code.
    Sample code in Scala:
    Notice The catalog parameters and the default catalog name vary based on the version of your cluster. In this example, DLF is used to manage metadata, and an EMR V5.3.0 cluster and a catalog named dlf_catalog are used. For more information, see Configuration of DLF metadata.
    package com.aliyun.iceberg

    import java.util.concurrent.TimeUnit

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    // The package and object names match the --class value that is passed to spark-submit.
    object StructuredSinkIceberg {

      def main(args: Array[String]): Unit = {

        // Configure the parameters for the catalog. Replace the <your...> placeholders with your own values.
        val sparkConf = new SparkConf()
        sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
        sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
        sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
        sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
        sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
        sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", "<yourAccessKeyId>")
        sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", "<yourAccessKeySecret>")
        sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
        sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
        sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")

        val spark = SparkSession
          .builder()
          .config(sparkConf)
          .appName("StructuredSinkIceberg")
          .getOrCreate()

        val checkpointPath = "oss://mybucket/tmp/iceberg_table_checkpoint"
        val bootstrapServers = "192.168.XX.XX:9092"
        val topic = "iceberg_test"

        // Read data from the Kafka cluster.
        val df = spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", bootstrapServers)
          .option("subscribe", topic)
          .load()

        // Cast the Kafka key and value to strings and rename them to id and data.
        import spark.implicits._
        val resDF = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
          .as[(String, String)].toDF("id", "data")

        // Write data to the Iceberg table in streaming mode.
        val query = resDF.writeStream
          .format("iceberg")
          .outputMode("append")
          .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
          .option("path", "dlf_catalog.iceberg_db.iceberg_table")
          .option("checkpointLocation", checkpointPath)
          .start()

        // Block until the streaming query stops or fails.
        query.awaitTermination()
      }
    }
    You can change the values of the following parameters based on your business requirements:
      • checkpointPath: The checkpoint path of the data that is written by using Spark Structured Streaming.
      • bootstrapServers: The private IP address and port of a Kafka broker in the Kafka cluster, in the IP address:port format. Port 9092 is used in this example.
      • topic: The name of the topic.
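    If you prefer not to hard-code checkpointPath, bootstrapServers, and topic, you can pass them as command-line arguments. The following sketch shows one possible way to do this. The JobArgs object and the StreamSettings class are hypothetical names introduced only for illustration, and the argument order is an assumption.

    ```scala
    // Hypothetical helper for illustration only.
    object JobArgs {

      // Holds the three values that the sample code hard-codes.
      final case class StreamSettings(
          checkpointPath: String,
          bootstrapServers: String,
          topic: String)

      // Expects exactly three non-empty positional arguments:
      // <checkpointPath> <bootstrapServers> <topic>
      def parse(args: Array[String]): Either[String, StreamSettings] = args match {
        case Array(checkpoint, servers, topic)
            if Seq(checkpoint, servers, topic).forall(_.trim.nonEmpty) =>
          Right(StreamSettings(checkpoint, servers, topic))
        case _ =>
          Left("Usage: <checkpointPath> <bootstrapServers> <topic>")
      }
    }
    ```

    In the main method, you could call JobArgs.parse(args) and exit with the usage message when it returns Left. The spark-submit command passes every argument that follows the JAR package name to the main method.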
  4. Package the code, and deploy the code to the EMR cluster.
    1. Make sure that the pom.xml file contains a Maven plug-in that compiles Scala code. For example, you can configure the following plug-in in the pom.xml file:
      <build>
          <plugins>
              <!-- the Maven Scala plugin will compile Scala source files -->
              <plugin>
                  <groupId>net.alchim31.maven</groupId>
                  <artifactId>scala-maven-plugin</artifactId>
                  <version>3.2.2</version>
                  <executions>
                      <execution>
                          <goals>
                              <goal>compile</goal>
                              <goal>testCompile</goal>
                          </goals>
                      </execution>
                  </executions>
              </plugin>
          </plugins>
      </build>
    2. After you debug the code on your local machine, run the following command to package the code:
      mvn clean install
    3. Log on to your EMR cluster in SSH mode. For more information, see Log on to a cluster.
    4. Upload the JAR package to the EMR cluster.
      In this example, the JAR package is uploaded to the root directory of the EMR cluster.
  5. Submit and run a Spark job.
    1. Run the spark-submit command to run the Spark job:
      spark-submit \
       --master yarn \
       --deploy-mode cluster \
       --driver-memory 1g \
       --executor-cores 2 \
       --executor-memory 3g \
       --num-executors 1 \
       --class com.aliyun.iceberg.StructuredSinkIceberg \
       iceberg-demos.jar
      Note In this example, a JAR package named iceberg-demos.jar is used. You can change the value of the --class parameter and the name of the JAR package based on your business requirements.
    2. Use Spark SQL to query the table and check that the data from Kafka is written to it. For more information, see Use Iceberg.
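
To check the result of the streaming job, you can query the table and its snapshot history. The following is a minimal sketch, assuming that the dlf_catalog settings are supplied through spark-defaults.conf or --conf options. The object name QueryIcebergTable is hypothetical. The snapshots suffix refers to a metadata table that Iceberg exposes for each table, and each committed batch is expected to show up there as a snapshot.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper for illustration only.
object QueryIcebergTable {

  val table: String = "dlf_catalog.iceberg_db.iceberg_table"

  def main(args: Array[String]): Unit = {
    // Assumes that the catalog named dlf_catalog is already configured.
    val spark = SparkSession.builder().appName("QueryIcebergTable").getOrCreate()

    // Show up to 20 rows that the streaming job has written so far.
    spark.sql(s"SELECT id, data FROM $table").show(20, truncate = false)

    // List the snapshots of the table in commit order.
    spark.sql(
      s"SELECT committed_at, snapshot_id, operation FROM $table.snapshots ORDER BY committed_at"
    ).show(truncate = false)

    spark.stop()
  }
}
```

Because the job in this example uses a one-minute trigger, new rows may take up to about a minute to become visible after you enter them in the console producer.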