This topic demonstrates how to perform streaming reads and writes in Delta Lake.
Use a Delta table as a source
spark.readStream .format("delta") .option("maxFilesPerTrigger", 1000) .load("/tmp/delta_table")
Generally, after data is generated in a component that severs as a stream source, the data is consumed by downstream systems and remains unchanged in the stream source. However, Delta also functions as a data lake. Data may be deleted, updated, or merged in Delta Lake. Delta currently provides two options to deal with data changes.
- ignoreDeletes: When this option is set to true, all transactions that delete data at partition boundaries are ignored and will not affect streaming. Note that data deletion based on non-partition columns cannot be ignored based on this option.
- ignoreChanges: When this option is set to true, data deletes, updates, or merges are
not specially processed, but new files generated after these operations are sent downstream
as new data. For example, a file contains 10,000 records. After a record is updated,
the new file contains 9,999 old records and one new record. The 9,999 old records
and one new record are sent downstream.
maxFilesPerTriggerparameter specifies the maximum number of files to process in one batch. The default value is 1000.
- The ignoreDeletes option behaves as expected, but it is only applicable to partition deletion. The behavior of ignoreChanges may sound strange, which is mainly determined by how Delta works. When the ignoreChanges option is set to true, duplicated data may be sent downstream. Therefore, your downstream consumers must be able to handle duplicates.
Use a Delta table as a sink
- Append mode: This is the default mode of Spark Streaming.
df.writeStream .format("delta") .outputMode("append") .option("checkpointLocation", "/tmp/delta_table/_checkpoints) .start("/tmp/delta_table")
- Complete mode: In this mode, every batch will replace the entire destination table.
For example, a table has the id, date, name, and sales columns, which are of the LONG,
DATE, STRING, and DOUBLE types respectively. You can calculate the total sales of
each person, record the results in the table, and update the table with every batch.
spark.readStream .format("delta") .load("/tmp/delta_table") .select("name","sales") .groupBy("name") .agg(sum("sales")) .writeStream .format("delta") .outputMode("complete") .option("checkpointLocation", "/tmp/delta_table_summary/_checkpoints") .start("/tmp/delta_table_summary")