This topic describes how to use E-MapReduce (EMR) Delta Lake to delete, update, and merge data, and perform other data management operations.
Background information
| Operation | Description |
| --- | --- |
| Delete data | Deletes data. |
| Update data | Updates data. |
| Merge data | Merges data. |
| DESCRIBE HISTORY | Displays the details about the operation history of a Delta table. |
| CONVERT | Converts a Parquet table to a Delta table. |
| OPTIMIZE | Optimizes the data layout of a Delta table by merging small files or implementing Z-ordering. This command improves query efficiency. |
| VACUUM | Deletes data files that are no longer required from the table path and have been stored for a period of time that exceeds the retention period. |
| SAVEPOINT | Permanently retains the historical versions of a Delta table. |
| ROLLBACK | Rolls back a Delta table to a specified historical version. |
Delete data
- Scala

  ```scala
  import io.delta.tables._

  val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")

  // Delete by using an SQL-formatted condition string.
  deltaTable.delete("date < '2019-11-11'")

  import org.apache.spark.sql.functions._
  import spark.implicits._

  // Delete by using a Column expression.
  deltaTable.delete(col("date") < "2019-11-11")
  ```
- SQL

  ```sql
  DELETE FROM delta_table [AS t] [WHERE t.date < '2019-11-11'];
  ```
- A WHERE clause that contains a subquery is not supported. However, if the subquery is a scalar subquery expressed in an SQL statement, you can set the `spark.sql.uncorrelated.scalar.subquery.preexecution.enabled` parameter to `true`. This way, you can use a WHERE clause that contains the subquery (see the sketch after the note below). Example: `DELETE FROM delta_table WHERE t.date < (SELECT date FROM ref_table WHERE ....)`
- If you want to delete the rows that match rows in another table, use the MERGE command. For example, use the MERGE command to replace the `DELETE FROM target WHERE target.col = ref.col ...` command.
Note: If you use the `DELETE` command and do not specify a condition, all data in the table is deleted.
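The following sketch shows such a scalar-subquery delete end to end. The `ref_table` name and the `max(date)` subquery are illustrative assumptions, not a fixed schema:

```sql
SET spark.sql.uncorrelated.scalar.subquery.preexecution.enabled=true;

-- The scalar subquery is pre-executed, and its result is substituted into the condition.
DELETE FROM delta_table WHERE date < (SELECT max(date) FROM ref_table);
```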
Update data
- Scala

  ```scala
  import io.delta.tables._

  val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")

  // Update by using an SQL-formatted condition string and expression.
  deltaTable.updateExpr(
    "name = 'Robet'",
    Map("name" -> "'Robert'"))

  import org.apache.spark.sql.functions._
  import spark.implicits._

  // Update by using SQL functions and implicit conversions.
  deltaTable.update(
    col("name") === "Robet",
    Map("name" -> lit("Robert")))
  ```
- SQL

  ```sql
  UPDATE delta_table [AS t] SET t.id = t.id + 1 [WHERE t.date < '2019-11-11'];
  ```
- A WHERE clause that contains a subquery is not supported. However, if the subquery is a scalar subquery expressed in an SQL statement, you can set the `spark.sql.uncorrelated.scalar.subquery.preexecution.enabled` parameter to `true`. This way, you can use a WHERE clause that contains the subquery. Example: `UPDATE delta_table SET t.id = t.id + 1 WHERE t.date < (SELECT date FROM ref_table WHERE ....)`
- If you want to update the rows that match rows in another table, use the MERGE command. For example, use the MERGE command to replace the `UPDATE target SET target.col = ref.col ...` command or the `WHERE target.col = ref.col ...` clause, as shown in the sketch after this list.
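A minimal sketch of this rewrite, assuming a hypothetical `ref` table that joins to `target` on an `id` column:

```sql
-- Unsupported cross-table update:
--   UPDATE target SET target.col = ref.col WHERE target.id = ref.id
-- Equivalent MERGE:
MERGE INTO target AS t
USING ref AS r
ON t.id = r.id
WHEN MATCHED THEN UPDATE SET t.col = r.col;
```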
Merge data
- Scala

  ```scala
  import io.delta.tables._
  import org.apache.spark.sql.functions._

  val updatesDF = ... // Define the updates DataFrame[date, id, name].

  DeltaTable.forPath(spark, "/tmp/delta_table")
    .as("target")
    .merge(updatesDF.as("source"), "target.id = source.id")
    .whenMatched("target.name = 'should_update'")
    .updateExpr(Map("target.name" -> "source.name"))
    .whenMatched("target.name = 'should_delete'")
    .delete()
    .whenNotMatched("source.name = 'should_insert'")
    .insertExpr(
      Map(
        "date" -> "source.date",
        "id" -> "source.id",
        "name" -> "source.name"))
    .execute()
  ```
- SQL

  ```sql
  MERGE INTO target AS t
  USING source AS s
  ON t.date = s.date
  WHEN MATCHED [AND t.name = 'should_update'] THEN UPDATE SET target.name = source.name
  WHEN MATCHED [AND t.name = 'should_delete'] THEN DELETE
  WHEN NOT MATCHED [AND s.name = 'should_insert'] THEN INSERT (t.date, t.name, t.id) VALUES (s.date, s.name, s.id);
  ```
- You can use an asterisk (`*`) as a wildcard in the UPDATE clause or the INSERT clause. If the clause is `UPDATE SET *` or `INSERT *`, all fields are updated or inserted, as shown in the sketch after this list.
- An ON condition that contains a subquery is not supported. However, if the subquery is a scalar subquery expressed in an SQL statement, you can set the `spark.sql.uncorrelated.scalar.subquery.preexecution.enabled` parameter to `true`. This way, you can use an ON condition that contains the subquery.
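A wildcard sketch, assuming `source` and `target` share the same schema:

```sql
MERGE INTO target AS t
USING source AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
```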
DESCRIBE HISTORY
Displays the details about the operation history of a Delta table.
- Display all operation records:

  ```sql
  DESC HISTORY dbName.tableName;
  ```

- Display only the latest operation record:

  ```sql
  DESC HISTORY dbName.tableName LIMIT 1;
  ```
CONVERT
Converts a Parquet table to a Delta table.
The CONVERT command traverses all Parquet data files in a specified path, infers the schema of the current Parquet table, and generates the metadata information that is required by a Delta table. If the Parquet table is a partitioned table, you must specify partition fields and the data types of the partition fields.
- Convert the Parquet data files in the specified path to a Delta table:

  ```sql
  CONVERT TO DELTA parquet.`oss://region/path/to/tbl_without_partition`;
  ```

- Convert the Parquet data files in the specified path to a Delta table and configure the dt and hour fields as the partition fields:

  ```sql
  CONVERT TO DELTA parquet.`oss://region/path/to/tbl_with_partition` PARTITIONED BY (dt string, hour int);
  ```
The CONVERT command generates only the Delta metadata in the table path. To query the converted data by table name, you can create a table based on that path:

```sql
CREATE TABLE tbl_without_partition
USING delta
LOCATION "oss://region/path/to/tbl_without_partition";
```
OPTIMIZE
You can use the OPTIMIZE command to perform the following operations:

- Optimize one or more specified partitions in a partitioned table.
- Implement Z-ordering on specified non-partition fields of a table to optimize the data layout of the table during compact optimization.

Examples:

- Optimize an entire table:

  ```sql
  OPTIMIZE dbName.tableName;
  ```

- Optimize the partitions in which the value of the date field is earlier than 2021-04-01:

  ```sql
  OPTIMIZE dbName.tableName WHERE date < '2021-04-01';
  ```

- Implement Z-ordering on the col2 and col3 columns for the partitions in which the value of the date field is earlier than 2021-04-01:

  ```sql
  OPTIMIZE dbName.tableName WHERE date < '2021-04-01' ZORDER BY (col2, col3);
  ```

Note:

- In most cases, each batch of data is small if you ingest streaming data into Delta Lake. As a result, a large number of small files are generated. You can periodically run the OPTIMIZE command to merge these small files.
- If the query pattern is fixed, you can implement Z-ordering to optimize the data layout. For example, if specific non-partition fields are frequently used as query conditions, implement Z-ordering on those fields.
VACUUM
Deletes data files that are no longer required from the table path and have been stored for a period of time that exceeds the retention period.
The following data files are still required and are not deleted:

- The data files that are associated with the latest version of a Delta table.
- The data files that are associated with a specific version for which the SAVEPOINT command is run.

You can specify the retention period in one of the following ways (a sketch of the first method follows this list):

- Configure the `delta.deletedFileRetentionDuration` table property. The default retention period is one week.
- Specify a retention period in hours in the VACUUM command.
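A sketch of the first method, assuming the standard `ALTER TABLE ... SET TBLPROPERTIES` syntax and an interval-style value:

```sql
-- Retain deleted data files for 48 hours before VACUUM may remove them.
ALTER TABLE dbName.tableName
SET TBLPROPERTIES ('delta.deletedFileRetentionDuration' = 'interval 48 hours');
```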
- Syntax

  ```sql
  VACUUM (path=STRING | table=tableIdentifier) (RETAIN number HOURS)? (DRY RUN)?
  ```
- Examples:

  - Delete the data files that are retained for more than seven days:

    ```sql
    VACUUM dbName.tableName;
    ```

  - Delete the data files that are retained for more than 24 hours:

    ```sql
    VACUUM dbName.tableName RETAIN 24 HOURS;
    ```

  - Display the data files that are retained for more than 24 hours:

    ```sql
    VACUUM dbName.tableName RETAIN 24 HOURS DRY RUN;
    ```
Note:

- You can periodically run the VACUUM command to delete data files that are no longer required and release storage space.
- Before you delete data files, you can run the VACUUM command that ends with `DRY RUN` to preview the data files that will be deleted.
SAVEPOINT
Permanently retains the historical versions of a Delta table.
Each time Delta Lake performs a checkpoint operation on a Delta table, the log metadata files that exceed the retention period are deleted. The `delta.checkpointInterval` parameter specifies the interval between checkpoint operations, and the `delta.logRetentionDuration` parameter specifies the retention period of the log metadata files. By default, the metadata files are retained for 30 days. If you run the VACUUM command, the data files that are no longer required in historical versions are also deleted.

You can run the SAVEPOINT command to permanently retain the data files and log metadata files of a historical version. You can use the SAVEPOINT command together with the time travel feature to read the data of historical versions (see the sketch after the following examples).
- Retain the version whose ID is 0:

  ```sql
  CREATE SAVEPOINT delta.`/path/to/delta_tbl` VERSION AS OF 0;
  ```

- Retain the latest version that was created before a specified point in time:

  ```sql
  CREATE SAVEPOINT dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";
  ```

- Delete the record of a SAVEPOINT operation:

  ```sql
  DROP SAVEPOINT dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";
  ```

- View the records of SAVEPOINT operations. The following command returns the version number, version commit time, SAVEPOINT operation time, and other information:

  ```sql
  SHOW SAVEPOINT dbName.tableName;
  ```
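For example, the following sketch pairs a SAVEPOINT with a time travel read. The `VERSION AS OF` query syntax in the SELECT statement is an assumption; use whichever time travel syntax your Spark and Delta Lake versions support:

```sql
-- Permanently retain version 10 so that checkpoint cleanup and VACUUM keep its files.
CREATE SAVEPOINT dbName.tableName VERSION AS OF 10;

-- Read the retained historical version (time travel syntax is assumed).
SELECT * FROM dbName.tableName VERSION AS OF 10;
```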
ROLLBACK
Rolls back a Delta table to a specified historical version.
If the specified historical version cannot be restored because the required data files or the metadata files of logs are lost, an error is reported.
- Roll back a Delta table to the version whose ID is 0:

  ```sql
  ROLLBACK delta.`/path/to/delta_tbl` VERSION AS OF 0;
  ```

- Roll back a Delta table to the latest version that was created before a specified point in time:

  ```sql
  ROLLBACK dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";
  ```