Use Apache Spark Structured Streaming to continuously write data from Apache Kafka into an Apache Iceberg table on EMR. All writes go through the DataStreamWriter API.
Prerequisites
Before you begin, make sure you have:
A DataLake cluster or custom cluster. For more information, see Create a cluster.
A Dataflow cluster with the Kafka service enabled. For more information, see Create a cluster.
Both clusters deployed in the same vSwitch of the same virtual private cloud (VPC).
How it works
Submit a streaming write job by calling DataStreamWriter from Spark Structured Streaming:
```scala
val tableIdentifier: String = ...
data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("path", tableIdentifier)
    .option("checkpointLocation", checkpointPath)
    .start()
```

tableIdentifier is either the name or the path of the target Iceberg table. Two output modes are supported:
| Mode | Behavior | SQL equivalent |
|---|---|---|
| append | Writes each micro-batch as new rows | INSERT INTO |
| complete | Replaces the entire table with the latest micro-batch | INSERT OVERWRITE |
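Complete mode typically pairs with an aggregated query, because every micro-batch must contain the full result that replaces the table. A minimal sketch, assuming a hypothetical streaming DataFrame named `events` with a `category` column:

```scala
import java.util.concurrent.TimeUnit
import org.apache.spark.sql.streaming.Trigger

// Running counts per category; complete mode rewrites the whole table each trigger.
val counts = events.groupBy("category").count()

counts.writeStream
  .format("iceberg")
  .outputMode("complete")
  .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
  .option("path", tableIdentifier)
  .option("checkpointLocation", checkpointPath)
  .start()
```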
Stream data from Kafka to an Iceberg table
This example reads data from a Dataflow cluster (Kafka) and writes it to an Iceberg table. After packaging the code, submit it as a Spark job using spark-submit.
Step 1: Set up a Kafka topic with test data
Log in to the Dataflow cluster in SSH mode. For more information, see Log on to a cluster.
Create a topic named iceberg_test:

```shell
kafka-topics.sh --bootstrap-server core-1-1:9092,core-1-2:9092,core-1-3:9092 \
  --topic iceberg_test \
  --partitions 3 \
  --replication-factor 2 \
  --create
```

Produce test data to the topic:

```shell
kafka-console-producer.sh --broker-list core-1-1:9092,core-1-2:9092,core-1-3:9092 \
  --topic iceberg_test
```
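The application in Step 4 decodes each record key from Base64 and casts it to a LONG id, so test records should carry Base64-encoded numeric keys. A hedged sketch of producing one such record programmatically with the Kafka client library (the broker address is an assumption; adjust it to your Dataflow cluster):

```scala
import java.util.{Base64, Properties}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", "core-1-1:9092") // assumed broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
// Base64-encode the numeric key so the streaming job can unbase64 + cast it to LONG.
val key = Base64.getEncoder.encodeToString("1".getBytes("UTF-8"))
producer.send(new ProducerRecord("iceberg_test", key, "hello iceberg"))
producer.close()
```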
Step 2: Create a database and table in Iceberg
Use Spark SQL to create a database named iceberg_db and a table named iceberg_table. For more information, see Use Iceberg.
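The DDL can be issued through Spark SQL. A minimal sketch, assuming the catalog is named dlf_catalog as in Step 4 and a schema matching the columns (`id`, `data`) produced by the streaming application:

```scala
spark.sql("CREATE DATABASE IF NOT EXISTS dlf_catalog.iceberg_db")
spark.sql(
  """CREATE TABLE IF NOT EXISTS dlf_catalog.iceberg_db.iceberg_table (
    |  id BIGINT,
    |  data STRING
    |) USING iceberg""".stripMargin)
```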
Step 3: Set up the Maven project
Create a Maven project, add Spark dependencies, and include the Scala Maven plugin for compilation. Add the following to your pom.xml:
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.1.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.1.2</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <!-- the Maven Scala plugin will compile Scala source files -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```

Step 4: Write the Spark application
The following Scala code reads from Kafka and writes to an Iceberg table in streaming mode.
Catalog parameters and default catalog names vary by cluster version. This example uses an EMR V5.3.0 cluster with DLF (Data Lake Formation) managing metadata under a catalog named dlf_catalog. For DLF configuration details, see Configuration of DLF metadata.
```scala
package com.aliyun.iceberg

import java.util.concurrent.TimeUnit

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.LongType

object StructuredSinkIceberg {
  def main(args: Array[String]): Unit = {
    // Configure the Iceberg catalog backed by DLF.
    val sparkConf = new SparkConf()
    sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
    sparkConf.set("spark.sql.catalog.dlf_catalog.catalog-impl", "org.apache.iceberg.aliyun.dlf.DlfCatalog")
    sparkConf.set("spark.sql.catalog.dlf_catalog.io-impl", "org.apache.iceberg.hadoop.HadoopFileIO")
    sparkConf.set("spark.sql.catalog.dlf_catalog.oss.endpoint", "<yourOSSEndpoint>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.id", "<yourAccessKeyId>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.access.key.secret", "<yourAccessKeySecret>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.catalog-id", "<yourCatalogId>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.endpoint", "<yourDLFEndpoint>")
    sparkConf.set("spark.sql.catalog.dlf_catalog.dlf.region-id", "<yourDLFRegionId>")

    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .appName("StructuredSinkIceberg")
      .getOrCreate()

    val checkpointPath = "oss://mybucket/tmp/iceberg_table_checkpoint"
    val bootstrapServers = "192.168.XX.XX:9092"
    val topic = "iceberg_test"

    // Read from the Kafka topic on the Dataflow cluster.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .load()

    val resDF = df.selectExpr(
        "CAST(unbase64(CAST(key AS STRING)) AS STRING) AS strKey", // Decode Base64-encoded key to string.
        "CAST(value AS STRING) AS data")
      .select(
        col("strKey").cast(LongType).alias("id"), // Cast string key to LONG type.
        col("data")
      )

    // Write to the Iceberg table every minute.
    val query = resDF.writeStream
      .format("iceberg")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
      .option("path", "dlf_catalog.iceberg_db.iceberg_table")
      .option("checkpointLocation", checkpointPath)
      .start()

    query.awaitTermination()
  }
}
```

You can change the values of the parameters described in the following table based on your business requirements.
| Variable | Description |
|---|---|
| checkpointPath | OSS path for Spark Structured Streaming checkpoint data |
| bootstrapServers | Private IP address of a Kafka broker in the Dataflow cluster |
| topic | Kafka topic name to subscribe to |
Step 5: Package and deploy to EMR
After testing locally, package the project:

```shell
mvn clean install
```

Log in to your EMR cluster in SSH mode. For more information, see Log on to a cluster.

Upload the JAR package to the root directory of the EMR cluster.
Step 6: Submit the Spark job
Run spark-submit to start the streaming job:

```shell
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 1g \
  --executor-cores 2 \
  --executor-memory 3g \
  --num-executors 1 \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:<version> \
  --class com.aliyun.iceberg.StructuredSinkIceberg \
  iceberg-demos.jar
```

Replace `<version>` with the actual version. The spark-sql-kafka package must be compatible with both your Spark and Kafka versions. This example uses a JAR package named iceberg-demos.jar. Adjust --class and the JAR name to match your project.
To verify that data is being written, use Spark SQL to query changes. For more information, see Basic usage.
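A hedged spot-check, assuming the catalog and table names from the earlier steps; Iceberg's built-in snapshots metadata table also shows one entry per committed micro-batch:

```scala
// Query the sink table for recently written rows.
spark.sql("SELECT * FROM dlf_catalog.iceberg_db.iceberg_table LIMIT 10").show()

// Each streaming trigger that commits data appears as a snapshot.
spark.sql("SELECT committed_at, snapshot_id, operation FROM dlf_catalog.iceberg_db.iceberg_table.snapshots").show()
```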