This topic describes how to use E-MapReduce (EMR) Delta Lake to delete, update, and merge data, and perform other data management operations.

Background information

The following table describes the data management operations that are supported by EMR Delta Lake.
Operation         Description
Delete data       Deletes rows that match specified conditions from a Delta table.
Update data       Updates rows that match specified conditions in a Delta table.
Merge data        Merges data from a source table, view, or DataFrame into a Delta table.
DESCRIBE HISTORY  Displays the details about the operation history of a Delta table.
CONVERT           Converts a Parquet table to a Delta table.
OPTIMIZE          Optimizes the data layout of a Delta table by merging small files or implementing Z-ordering. This command improves query efficiency.
VACUUM            Deletes data files that are no longer required from the table path after they exceed the retention period.
SAVEPOINT         Permanently retains the historical versions of a Delta table.
ROLLBACK          Rolls back a Delta table to a specified historical version.

Delete data

  • Scala
    import io.delta.tables._

    val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")

    deltaTable.delete("date < '2019-11-11'")          // Use an SQL-formatted string.

    import org.apache.spark.sql.functions._
    import spark.implicits._

    deltaTable.delete(col("date") < "2019-11-11")     // Use an SQL function and an implicit conversion.
  • SQL
    DELETE FROM delta_table [AS t] [WHERE t.date < '2019-11-11'];
    • A WHERE clause that contains a subquery is not supported. However, if the subquery is a scalar subquery written in SQL, you can set the spark.sql.uncorrelated.scalar.subquery.preexecution.enabled parameter to true to allow such a WHERE clause. A Scala sketch of this workaround appears after the note below. Example:
      DELETE FROM delta_table AS t WHERE t.date < (SELECT date FROM ref_table WHERE ....);
    • If you want to delete the rows that match rows in another table, use the MERGE command instead. For example, use MERGE to express DELETE FROM target WHERE target.col = ref.col ....
    Note If you run the DELETE command without a WHERE clause, all data in the table is deleted.
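The following Scala sketch shows one way to enable the scalar-subquery workaround described above. The parameter name comes from this topic; the table names (delta_table, ref_table) are the placeholders used in the examples, and the MAX aggregate is only an illustration of a scalar subquery.

  // Enable pre-execution of uncorrelated scalar subqueries (parameter from this topic).
  spark.conf.set("spark.sql.uncorrelated.scalar.subquery.preexecution.enabled", "true")

  // Run a DELETE whose WHERE clause contains a scalar subquery.
  spark.sql("""
    DELETE FROM delta_table AS t
    WHERE t.date < (SELECT MAX(date) FROM ref_table)
  """)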

Update data

  • Scala
    import io.delta.tables._
    
    val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")
    
    deltaTable.updateExpr(            // Use an SQL-formatted string.
      "name = 'Robet'",
      Map("name" -> "'Robert'"))

    import org.apache.spark.sql.functions._
    import spark.implicits._

    deltaTable.update(                // Use an SQL function and an implicit conversion.
      col("name") === "Robet",
      Map("name" -> lit("Robert")))
  • SQL
    UPDATE delta_table [AS t] SET t.id = t.id + 1 [WHERE t.date < '2019-11-11'];
    • A WHERE clause that contains a subquery is not supported. However, if the subquery is a scalar subquery written in SQL, you can set the spark.sql.uncorrelated.scalar.subquery.preexecution.enabled parameter to true to allow such a WHERE clause. Example:
      UPDATE delta_table AS t SET t.id = t.id + 1 WHERE t.date < (SELECT date FROM ref_table WHERE ....);
    • If you want to update the rows that match rows in another table, use the MERGE command instead. For example, use MERGE to express UPDATE target SET target.col = ref.col ... WHERE target.col = ref.col ..., as shown in the sketch below.
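The following Scala sketch shows how such an UPDATE can be replaced with a MERGE. The table and column names (delta_table, ref_table, id, name) are hypothetical placeholders, not names from this topic's examples.

  // Update rows in delta_table whose id matches a row in ref_table.
  spark.sql("""
    MERGE INTO delta_table AS t
    USING ref_table AS r
    ON t.id = r.id
    WHEN MATCHED THEN UPDATE SET t.name = r.name
  """)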

Merge data

  • Scala
    import io.delta.tables._
    import org.apache.spark.sql.functions._
    
    val updatesDF = ...  // define the updates DataFrame[date, id, name]
    
    DeltaTable.forPath(spark, "/tmp/delta_table")
      .as("target")
      .merge(updatesDF.as("source"), "target.id = source.id")
      .whenMatched("target.name = 'should_update'")
      .updateExpr(Map("target.name" -> "source.name"))
      .whenMatched("target.name = 'should_delete'")
      .delete()
      .whenNotMatched("source.name = 'should_insert'")
      .insertExpr(
        Map(
          "date" -> "source.date",
          "id" -> "source.id",
          "name" -> "source.name"))
      .execute()
  • SQL
    MERGE INTO target AS t
    USING source AS s
    ON t.date = s.date
    WHEN MATCHED [AND t.name = 'should_update'] THEN UPDATE SET target.name = source.name
    WHEN MATCHED [AND t.name = 'should_delete'] THEN DELETE
    WHEN NOT MATCHED [AND s.name = 'should_insert'] THEN INSERT (date, name, id) VALUES (s.date, s.name, s.id);
    • You can use an asterisk (*) as a wildcard in the UPDATE clause or the INSERT clause. UPDATE SET * updates all fields, and INSERT * inserts values for all fields; see the sketch after this list.
    • An ON condition that contains a subquery is not supported. However, if the subquery is a scalar subquery written in SQL, you can set the spark.sql.uncorrelated.scalar.subquery.preexecution.enabled parameter to true to allow such an ON condition.
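The following Scala sketch illustrates the wildcard form. It assumes that the source and target tables have identical schemas, which is what UPDATE SET * and INSERT * require; the table names are placeholders.

  // Upsert all fields at once by using the wildcard form.
  spark.sql("""
    MERGE INTO target AS t
    USING source AS s
    ON t.id = s.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
  """)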

DESCRIBE HISTORY

Displays the details about the operation history of a Delta table.

This command displays the following information in sequence: the version number, operation time, user ID, username, operation type, operation parameters, job information, notebook information, cluster ID, the table version on which the operation was based, the isolation level, whether data was directly appended, and operational metrics.
Note The values of most of the preceding fields are displayed as null.
Examples:
  • Display all operation records:
    DESC HISTORY dbName.tableName;
  • Display the latest operation record:
    DESC HISTORY dbName.tableName LIMIT 1;
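You can also query the history from Scala. The following sketch assumes that the open-source Delta Lake Scala API is available on your cluster; the table path is the placeholder used earlier in this topic.

  import io.delta.tables._

  val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")
  // Full operation history, one row per version.
  deltaTable.history().show(truncate = false)
  // Only the latest operation record.
  deltaTable.history(1).show(truncate = false)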

CONVERT

Converts a Parquet table to a Delta table.

The CONVERT command traverses all Parquet data files in a specified path, infers the schema of the current Parquet table, and generates the metadata information that is required by a Delta table. If the Parquet table is a partitioned table, you must specify partition fields and the data types of the partition fields.

Examples:
  • Convert the Parquet data files in the specified path to a Delta table.
    CONVERT TO DELTA parquet.`oss://region/path/to/tbl_without_partition`;
  • Convert the Parquet data files in the specified path to a Delta table and configure the dt and hour fields as the partition fields.
    CONVERT TO DELTA parquet.`oss://region/path/to/tbl_with_partition` PARTITIONED BY (dt string, hour int);
After you run a CONVERT command, only the data in the table path is converted to the Delta format. No Delta table is created in the metastore. To create the Delta table, run the CREATE TABLE command, and do not specify table fields or partition fields in that command. Example:
CREATE TABLE tbl_without_partition
USING delta
LOCATION "oss://region/path/to/tbl_without_partition";
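If you prefer the Scala API, the following sketch mirrors the SQL CONVERT examples above. It assumes that the open-source DeltaTable.convertToDelta method is available on your cluster; the paths are the placeholders used above.

  import io.delta.tables._

  // Convert an unpartitioned Parquet table.
  DeltaTable.convertToDelta(spark, "parquet.`oss://region/path/to/tbl_without_partition`")

  // Convert a partitioned Parquet table; the partition schema must be supplied.
  DeltaTable.convertToDelta(spark, "parquet.`oss://region/path/to/tbl_with_partition`", "dt string, hour int")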

OPTIMIZE

Optimizes the data layout of a Delta table by merging small files or implementing Z-ordering. This command improves query efficiency. You can use the OPTIMIZE command to perform the following operations:
  • Optimize one or more specified partitions in a partitioned table.
  • Implement Z-ordering on specified non-partition fields to optimize the data layout during compaction.
Examples:
  • Optimize an entire table.
    OPTIMIZE dbName.tableName;
  • Optimize the partitions in which the value of the date field is earlier than 2021-04-01.
    OPTIMIZE dbName.tableName WHERE date < '2021-04-01';
  • Implement Z-ordering on the col2 and col3 columns for the partitions in which the value of the date field is earlier than 2021-04-01.
    OPTIMIZE dbName.tableName WHERE date < '2021-04-01' ZORDER BY (col2, col3);
Note
  • In most cases, each batch of data that is ingested into Delta Lake from a stream is small. As a result, a large number of small files are generated. You can periodically run the OPTIMIZE command to merge these small files, as sketched below.
  • If the query pattern is fixed, for example, if specific non-partition fields are frequently used as query conditions, you can implement Z-ordering on those fields to optimize the data layout.
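The following Scala sketch shows one way to run such a periodic compaction from a scheduled Spark job. The table name and the dt partition field are hypothetical placeholders.

  import java.time.LocalDate

  // Compact the previous day's partition after streaming ingestion has moved on.
  val yesterday = LocalDate.now().minusDays(1).toString   // For example, "2021-03-31".
  spark.sql(s"OPTIMIZE dbName.tableName WHERE dt = '$yesterday'")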

VACUUM

Deletes data files that are no longer required from the table path and have been stored for a period of time that exceeds the retention period.

In EMR, Delta Lake considers the following data files still required; the VACUUM command deletes only files outside this set:
  • The data files that are associated with the latest version of a Delta table.
  • The data files that are associated with a version for which the SAVEPOINT command is run.
You can use one of the following methods to specify the retention period of data files:
  • Configure the delta.deletedFileRetentionDuration table property. The default retention period is one week.
  • Specify a retention period in hours in the VACUUM command.
  • Syntax
    VACUUM (path=STRING | table=tableIdentifier) (RETAIN number HOURS)? (DRY RUN)?
  • Examples:
    • Delete the data files that are retained for more than seven days.
      VACUUM dbName.tableName;
    • Delete the data files that are retained for more than 24 hours.
      VACUUM dbName.tableName RETAIN 24 HOURS; 
    • Display the data files that are retained for more than 24 hours.
      VACUUM dbName.tableName RETAIN 24 HOURS DRY RUN;
      Note
      • You can periodically run the VACUUM command to delete data files to release more storage space.
      • Before you delete data files, you can run the VACUUM command that ends with DRY RUN to view the data files that will be deleted.
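The DRY RUN preview followed by the actual deletion can be scripted from Scala as follows. This is a minimal sketch; the table name is a placeholder.

  // Preview which data files would be deleted.
  spark.sql("VACUUM dbName.tableName RETAIN 24 HOURS DRY RUN").show(truncate = false)

  // Delete the data files that are retained for more than 24 hours.
  spark.sql("VACUUM dbName.tableName RETAIN 24 HOURS")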

SAVEPOINT

Permanently retains the historical versions of a Delta table.

Each time Delta Lake performs a checkpoint operation on a Delta table, log metadata files that have exceeded their retention period are deleted. The delta.checkpointInterval table property specifies the interval between checkpoint operations, and the delta.logRetentionDuration table property specifies the retention period of log metadata files. By default, log metadata files are retained for 30 days. In addition, the VACUUM command deletes the data files of historical versions that are no longer required. You can run the SAVEPOINT command to permanently retain the data files and log metadata files of a version, and use SAVEPOINT together with the time travel feature to read the data of historical versions.

Examples:
  • Retain the version whose ID is 0:
    CREATE SAVEPOINT delta.`/path/to/delta_tbl` VERSION AS OF 0;
  • Retain the latest version that is released before a specified point in time:
    CREATE SAVEPOINT dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";
Delete or view the record of a SAVEPOINT operation:
  • Delete the record of a SAVEPOINT operation:
    DROP SAVEPOINT dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";
  • View the record of a SAVEPOINT operation:

    The following command returns the version number, version submission time, SAVEPOINT operation time, and other information about a SAVEPOINT operation:

    SHOW SAVEPOINT dbName.tableName;
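As mentioned above, you can combine SAVEPOINT with time travel to read a retained historical version. The following Scala sketch assumes the standard Delta time-travel reader option versionAsOf; the path is the placeholder used in the examples above.

  // Read version 0, which the SAVEPOINT example above retains permanently.
  val v0 = spark.read.format("delta")
    .option("versionAsOf", "0")
    .load("/path/to/delta_tbl")
  v0.show()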

ROLLBACK

Rolls back a Delta table to a specified historical version.

If the required data files or log metadata files of the specified version are lost, the version cannot be restored and an error is reported.

Examples:
  • Roll back a Delta table to the version whose ID is 0:
    ROLLBACK delta.`/path/to/delta_tbl` VERSION AS OF 0;
  • Roll back a Delta table to the latest version that is released before a specified point in time:
    ROLLBACK dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";
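Before you roll back, you can check which versions are still available. The following Scala sketch inspects the history and then rolls back; it assumes that ROLLBACK accepts a table name together with VERSION AS OF, combining the two syntax forms shown above. The table name is a placeholder.

  // Inspect the available versions first.
  spark.sql("DESC HISTORY dbName.tableName").show(truncate = false)

  // Roll back to version 0.
  spark.sql("ROLLBACK dbName.tableName VERSION AS OF 0")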