
E-MapReduce: Data management

Last Updated: Dec 22, 2023

Alibaba Cloud E-MapReduce (EMR) Delta Lake provides data processing capabilities that help you manage data and ensure data quality and consistency. This topic describes how to use EMR Delta Lake to delete, update, and merge data, and to perform other data management operations.

DELETE

Deletes data. Sample code:

-- SQL
DELETE FROM delta_table [AS t] [WHERE t.date < '2019-11-11'];

// Scala: delete by using an SQL-formatted predicate string.
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")
deltaTable.delete("date < '2019-11-11'")

// Scala: delete by using a Column expression.
import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.delete(col("date") < "2019-11-11")
Note

If you do not specify conditions in the DELETE statement, all data is deleted.

  • A WHERE clause that contains a subquery is not supported. However, if the subquery is a scalar subquery expressed in SQL, you can set the spark.sql.uncorrelated.scalar.subquery.preexecution.enabled parameter to true to allow it. Example:

    DELETE FROM delta_table AS t WHERE t.date < (SELECT date FROM ref_table WHERE ....);
  • If you want to delete rows that match rows in another table, use the MERGE statement instead. For example, use MERGE to replace a statement such as DELETE FROM target WHERE target.col = ref.col ..., as sketched below.
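
    A minimal sketch of this MERGE-based replacement, assuming a hypothetical ref_table whose col column identifies the target rows to delete:

    MERGE INTO target AS t
    USING ref_table AS r
    ON t.col = r.col
    WHEN MATCHED THEN DELETE;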

UPDATE

Updates data. Sample code:

-- SQL
UPDATE delta_table [AS t] SET t.id = t.id + 1 [WHERE t.date < '2019-11-11'];

// Scala: update by using SQL-formatted strings.
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")

deltaTable.updateExpr(            // Use an SQL-formatted condition and update expressions.
  "name = 'Robet'",
  Map("name" -> "'Robert'"))

// Scala: update by using SQL functions and an implicit conversion.
import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.update(                // Use Column expressions.
  col("name") === "Robet",
  Map("name" -> lit("Robert")))
  • A WHERE clause that contains a subquery is not supported. However, if the subquery is a scalar subquery expressed in SQL, you can set the spark.sql.uncorrelated.scalar.subquery.preexecution.enabled parameter to true to allow it. Example:

    UPDATE delta_table AS t SET t.id = t.id + 1 WHERE t.date < (SELECT date FROM ref_table WHERE ....);
  • If you want to update rows that match rows in another table, use the MERGE statement instead. For example, use MERGE to replace a statement such as UPDATE target SET target.col = ref.col ... or UPDATE target WHERE target.col = ref.col ..., as sketched below.
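
    A minimal sketch of this MERGE-based replacement, assuming a hypothetical ref_table joined on id:

    MERGE INTO target AS t
    USING ref_table AS r
    ON t.id = r.id
    WHEN MATCHED THEN UPDATE SET t.col = r.col;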

MERGE

Merges data. Sample code:

-- SQL
MERGE INTO target AS t
USING source AS s
ON t.date = s.date
WHEN MATCHED [AND t.name = 'should_update'] THEN UPDATE SET target.name = source.name
WHEN MATCHED [AND t.name = 'should_delete'] THEN DELETE
WHEN NOT MATCHED [AND s.name = 'should_insert'] THEN INSERT (t.date, t.name, t.id) VALUES (s.date, s.name, s.id);

// Scala
import io.delta.tables._
import org.apache.spark.sql.functions._

val updatesDF = ...  // define the updates DataFrame[date, id, name]

DeltaTable.forPath(spark, "/tmp/delta_table")
  .as("target")
  .merge(updatesDF.as("source"), "target.id = source.id")
  .whenMatched("target.name = 'should_update'")
  .updateExpr(Map("target.name" -> "source.name"))
  .whenMatched("target.name = 'should_delete'")
  .delete()
  .whenNotMatched("source.name = 'shoulde_insert'")
  .insertExpr(
    Map(
      "date" -> "updates.date",
      "eventId" -> "updates.eventId",
      "data" -> "updates.data"))
  .execute()
  • You can use an asterisk (*) as a wildcard in the UPDATE clause or the INSERT clause. If the clause is UPDATE SET * or INSERT *, all fields are updated or inserted, as shown in the sketch after this list.

  • An ON condition that contains a subquery is not supported. However, if the subquery is a scalar subquery expressed in SQL, you can set the spark.sql.uncorrelated.scalar.subquery.preexecution.enabled parameter to true to allow it.
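
A minimal sketch of the wildcard form, assuming that source and target share the same schema:

MERGE INTO target AS t
USING source AS s
ON t.id = s.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;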

ALTER TABLE

Changes the schema and properties of an existing table. You can execute the ALTER TABLE statement to perform the following operations on a table (a combined sketch follows this list):

  • ADD COLUMN: adds columns to a table.

  • RENAME COLUMN: renames a column. If you want to use this statement, column mapping must be enabled.

  • DROP COLUMN: drops a column from a table. If you want to use this statement, column mapping must be enabled.

  • SET/UNSET TBLPROPERTIES: configures the properties of a table, such as the description and storage format of a table.

  • RENAME TO: renames a table.
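
The following statements are a minimal sketch of these operations. The table, column, and property names are hypothetical, and the assumption that column mapping is enabled by setting the delta.columnMapping.mode table property applies to the RENAME COLUMN and DROP COLUMN statements:

-- Assumption: column mapping is enabled through a table property.
ALTER TABLE dbName.tableName SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name');
-- Rename and drop columns. Column mapping must be enabled.
ALTER TABLE dbName.tableName RENAME COLUMN old_col TO new_col;
ALTER TABLE dbName.tableName DROP COLUMN obsolete_col;
-- Configure and remove a table property.
ALTER TABLE dbName.tableName SET TBLPROPERTIES ('comment' = 'demo table');
ALTER TABLE dbName.tableName UNSET TBLPROPERTIES ('comment');
-- Rename the table.
ALTER TABLE dbName.tableName RENAME TO dbName.newTableName;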

Important

If you want to perform the ADD COLUMN operation to add a column to a partitioned table, we recommend that you add the new field before the partition field. This helps prevent data exceptions when you use a query engine such as Hive to query the Delta table.

The following sample code provides an example on how to add a column at a specified position.

-- The schema of the dbName.tableName table is (id INT, name STRING, pt STRING). pt is a partition field. 
-- Add the new_col field after the name field and before the pt field. 
ALTER TABLE dbName.tableName ADD COLUMN (new_col STRING AFTER name);

DESCRIBE HISTORY

Displays the details of the operation history of a Delta table.

This statement displays the following information in sequence: version number, operation time, user ID, username, operation type, operation parameters, job information, notebook information, cluster ID, the table version on which the operation was based, isolation level, whether the operation only appended data, and operation metrics.

Note

The values of most of the preceding fields are displayed as null.

Examples:

  • Display all operation records.

    DESC HISTORY dbName.tableName;
  • Display the latest operation record.

    DESC HISTORY dbName.tableName LIMIT 1;

CONVERT

Converts a Parquet table to a Delta table.

The CONVERT statement traverses all Parquet data files in a specific path, infers the schema of the current Parquet table, and generates the metadata information that is required by a Delta table. If the Parquet table is a partitioned table, you must specify partition fields and the data types of the partition fields.

Examples:

  • Convert the Parquet data files in the specified path to a Delta table.

    CONVERT TO DELTA parquet.`oss://region/path/to/tbl_without_partition`;
  • Convert the Parquet data files in the specified path to a Delta table and configure the dt and hour fields as the partition fields.

    CONVERT TO DELTA parquet.`oss://region/path/to/tbl_with_partition` PARTITIONED BY (dt string, hour int);

After you execute the CONVERT statement, the data in the table path is converted to the format that a Delta table requires, but no Delta table is created. You must execute the CREATE TABLE statement to create the Delta table. Do not specify table fields or partition fields in the statement. Sample code:

CREATE TABLE tbl_without_partition
USING delta
LOCATION "oss://region/path/to/tbl_without_partition";

OPTIMIZE

Optimizes the data layout of a Delta table by merging small files or implementing Z-ordering. This statement improves query efficiency. You can use the OPTIMIZE statement to perform the following operations:

  • Optimize one or more specified partitions in a partitioned table.

  • Implement Z-ordering on specific non-partition fields of a table to optimize the data layout during compaction.

Sample code:

set spark.databricks.delta.stats.skipping=true;
set spark.databricks.delta.stats.collect=true;

-- Optimize the entire dbName.tableName table. 
OPTIMIZE dbName.tableName;

-- Optimize the partitions in which the value of the date field is earlier than 2021-04-01. 
OPTIMIZE dbName.tableName WHERE date < '2021-04-01';

-- Optimize the partitions in which the value of the date field is earlier than 2021-04-01 and implement Z-ordering on the col2 and col3 columns. 
OPTIMIZE dbName.tableName WHERE date < '2021-04-01' ZORDER BY (col2, col3);
Note
  • In most cases, each batch of data that is ingested into Delta Lake from a streaming source is small. As a result, a large number of small files are generated. You can periodically execute the OPTIMIZE statement to merge the small files.

  • If the query pattern is fixed, for example, if specific non-partition fields frequently appear in query conditions, you can implement Z-ordering to optimize the data layout.

VACUUM

Deletes, from the table path, data files that are no longer required and have been retained longer than the retention period.

The VACUUM statement does not delete the following data files:

  • Data files that are associated with the latest version of a Delta table.

  • Data files that are associated with a specific version for which the SAVEPOINT statement is executed.

You can use one of the following methods to specify the retention period of data files:

  • Configure the delta.deletedFileRetentionDuration table property. The default retention period is one week. A sketch of this method follows the examples below.

  • Specify a retention period in hours in the VACUUM statement.

  • Syntax

    VACUUM (path=STRING | table=tableIdentifier) (RETAIN number HOURS)? (DRY RUN)?
  • Example

    -- Delete data files. 
    VACUUM dbName.tableName; 
    -- Delete the data files that are retained for more than 24 hours. 
    VACUUM dbName.tableName RETAIN 24 HOURS; 
    -- Display the data files that are retained for more than 24 hours. 
    VACUUM dbName.tableName RETAIN 24 HOURS DRY RUN; 
    Note
    • You can periodically execute the VACUUM statement to delete data files to release more storage space.

    • Before you delete data files, you can execute the VACUUM statement that ends with DRY RUN to view the data files that will be deleted.
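
The following statement is a minimal sketch of the first method for specifying the retention period, assuming that the delta.deletedFileRetentionDuration property accepts an interval-style value:

-- Retain deleted data files for 48 hours before VACUUM can remove them.
ALTER TABLE dbName.tableName SET TBLPROPERTIES ('delta.deletedFileRetentionDuration' = 'interval 48 hours');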

SAVEPOINT

Permanently retains the historical versions of a Delta table.

Each time Delta Lake performs a checkpoint operation on a Delta table, expired log metadata files are deleted. The delta.checkpointInterval parameter specifies the interval between checkpoint operations, and the delta.logRetentionDuration parameter specifies the retention period of the log metadata files. By default, the metadata files are retained for 30 days. If you execute the VACUUM statement, the data files that are no longer required in historical versions are also deleted. You can execute the SAVEPOINT statement to permanently retain data files and log metadata files, and use it together with the time travel feature to read the data of historical versions, as sketched after the following examples.

Examples:

  • Retain the version whose ID is 0.

    CREATE SAVEPOINT delta.`/path/to/delta_tbl` VERSION AS OF 0;
    Note

    /path/to/delta_tbl is the file system path of the Delta table.

  • Retain the latest version that is released before a specific point in time.

    CREATE SAVEPOINT dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";
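
The following statements are a minimal sketch of reading a retained version with the time travel feature, assuming that the Spark SQL build in use supports Delta time-travel queries:

SELECT * FROM dbName.tableName VERSION AS OF 0;
SELECT * FROM delta.`/path/to/delta_tbl` TIMESTAMP AS OF "2021-04-01 10:00:00";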

Delete or view the record of a SAVEPOINT operation.

  • Delete the record of a SAVEPOINT operation.

    -- Drop the savepoint that retains a specific version. 
    DROP SAVEPOINT delta.`/path/to/delta_tbl` VERSION AS OF 0;
    -- Drop the savepoint that retains the version before a specific timestamp. 
    DROP SAVEPOINT dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";
  • View the record of a SAVEPOINT operation.

    The following statements return the version number, the commit time of the version, the time of the SAVEPOINT operation, and other information about a SAVEPOINT operation:

    SHOW SAVEPOINT delta.`/path/to/delta_tbl`;
    SHOW SAVEPOINT dbName.tableName;

ROLLBACK

Rolls back a Delta table to a specific historical version.

If the specified historical version cannot be restored because the required data files or the metadata files of logs are lost, an error is reported. Examples:

  • Roll back a Delta table to the version whose ID is 0.

    ROLLBACK delta.`/path/to/delta_tbl` VERSION AS OF 0;
  • Roll back a Delta table to the latest version that is released before a specific point in time.

    ROLLBACK dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";