Currently, most systems that support stream ingestion work by writing small batches of streaming data to the storage system as small files and periodically merging those files. Both Hive and Delta Lake ingest streaming data this way. Kudu also supports stream ingestion, but it relies on its own storage engine rather than building on a general-purpose big data storage system.

Evolution of stream ingestion

Previously

To ingest streaming data, you had to manually partition a fact table by time at a fine granularity, for example, one partition every five minutes. After data was written to a partition, you had to execute an INSERT OVERWRITE statement to merge the partition's small files and overwrite the partition with the merged result (a sketch of this manual compaction appears below, after the list of Hive limitations). This approach has the following drawbacks:
  • Reads and writes are not isolated, which can easily cause read failures or data accuracy issues.
  • Streaming jobs cannot guarantee exactly-once semantics. If a stream ingestion job fails, manual intervention is required to ensure that no data is duplicated or lost. Spark Streaming guarantees at-least-once semantics, but not exactly-once semantics.
Hive 0.13 and later support transactions, and Hive 2.0 and later provide the Hive Streaming feature for stream ingestion. However, Hive Streaming has never been widely adopted, for the following reasons:
  • Hive implements the transaction feature by modifying the underlying files. As a result, transactional tables can be read only by Hive itself; other tools such as Spark SQL or Presto cannot read them.
  • Currently, Hive transactions support only the Optimized Row Columnar (ORC) format.
  • Hive reads data in merge-on-read mode, which means it must sort and merge small files at read time. As the number of small files grows, read performance degrades sharply, so you must merge small files manually and in a timely manner. In addition, these merge operations often fail, which affects business efficiency.
  • Because data sources and data requirements in a data lake are diverse, this read mode makes Hive suitable only for building a data warehouse, not a data lake.
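For illustration, the manual compaction step mentioned above might have looked like the following when run from a Spark shell with Hive support. This is only a sketch under assumptions: the fact table sales_fact, its five-minute partition column ts_min, and the staging table are hypothetical names, and the rewrite had to be coordinated with the writers because, as noted, reads and writes were not isolated.

// Sketch of manually compacting one five-minute partition (hypothetical table
// and column names). The partition is staged first and then overwritten, so
// the many small files become a few large ones.
spark.sql("""
  CREATE TABLE sales_fact_staging AS
  SELECT * FROM sales_fact WHERE ts_min = '2019-11-11 10:05'
""")

spark.sql("""
  INSERT OVERWRITE TABLE sales_fact PARTITION (ts_min = '2019-11-11 10:05')
  SELECT id, `date`, name, sales FROM sales_fact_staging
""")

spark.sql("DROP TABLE sales_fact_staging")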
Now

With Delta, you can ingest streaming data with only four steps (steps 3 and 4 are sketched after the list):
  1. Create a table.
  2. Start a Spark Streaming job to write a stream of data to the table.
  3. Optimize data periodically, for example, after data is written to a partition.
  4. Vacuum historical data periodically, for example, every day.
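
The following is a minimal sketch of steps 3 and 4 using the open-source Delta Lake Scala API in a Spark shell. It assumes the table path used later in this topic; DeltaTable.optimize() requires Delta Lake 1.2 or later, and your Delta build may instead expose OPTIMIZE and VACUUM as SQL statements, so treat this as an illustration rather than the exact commands.

import io.delta.tables.DeltaTable

// Attach to the streaming target table by its path.
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")

// Step 3: compact the small files written by the streaming job into larger files.
deltaTable.optimize().executeCompaction()

// Step 4: delete data files that are no longer referenced by the table and are
// older than the retention period (here 168 hours, that is, 7 days).
deltaTable.vacuum(168)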

Example

In this example, data is read from Kafka and written to a Delta table. Prepare a Python script to continuously send data to Kafka.

#! /usr/bin/env python3

import json
import time

from kafka import KafkaProducer
from kafka.errors import KafkaError

bootstrap = ['emr-header-1:9092']
topic = 'delta_stream_sample'

# Yield an endless stream of records that differ only by id.
def generator():
    id = 0
    line = {}
    while True:
        line['id'] = id
        line['date'] = '2019-11-11'
        line['name'] = 'Robert'
        line['sales'] = 123
        yield line
        id = id + 1

# Serialize each record as JSON and send it to the Kafka topic.
def sendToKafka():
    producer = KafkaProducer(bootstrap_servers=bootstrap)

    for line in generator():
        data = json.dumps(line).encode('utf-8')

        # Asynchronous by default
        future = producer.send(topic, data)

        # Block for 'synchronous' sends
        try:
            record_metadata = future.get(timeout=10)
        except KafkaError as e:
            # Decide what to do if produce request failed...
            pass
        time.sleep(0.1)

sendToKafka()
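
Note that the script uses the kafka-python client, which provides KafkaProducer and can be installed with pip install kafka-python. It also assumes that the delta_stream_sample topic either already exists or can be auto-created by the broker.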

For convenience, all data records are the same except for the ID.

{"id": 0, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 1, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 2, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 3, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 4, "date": "2019-11-11", "name": "Robert", "sales": 123}
{"id": 5, "date": "2019-11-11", "name": "Robert", "sales": 123}

Start a Spark Streaming job to read data from Kafka and write the data to a Delta table.

  • Scala
    • Bash
       spark-shell --master local --use-emr-datasource
    • Scala
      import org.apache.spark.sql.{functions, SparkSession}
      import org.apache.spark.sql.types.DataTypes
      import org.apache.spark.sql.types.StructField
      
      val targetDir = "/tmp/delta_table"
      val checkpointLocation = "/tmp/delta_table_checkpoint"
      val bootstrapServers = "192.168.67.88:9092"
      val topic = "delta_stream_sample"
      
      val schema = DataTypes.createStructType(Array[StructField](
        DataTypes.createStructField("id", DataTypes.LongType, false),
        DataTypes.createStructField("date", DataTypes.DateType, false),
        DataTypes.createStructField("name", DataTypes.StringType, false),
        DataTypes.createStructField("sales", DataTypes.StringType, false)))
      
      val lines = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", bootstrapServers)
          .option("subscribe", topic)
          .option("maxOffsetsPerTrigger", 1000)
          .option("startingOffsets", "earliest")
          .option("failOnDataLoss", value = false)
          .load()
          .select(functions.from_json(functions.col("value").cast("string"), schema).as("json"))
          .select("json.*")
      
      val query = lines.writeStream
          .outputMode("append")
          .format("delta")
          .option("checkpointLocation", checkpointLocation)
          .start(targetDir)
      
      query.awaitTermination()
  • SQL
    • Bash
      streaming-sql --master local --use-emr-datasource
    • SQL
      CREATE TABLE IF NOT EXISTS kafka_table
      USING kafka
      OPTIONS(
      kafka.bootstrap.servers='192.168.67.88:9092',
      subscribe='delta_stream_sample'
      );
      
      CREATE TABLE IF NOT EXISTS delta_table (id LONG, `date` DATE, name STRING, sales STRING)
      USING delta
      LOCATION '/tmp/delta_table';
      
      CREATE SCAN stream_kafka_table ON kafka_table USING STREAM
      OPTIONS(
      maxOffsetsPerTrigger='1000',
      startingOffsets='earliest',
      failOnDataLoss=false
      );
      
      CREATE STREAM job
      OPTIONS(
      checkpointLocation='/tmp/delta_table_checkpoint'
      )
      INSERT INTO delta_table
      SELECT
          content.id as id,
          content.date as date,
          content.name as name,
          content.sales as sales
      FROM (
          SELECT from_json(CAST(value as STRING), 'id LONG, `date` DATE, name STRING, sales STRING') as content
          FROM stream_kafka_table
      );

Start a Spark shell and read the Delta table to verify that the data has been written.

  • Scala
    val df = spark.read.format("delta").load("/tmp/delta_table")
    df.select("*").orderBy("id").show(10000)
  • SQL
    SELECT * FROM delta_table ORDER BY id LIMIT 10000;

According to the query result, 2285 data records have been written.

|2295|2019-11-11|Robert|  123|
|2296|2019-11-11|Robert|  123|
|2297|2019-11-11|Robert|  123|
|2275|2019-11-11|Robert|  123|
|2276|2019-11-11|Robert|  123|
|2277|2019-11-11|Robert|  123|
|2278|2019-11-11|Robert|  123|
|2279|2019-11-11|Robert|  123|
|2280|2019-11-11|Robert|  123|
|2281|2019-11-11|Robert|  123|
|2282|2019-11-11|Robert|  123|
|2283|2019-11-11|Robert|  123|
|2284|2019-11-11|Robert|  123|
|2285|2019-11-11|Robert|  123|
+----+----------+------+-----+
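
As a quick cross-check of the total, you can also count the rows directly on the same path; this is just a standard DataFrame count:

// Total number of rows currently in the Delta table.
spark.read.format("delta").load("/tmp/delta_table").count()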

Test exactly-once semantics

Stop the Spark Streaming job and then start it again. Read the Delta table once more. If the new data continues from where the stream was interrupted, with no records duplicated or lost, exactly-once semantics hold. (A programmatic duplicate-and-gap check is sketched at the end of this section.)

  • Scala
    df.select("*").orderBy("id").show(10000)
  • SQL
    SELECT * FROM delta_table ORDER BY id LIMIT 10000;
    ...
    |2878|2019-11-11|Robert|  123|
    |2879|2019-11-11|Robert|  123|
    |2880|2019-11-11|Robert|  123|
    |2881|2019-11-11|Robert|  123|
    |2882|2019-11-11|Robert|  123|
    |2883|2019-11-11|Robert|  123|
    |2884|2019-11-11|Robert|  123|
    |2885|2019-11-11|Robert|  123|
    |2886|2019-11-11|Robert|  123|
    |2887|2019-11-11|Robert|  123|
    |2888|2019-11-11|Robert|  123|
    |2889|2019-11-11|Robert|  123|
    |2890|2019-11-11|Robert|  123|
    |2891|2019-11-11|Robert|  123|
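
Besides eyeballing the ids, you can check for duplicates and gaps programmatically. The following minimal sketch runs in the same Spark shell and only assumes the table path used above and that ids start at 0 and increase by 1:

import org.apache.spark.sql.functions.{count, countDistinct, max}

val df = spark.read.format("delta").load("/tmp/delta_table")

// Any id that appears more than once would indicate duplicated data.
val duplicatedIds = df.groupBy("id").count().filter("`count` > 1").count()
println(s"duplicated ids: $duplicatedIds")

// With ids starting at 0 and increasing by 1, a duplicate-free and gap-free
// table satisfies rows == distinct_ids == max_id + 1.
df.agg(count("id").as("rows"), countDistinct("id").as("distinct_ids"), max("id").as("max_id")).show()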