This topic describes how to use Delta Lake to perform streaming reads and writes. You can use a Delta table as a stream source or sink.
Use a Delta table as a stream source
spark.readStream
  .format("delta")
  .option("maxFilesPerTrigger", 1000)
  .load("/tmp/delta_table")
The maxFilesPerTrigger parameter specifies the maximum number of files to process in a single batch. The default value is 1000.
In most cases, data that is generated in a component serving as a stream source is consumed by downstream systems and is not modified in the source afterwards. However, Delta Lake also functions as a data lake, so data in a Delta table may be deleted, updated, or merged. Delta Lake provides two options for handling such changes; a minimal read example follows the list below.
- ignoreDeletes: If this option is set to true, all transactions that delete data at partition boundaries are ignored and will not affect streaming.
- ignoreChanges: If this option is set to true, deletes, updates, and merges are not processed in a special way; instead, the files rewritten by these operations are sent downstream as new data. For example, if a file contains 10,000 records and one of them is updated, the rewritten file, which contains the one updated record and the 9,999 unchanged records, is sent downstream again.
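The following is a minimal sketch of how either option is passed on the streaming read. It reuses the /tmp/delta_table path from the earlier example; typically you enable only one of the two options, depending on how the table is modified.
spark.readStream
  .format("delta")
  .option("ignoreDeletes", "true")     // ignore transactions that delete data at partition boundaries
  // .option("ignoreChanges", "true")  // alternatively, forward files rewritten by updates or merges downstream
  .load("/tmp/delta_table")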
Use a Delta table as a stream sink
- Append mode: Add new records to the destination table. This is the default output mode of Spark Structured Streaming.
df.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta_table/_checkpoints")
  .start("/tmp/delta_table")
- Complete mode: Replace the entire destination table with every batch. For example, a table has the id (LONG), date (DATE), name (STRING), and sales (DOUBLE) columns. You can calculate the total sales of each person and write the results to a summary table, which is rewritten in each batch.
- Reads and writes by using Spark Structured Streaming
spark.readStream
  .format("delta")
  .load("/tmp/delta_table")
  .select("name", "sales")
  .groupBy("name")
  .agg(sum("sales"))
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta_table_summary/_checkpoints")
  .start("/tmp/delta_table_summary")
- Reads and writes by using Streaming SQL
create table targetTableName (key bigint, value bigint) using delta;
create table sourceTableName (key bigint, value bigint) using delta;
insert into sourceTableName values (1, 238), (238, 2388);

-- Expose the source table as a stream.
CREATE SCAN stream_source_1 ON sourceTableName USING STREAM;

-- Deduplicate the source by key, then upsert the result into the target table.
CREATE STREAM test_delta_stream
OPTIONS (
  checkpointLocation = '/tmp/test_delta_stream/targetTableName'
)
MERGE INTO targetTableName AS target
USING (
  select key, value from (
    SELECT key, value, row_number() over (partition by key order by value) as rn
    FROM stream_source_1
  ) where rn = 1
) AS source
ON target.key = source.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;