This topic describes how to use Delta Lake to perform streaming reads and writes. You can use a Delta table as a stream source or sink.

Use a Delta table as a stream source

spark.readStream
  .format("delta")
  .option("maxFilesPerTrigger", 1000)
  .load("/tmp/delta_table")

The maxFilesPerTrigger option specifies the maximum number of new files to process in a single micro-batch. The default value is 1000.
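
To run the read end to end, attach the stream to a sink and start the query. The following is a minimal sketch, assuming a console sink and a processing-time trigger purely for illustration; it is not part of the original example.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// Obtain the active session (or create one) and stream the Delta table to the console.
val spark = SparkSession.builder().appName("DeltaStreamRead").getOrCreate()

val query = spark.readStream
  .format("delta")
  .option("maxFilesPerTrigger", 1000)            // cap new files per micro-batch
  .load("/tmp/delta_table")
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds")) // assumed trigger interval
  .start()

query.awaitTermination()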

In most cases, data written to a stream source is consumed by downstream systems and is never modified in the source afterward. Delta Lake, however, is also a data lake, so data in a Delta table may be deleted, updated, or merged. Delta Lake provides two options to handle such changes; a sketch that shows where to set them follows the list.
  • ignoreDeletes: If this option is set to true, transactions that delete data at partition boundaries are ignored and do not affect the stream.
  • ignoreChanges: If this option is set to true, deletes, updates, and merges are not handled specially; instead, the files rewritten by these operations are sent downstream as new data. For example, if a file contains 10,000 records and one of them is updated, the rewritten file is sent downstream, so the updated record and the 9,999 unchanged records are delivered again.
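
The following is a minimal sketch of where these options are set on the streaming read. Only one of the two options is typically needed; which one depends on whether the source table receives only partition-level deletes or also row-level updates and merges.

spark.readStream
  .format("delta")
  .option("ignoreDeletes", "true")     // skip partition-boundary deletes
  // .option("ignoreChanges", "true")  // or forward rewritten files downstream
  .load("/tmp/delta_table")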

Use a Delta table as a stream sink

  • Append mode: Add new records to the destination table. This is the default output mode of Spark Structured Streaming.
    df.writeStream
      .format("delta")
      .outputMode("append")
      .option("checkpointLocation", "/tmp/delta_table/_checkpoints")
      .start("/tmp/delta_table")
  • Complete mode: Rewrite the entire destination table with the full result of every batch. For example, a table has the id (LONG), date (DATE), name (STRING), and sales (DOUBLE) columns. You can calculate the total sales of each person and write the results to a summary table; in this example, the summary table is rewritten in each batch.
    • Reads and writes by using Spark Structured Streaming
      // Scala: the sum aggregate function comes from org.apache.spark.sql.functions.
      import org.apache.spark.sql.functions.sum

      spark.readStream
        .format("delta")
        .load("/tmp/delta_table")
        .select("name", "sales")
        .groupBy("name")
        .agg(sum("sales"))
        .writeStream
        .format("delta")
        .outputMode("complete")
        .option("checkpointLocation", "/tmp/delta_table_summary/_checkpoints")
        .start("/tmp/delta_table_summary")
    • Reads and writes by using Streaming SQL
      CREATE TABLE targetTableName
        (key BIGINT, value BIGINT)
        USING delta;

      CREATE TABLE sourceTableName
        (key BIGINT, value BIGINT)
        USING delta;

      INSERT INTO sourceTableName VALUES (1, 238), (238, 2388);

      CREATE SCAN stream_source_1 ON sourceTableName USING STREAM;

      CREATE STREAM test_delta_stream
        OPTIONS(
          checkpointLocation='/tmp/test_delta_stream/targetTableName'
        )
        MERGE INTO targetTableName AS target
        USING (
          SELECT key, value
          FROM (
            SELECT key, value, row_number() OVER (PARTITION BY key ORDER BY value) AS rn
            FROM stream_source_1
          ) WHERE rn = 1
        ) AS source
        ON target.key = source.key
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *;
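
If you use the DataFrame API rather than Streaming SQL, a rough equivalent of the MERGE above can be written with foreachBatch and the Delta Lake Scala API. The following is a sketch under that assumption: the table paths are illustrative, the target Delta table is assumed to already exist at the given path, and spark refers to the active SparkSession.

import io.delta.tables.DeltaTable
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Upsert each micro-batch into the target Delta table, keeping one record per key.
def upsertBatch(batch: DataFrame, batchId: Long): Unit = {
  val deduped = batch
    .withColumn("rn", row_number().over(Window.partitionBy("key").orderBy("value")))
    .where(col("rn") === 1)
    .drop("rn")
  DeltaTable.forPath(spark, "/tmp/delta_target").as("target")
    .merge(deduped.as("source"), "target.key = source.key")
    .whenMatched().updateAll()
    .whenNotMatched().insertAll()
    .execute()
}

spark.readStream
  .format("delta")
  .load("/tmp/delta_source")
  .writeStream
  .option("checkpointLocation", "/tmp/delta_target/_checkpoints")
  .foreachBatch(upsertBatch _)
  .start()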