This topic demonstrates how to delete, update, and merge data in Delta Lake.

Delete data

  • Scala
    import io.delta.tables._
    
    val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")
    
    deltaTable.delete("date < '2019-11-11'")
    
    import org.apache.spark.sql.functions._
    import spark.implicits._
    
    deltaTable.delete(col("date") < "2019-11-11")
  • SQL
    DELETE FROM delta_table [AS t] [WHERE t.date < '2019-11-11'];
    Note
    • If no conditions are set, all data is deleted. In this case, the DELETE FROM statement works in the same way as TRUNCATE TABLE.
    • Currently, a WHERE clause that contains a subquery is not supported. However, if the subquery is an uncorrelated scalar subquery, you can set the spark.sql.uncorrelated.scalar.subquery.preexecution.enabled parameter to true to support the subquery. Example:
      DELETE FROM delta_table AS t WHERE t.date < (SELECT date FROM ref_table WHERE ....)
    • If you want to delete the rows that match rows in another table, for example, DELETE FROM target WHERE target.col = ref.col ..., use the merge function, as shown in the sketch after this note.
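The following Scala sketch combines both workarounds from the preceding note: it enables the scalar-subquery parameter before running a DELETE statement, and uses merge to delete rows that match rows in another table. The table names, path, join column, and the subquery body are hypothetical placeholders, not values from this topic.

    import io.delta.tables._
    
    // Workaround 1: enable pre-execution of uncorrelated scalar subqueries,
    // then run a DELETE statement whose WHERE clause contains a scalar subquery.
    spark.sql("SET spark.sql.uncorrelated.scalar.subquery.preexecution.enabled=true")
    spark.sql("DELETE FROM delta_table AS t WHERE t.date < (SELECT min(date) FROM ref_table)")
    
    // Workaround 2: delete target rows that match rows in another table via merge.
    // ref_table and the column col are placeholders for illustration.
    val refDF = spark.table("ref_table")
    DeltaTable.forPath(spark, "/tmp/delta_table")
      .as("target")
      .merge(refDF.as("ref"), "target.col = ref.col")
      .whenMatched()
      .delete()
      .execute()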

Update data

  • Scala
    import io.delta.tables._
    
    val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")
    
    deltaTable.updateExpr(            // Use an SQL-formatted string.
      "name = 'Robet'",               // Condition: rows whose name is misspelled as 'Robet'.
      Map("name" -> "'Robert'"))      // New value for the name column.
    
    import org.apache.spark.sql.functions._
    import spark.implicits._
    
    deltaTable.update(                // Use an SQL function and implicit conversion.
      col("name") === "Robet",
      Map("name" -> lit("Robert")))
  • SQL
    UPDATE delta_table [AS t] SET t.id = t.id + 1 [WHERE t.date < '2019-11-11'];
    Note
    • If no conditions are set, all data is updated.
    • Currently, a WHERE clause that contains a subquery is not supported. However, if the subquery is an uncorrelated scalar subquery, you can set the spark.sql.uncorrelated.scalar.subquery.preexecution.enabled parameter to true to support the subquery. Example:
      UPDATE delta_table AS t SET t.id = t.id + 1 WHERE t.date < (SELECT date FROM ref_table WHERE ....)
    • If you want to update the rows that match rows in another table, for example, UPDATE target SET target.col = ref.col ... or WHERE target.col = ref.col ..., use the merge function, as shown in the sketch after this note.
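The following Scala sketch shows how merge can express such a cross-table update. The table names, path, and columns are hypothetical placeholders.

    import io.delta.tables._
    
    // Update target rows that match rows in ref_table (a placeholder name):
    // for every row whose id exists in ref_table, copy ref.col into target.col.
    val refDF = spark.table("ref_table")
    DeltaTable.forPath(spark, "/tmp/delta_table")
      .as("target")
      .merge(refDF.as("ref"), "target.id = ref.id")
      .whenMatched()
      .updateExpr(Map("target.col" -> "ref.col"))
      .execute()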

Merge data

  • Scala
    import io.delta.tables._
    import org.apache.spark.sql.functions._
    
    val updatesDF = ...  // define the updates DataFrame[date, id, name]
    
    DeltaTable.forPath(spark, "/tmp/delta_table")
      .as("target")
      .merge(updatesDF.as("source"), "target.id = source.id")
      .whenMatched("target.name = 'should_update'")
      .updateExpr(Map("target.name" -> "source.name"))
      .whenMatched("target.name = 'should_delete'")
      .delete()
      .whenNotMatched("source.name = 'should_insert'")
      .insertExpr(
        Map(
          "date" -> "source.date",
          "id" -> "source.id",
          "name" -> "source.name"))
      .execute()
  • SQL
    MERGE INTO target AS t
    USING source AS s
    ON t.date = s.date
    WHEN MATCHED [AND t.name = 'should_update'] THEN UPDATE SET t.name = s.name
    WHEN MATCHED [AND t.name = 'should_delete'] THEN DELETE
    WHEN NOT MATCHED [AND s.name = 'should_insert'] THEN INSERT (date, name, id) VALUES (s.date, s.name, s.id)
    Note
    • The UPDATE and INSERT clauses support using the asterisk (*) as a wildcard. If the clause is set to UPDATE SET * or INSERT *, all fields are updated or inserted. The Scala equivalents are shown in the sketch after this note.
    • Currently, an ON condition with a subquery is not supported. However, if the subquery is an uncorrelated scalar subquery, you can set the spark.sql.uncorrelated.scalar.subquery.preexecution.enabled parameter to true to support the subquery.
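The following Scala sketch shows the wildcard behavior in the Scala API: updateAll corresponds to UPDATE SET * and insertAll corresponds to INSERT *. It assumes the source DataFrame has the same schema as the target table; the path and table name are illustrative.

    import io.delta.tables._
    
    // updatesDF must have the same columns as the target table for *-style actions.
    val updatesDF = spark.table("source")
    
    DeltaTable.forPath(spark, "/tmp/delta_table")
      .as("target")
      .merge(updatesDF.as("source"), "target.id = source.id")
      .whenMatched()
      .updateAll()        // Equivalent to UPDATE SET *: copies every source field.
      .whenNotMatched()
      .insertAll()        // Equivalent to INSERT *: inserts every source field.
      .execute()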