Currently, most systems that support stream ingestion write streaming data to the storage system in small batches as small files and periodically merge those small files. For example, both Hive and Delta Lake ingest streaming data in this way. Kudu also supports stream ingestion, but Kudu relies on its own storage engine rather than building on a general big data storage system.
Evolution of stream ingestion
Phase | Description |
---|---|
Previously | To ingest streaming data, you had to manually partition a fact table by time at a fine granularity, for example one partition every five minutes. After data was written to a partition, you had to execute an INSERT OVERWRITE statement to merge the partition's small files and overwrite the partition with the merged files (see the sketch after this table). This method has several drawbacks. Hive 0.13 and later support transactions, and Hive 2.0 and later provide the Hive Streaming feature for stream ingestion, but Hive Streaming has never been widely adopted. |
Now | With Delta, you can easily ingest streaming data in only four steps, as the following example shows. |
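To make the legacy pattern concrete, the following is a minimal sketch of the periodic small-file compaction described above. It is not part of the original example; the table path and the five-minute partition directory are hypothetical.

// Minimal sketch of the legacy compaction step (hypothetical paths): read the
// many small files of one fine-grained partition, merge them into a single
// file in a staging directory, then swap the staging directory in for the
// partition. This rewrite is the step that INSERT OVERWRITE automates in Hive.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("legacy-compaction").getOrCreate()

val partitionDir = "/tmp/sales_fact/dt=2019-11-11-1005"  // hypothetical five-minute partition
val stagingDir   = partitionDir + "_compacted"

spark.read.parquet(partitionDir)
  .coalesce(1)                       // merge the small files into one
  .write
  .mode("overwrite")
  .parquet(stagingDir)
// The staging directory then replaces the original partition directory,
// typically with a filesystem rename.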
Example
In this example, data is read from Kafka and written to a Delta table. First, prepare a Python script that continuously sends data to Kafka.
#! /usr/bin/env python3
import json
import time
from kafka import KafkaProducer
from kafka.errors import KafkaError

bootstrap = ['emr-header-1:9092']
topic = 'delta_stream_sample'

def generator():
    id = 0
    line = {}
    while True:
        line['id'] = id
        line['date'] = '2019-11-11'
        line['name'] = 'Robert'
        line['sales'] = 123
        yield line
        id = id + 1

def sendToKafka():
    producer = KafkaProducer(bootstrap_servers=bootstrap)
    for line in generator():
        data = json.dumps(line).encode('utf-8')
        # Asynchronous by default
        future = producer.send(topic, data)
        # Block for 'synchronous' sends
        try:
            record_metadata = future.get(timeout=10)
        except KafkaError as e:
            # Decide what to do if the produce request failed
            pass
        time.sleep(0.1)

sendToKafka()
For convenience, all data records are the same except for the ID.
{"id": 0, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 1, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 2, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 3, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 4, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 5, "date": "2019-11-11", "name": "Robert", "sales": 123}
Start a Spark Streaming job that reads data from Kafka and writes it to a Delta table. You can use either Scala or SQL.
- Scala

Start a Spark shell:

spark-shell --master local --use-emr-datasource

Read the data from Kafka and write it to the Delta table:

import org.apache.spark.sql.{functions, SparkSession}
import org.apache.spark.sql.types.DataTypes
import org.apache.spark.sql.types.StructField

val targetDir = "/tmp/delta_table"
val checkpointLocation = "/tmp/delta_table_checkpoint"
val bootstrapServers = "192.168.67.88:9092"
val topic = "delta_stream_sample"

// Schema of the JSON records produced by the Python script.
val schema = DataTypes.createStructType(Array[StructField](
  DataTypes.createStructField("id", DataTypes.LongType, false),
  DataTypes.createStructField("date", DataTypes.DateType, false),
  DataTypes.createStructField("name", DataTypes.StringType, false),
  DataTypes.createStructField("sales", DataTypes.StringType, false)))

// Read at most 1000 offsets per micro-batch, starting from the earliest offset.
val lines = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", topic)
  .option("maxOffsetsPerTrigger", 1000)
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", value = false)
  .load()
  .select(functions.from_json(functions.col("value").cast("string"), schema).as("json"))
  .select("json.*")

// The checkpoint directory records the processed offsets so that a restarted
// job resumes from where it stopped.
val query = lines.writeStream
  .outputMode("append")
  .format("delta")
  .option("checkpointLocation", checkpointLocation)
  .start(targetDir)

query.awaitTermination()
- SQL

Start a streaming SQL shell:

streaming-sql --master local --use-emr-datasource

Run the following statements:

-- Map the Kafka topic to a table.
CREATE TABLE IF NOT EXISTS kafka_table
USING kafka
OPTIONS(
  kafka.bootstrap.servers='192.168.67.88:9092',
  subscribe='delta_stream_sample'
);

-- Create the target Delta table.
CREATE TABLE IF NOT EXISTS delta_table (id LONG, `date` DATE, name STRING, sales STRING)
USING delta
LOCATION '/tmp/delta_table';

-- Define a streaming scan over the Kafka table.
CREATE SCAN stream_kafka_table ON kafka_table
USING STREAM
OPTIONS(
  maxOffsetsPerTrigger='1000',
  startingOffsets='earliest',
  failOnDataLoss=false
);

-- Start the streaming job; the checkpoint directory records the processed offsets.
CREATE STREAM job
OPTIONS(
  checkpointLocation='/tmp/delta_table_checkpoint'
)
INSERT INTO delta_table
SELECT
  content.id AS id,
  content.date AS date,
  content.name AS name,
  content.sales AS sales
FROM (
  SELECT from_json(CAST(value AS STRING), 'id LONG, `date` DATE, name STRING, sales STRING') AS content
  FROM stream_kafka_table
);
Start a Spark shell and read the Delta table to verify that the data has been written.
- Scala
val df = spark.read.format("delta").load("/tmp/delta_table")
df.select("*").orderBy("id").show(10000)
- SQL
SELECT * FROM delta_table ORDER BY id LIMIT 10000;
According to the query result, 2285 data records have been written.
|2275|2019-11-11|Robert| 123|
|2276|2019-11-11|Robert| 123|
|2277|2019-11-11|Robert| 123|
|2278|2019-11-11|Robert| 123|
|2279|2019-11-11|Robert| 123|
|2280|2019-11-11|Robert| 123|
|2281|2019-11-11|Robert| 123|
|2282|2019-11-11|Robert| 123|
|2283|2019-11-11|Robert| 123|
|2284|2019-11-11|Robert| 123|
|2285|2019-11-11|Robert| 123|
+----+----------+------+-----+
Test the exactly-once semantics
Stop the Spark Streaming job and then restart it. Read the Delta table again. If ingestion resumes from the offsets recorded when the stream was interrupted, with no missing or duplicate records, the data is correct.
- Scala
df.select("*").orderBy("id").show(10000)
- SQL
SELECT * FROM delta_table ORDER BY id LIMIT 10000;

...
|2878|2019-11-11|Robert| 123|
|2879|2019-11-11|Robert| 123|
|2880|2019-11-11|Robert| 123|
|2881|2019-11-11|Robert| 123|
|2882|2019-11-11|Robert| 123|
|2883|2019-11-11|Robert| 123|
|2884|2019-11-11|Robert| 123|
|2885|2019-11-11|Robert| 123|
|2886|2019-11-11|Robert| 123|
|2887|2019-11-11|Robert| 123|
|2888|2019-11-11|Robert| 123|
|2889|2019-11-11|Robert| 123|
|2890|2019-11-11|Robert| 123|
|2891|2019-11-11|Robert| 123|
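Beyond inspecting the tail of the output, you can check exactly-once more precisely: because the producer assigns consecutive IDs starting from 0, the table should contain every ID exactly once. The following is a minimal Scala sketch to run in the same Spark shell as above; the count, distinct-count, and max-ID checks are an assumed way to validate and are not part of the original example.

// Exactly-once check: with consecutive IDs starting at 0, the table is correct
// when there are no duplicate IDs and no gaps, i.e.
// count == distinct count and max(id) == count - 1.
import org.apache.spark.sql.functions.{col, count, countDistinct, max}

val df  = spark.read.format("delta").load("/tmp/delta_table")
val row = df.agg(count(col("id")), countDistinct(col("id")), max(col("id"))).first()

val total       = row.getLong(0)
val distinctIds = row.getLong(1)
val maxId       = row.getLong(2)

println(s"total=$total, distinct=$distinctIds, max id=$maxId")
assert(total == distinctIds && maxId == total - 1, "duplicate or missing records detected")

Because all three aggregates are computed in a single query, they reflect one consistent snapshot of the Delta table even while the stream is still running.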