In most cases, systems that support stream ingestion write streaming data to the storage system in small batches as small files and periodically merge these small files. Hive and Delta Lake both ingest streaming data in this manner. Kudu also supports stream ingestion, but it has its own storage architecture and is not built on top of a general-purpose big data storage system. This topic describes how to use Delta Lake to ingest streaming data from Kafka.
Prerequisites
A custom cluster or a DataLake cluster that contains the Delta Lake service is created in the E-MapReduce (EMR) console. For more information, see Create a cluster.
A Dataflow cluster that contains the Kafka service is created. For more information, see Create a cluster.
Limits
The DataLake cluster or custom cluster and the Dataflow cluster must be deployed in the same vSwitch of the same virtual private cloud (VPC).
Evolution of stream ingestion
Phase | Description
----- | -----------
Previously | To ingest streaming data, you had to manually partition a fact table by time in a fine-grained manner, for example, by creating a partition every 5 minutes. After data was written to a partition, you had to execute the INSERT OVERWRITE statement to merge the small files in the partition into one or more large files and write the large files back to the partition (a sketch of this compaction pattern follows this table). This approach caused a number of operational issues. Hive 0.13 and later support transactions, and Hive 2.0 and later provide the Hive Streaming feature for stream ingestion, but Hive Streaming is not widely used.
Now | You can use Delta Lake to ingest streaming data in a convenient manner, as shown in the example in this topic.
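The compaction step of the legacy approach can be sketched as the following Spark job. This is only an illustration: the table names (sales_staging for the small files produced by ingestion, sales_fact for the compacted target), the partition columns (ds and ts), and the column list are hypothetical and are not part of the example in this topic.

```scala
import org.apache.spark.sql.SparkSession

object CompactPartition {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CompactPartition")
      .enableHiveSupport()
      .getOrCreate()

    // Merge the many small files of one 5-minute partition into a few large
    // files by rewriting that partition of the fact table from the staging data.
    spark.sql(
      """INSERT OVERWRITE TABLE sales_fact PARTITION (ds = '2019-11-11', ts = '10:05')
        |SELECT id, `date`, name, sales
        |FROM sales_staging
        |WHERE ds = '2019-11-11' AND ts = '10:05'""".stripMargin)

    spark.stop()
  }
}
```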
Example
In this example, data is read from Kafka and written to a Delta table.
Log on to the Dataflow cluster in SSH mode. For more information, see Log on to a cluster.
Run the following commands to create a Kafka topic:
```shell
sudo su - kafka
kafka-topics.sh --partitions 3 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic delta_stream_sample --create
```

Note: core-1-1 indicates the internal IP address or hostname of the broker node in the Dataflow cluster.

Prepare a Python script to continuously send data to Kafka.
```python
#!/usr/bin/env python3
import json
import time

from kafka import KafkaProducer
from kafka.errors import KafkaError

bootstrap = ['core-1-1:9092']
topic = 'delta_stream_sample'

def generator():
    id = 0
    line = {}
    while True:
        line['id'] = id
        line['date'] = '2019-11-11'
        line['name'] = 'Robert'
        line['sales'] = 123
        yield line
        id = id + 1

def sendToKafka():
    producer = KafkaProducer(bootstrap_servers=bootstrap)
    for line in generator():
        data = json.dumps(line).encode('utf-8')
        # Asynchronous by default
        future = producer.send(topic, data)
        # Block for 'synchronous' sends
        try:
            record_metadata = future.get(timeout=10)
        except KafkaError as e:
            # Decide what to do if produce request failed
            pass
        time.sleep(0.1)

sendToKafka()
```
For convenience, all data records are the same except for the id field:

```
{"id": 0, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 1, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 2, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 3, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 4, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 5, "date": "2019-11-11", "name": "Robert", "sales": 123}
```

Start a Spark Streaming job to read data from Kafka and write the data to a Delta table.
Write Spark code.
Sample code in Scala:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SparkSession, functions}
import org.apache.spark.sql.types.{DataTypes, StructField}

object StreamToDelta {
  def main(args: Array[String]): Unit = {
    val targetDir = "/tmp/delta_table"
    val checkpointLocation = "/tmp/delta_table_checkpoint"
    // 192.168.XX.XX is the internal IP address of the Kafka broker.
    val bootstrapServers = "192.168.XX.XX:9092"
    val topic = "delta_stream_sample"

    val schema = DataTypes.createStructType(Array[StructField](
      DataTypes.createStructField("id", DataTypes.LongType, false),
      DataTypes.createStructField("date", DataTypes.DateType, false),
      DataTypes.createStructField("name", DataTypes.StringType, false),
      DataTypes.createStructField("sales", DataTypes.StringType, false)))

    val sparkConf = new SparkConf()

    // StreamToDelta is the name of this Scala object.
    val spark = SparkSession
      .builder()
      .config(sparkConf)
      .appName("StreamToDelta")
      .getOrCreate()

    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .option("maxOffsetsPerTrigger", 1000)
      .option("startingOffsets", "earliest")
      .option("failOnDataLoss", value = false)
      .load()
      .select(functions.from_json(functions.col("value").cast("string"), schema).as("json"))
      .select("json.*")

    val query = lines.writeStream
      .outputMode("append")
      .format("delta")
      .option("checkpointLocation", checkpointLocation)
      .start(targetDir)

    query.awaitTermination()
  }
}
```
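The job above uses the default trigger, which starts a new micro-batch as soon as the previous one finishes. If you want to control how often commits happen, and therefore how many files each commit produces, you can add a processing-time trigger to the writeStream call, as in the following sketch. The 30-second interval is an arbitrary value, not a recommendation from this topic.

```scala
import org.apache.spark.sql.streaming.Trigger

// Same sink as above, but micro-batches are started roughly every 30 seconds.
val query = lines.writeStream
  .outputMode("append")
  .format("delta")
  .option("checkpointLocation", checkpointLocation)
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start(targetDir)
```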
Package the code and deploy it to the DataLake cluster.

After you debug the code on your on-premises machine, run the following command to package the code:
```shell
mvn clean install
```

Log on to the DataLake cluster in SSH mode. For more information, see Log on to a cluster.
Upload the JAR package to the DataLake cluster.
In this example, the JAR package is uploaded to the root directory of the DataLake cluster.
Submit and run a Spark Streaming job.
Run the spark-submit command to submit and run the Spark Streaming job:
```shell
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 1g \
  --executor-cores 2 \
  --executor-memory 3g \
  --num-executors 1 \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \
  --class com.aliyun.delta.StreamToDelta \
  delta-demo-1.0.jar
```

Note: In this example, a JAR package named delta-demo-1.0.jar is used. You can change the value of the --class parameter and the name of the JAR package based on your business requirements.
Start the spark-shell client and check whether data is written to the Delta table.
Scala
Run the following command to go to the spark-shell client:
```shell
spark-shell --master local
```

Execute the following Scala statements to query data:
```scala
val df = spark.read.format("delta").load("/tmp/delta_table")
df.select("*").orderBy("id").show(10000)
```
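If you only want a quick summary instead of scanning the full output, you can aggregate over the Delta table in the same spark-shell session. A minimal sketch:

```scala
import org.apache.spark.sql.functions.max

// Count the rows currently in the Delta table and show the largest id
// that the streaming job has written so far.
val delta = spark.read.format("delta").load("/tmp/delta_table")
println(s"rows = ${delta.count()}, max id = ${delta.agg(max("id")).head.getLong(0)}")
```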
SQL
Run the following command to go to the streaming-sql client:
```shell
streaming-sql --master local
```

Execute the following SQL statement to query data:
```sql
SELECT * FROM delta_table ORDER BY id LIMIT 10000;
```

The result indicates that 2,285 data records are written to the Delta table.
```
|2295|2019-11-11|Robert| 123|
|2296|2019-11-11|Robert| 123|
|2297|2019-11-11|Robert| 123|
|2275|2019-11-11|Robert| 123|
|2276|2019-11-11|Robert| 123|
|2277|2019-11-11|Robert| 123|
|2278|2019-11-11|Robert| 123|
|2279|2019-11-11|Robert| 123|
|2280|2019-11-11|Robert| 123|
|2281|2019-11-11|Robert| 123|
|2282|2019-11-11|Robert| 123|
|2283|2019-11-11|Robert| 123|
|2284|2019-11-11|Robert| 123|
|2285|2019-11-11|Robert| 123|
+----+----------+------+-----+
```
Test the exactly-once semantics
Stop the Spark Streaming job and then restart it. Query the Delta table again. If the restarted job resumes reading from the position where it previously stopped, so that no records are duplicated or lost, exactly-once semantics are achieved.
Scala
```scala
df.select("*").orderBy("id").show(10000)
```
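You can also check the exactly-once behavior programmatically after the restart instead of scanning the output. The following sketch verifies that the id values written by the producer are still unique and contiguous, which indicates that no record was duplicated or lost across the restart:

```scala
import org.apache.spark.sql.functions.{col, countDistinct, max}

val delta = spark.read.format("delta").load("/tmp/delta_table")
val total       = delta.count()
val distinctIds = delta.agg(countDistinct(col("id"))).head.getLong(0)
val maxId       = delta.agg(max(col("id"))).head.getLong(0)

// Because the producer assigns ids starting from 0, the table should satisfy
// total == distinctIds == maxId + 1 if nothing was duplicated or lost.
println(s"total=$total, distinct=$distinctIds, maxId=$maxId")
```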
SQL

```sql
SELECT * FROM delta_table ORDER BY id LIMIT 10000;
```

```
|2878|2019-11-11|Robert| 123|
|2879|2019-11-11|Robert| 123|
|2880|2019-11-11|Robert| 123|
|2881|2019-11-11|Robert| 123|
|2882|2019-11-11|Robert| 123|
|2883|2019-11-11|Robert| 123|
|2884|2019-11-11|Robert| 123|
|2885|2019-11-11|Robert| 123|
|2886|2019-11-11|Robert| 123|
|2887|2019-11-11|Robert| 123|
|2888|2019-11-11|Robert| 123|
|2889|2019-11-11|Robert| 123|
|2890|2019-11-11|Robert| 123|
|2891|2019-11-11|Robert| 123|
```