本文介绍Delta Lake流式读写数据的一些使用示例。
Delta Table 作为数据源(Source)
Scala
spark.readStream
.format("delta")
.option("maxFilesPerTrigger", 1000)
.load("/tmp/delta_table")
作为数据源的组件一般数据一旦产生就会被下游消费,数据不会发生变化。但是Delta还兼顾了数据湖的角色,数据可能会被删除、更新,或者合并。目前Delta提供了两个选项来应对这种情况:
- ignoreDeletes:设置该选项为true后,一切**对分区的删除**动作不会有任何影响。注意,非分区列的删除不包含在此。
- ignoreChanges:设置该选项为true后,删除、更新或合并动作不会被特殊处理,但是这些动作产生的新文件会被当成新数据发送到下游。例如,某一个文件包含10000条数据,更新其中一条数据后,新文件有9999条旧数据和1条新数据。这9999条旧数据和1条新数据会被发送到下游。
说明
maxFilesPerTrigger
指定了一个批次最多处理的文件数量,默认值为1000。- ignoreDeletes的行为比较符合预期,但仅仅局限于对分区的删除。ignoreChanges的行为可能有些奇怪,这主要是由于Delta的工作原理决定的。当设置ignoreChanges可能会有重复数据发送到下游,因此下游应当注意处理这些重复数据。
Delta Table 作为数据接收端(Sink)
- Append模式:该模式是Spark Streaming的默认工作模式。
Scala
df.writeStream .format("delta") .outputMode("append") .option("checkpointLocation", "/tmp/delta_table/_checkpoints) .start("/tmp/delta_table")
- Complete模式:在该模式下每一次batch的执行都会以全表覆盖的形式写目标表。例如,对于 (id LONG, date DATE, name STRING,
sales DOUBLE)这张表,我们统计每个人的总销售额,将统计结果写入目标表,每个批次更新一次。
Scala
spark.readStream .format("delta") .load("/tmp/delta_table") .select("name","sales") .groupBy("name") .agg(sum("sales")) .writeStream .format("delta") .outputMode("complete") .option("checkpointLocation", "/tmp/delta_table_summary/_checkpoints") .start("/tmp/delta_table_summary")