全部產品
Search
文件中心

E-MapReduce:資料管理

更新時間:Jul 01, 2024

阿里雲EMR Delta Lake提供了強大的資料處理能力,可以協助您管理和操作資料,確保資料的品質和一致性。本文為您介紹EMR Delta Lake如何進行刪除、更新與合并資料等操作。

DELETE

該命令用於刪除資料。樣本如下。

DELETE FROM delta_table [AS t] [WHERE t.date < '2019-11-11'];
import io.delta.tables.
val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")
deltaTable.delete("date < '2019-11-11'")
import org.apache.spark.sql.functions.
import spark.implicits.
deltaTable.delete(col("date") < "2019-11-11")
說明

使用DELETE命令時,如果沒有條件限制,則會刪除所有資料。

  • 暫不支援帶有子查詢的WHERE條件。但如果子查詢為標量子查詢且使用SQL,可以設定spark.sql.uncorrelated.scalar.subquery.preexecution.enabledtrue後進行查詢,例如:

    DELETE FROM delta_table WHERE t.date < (SELECT date FROM ref_table WHERE ....)
  • 如果您需要根據另一張表對目標表的匹配行進行刪除(例如DELETE FROM target WHERE target.col = ref.col ...),請使用Merge文法。

UPDATE

該命令用於更新資料。樣本如下。

UPDATE delta_table [AS t] SET t.id = t.id + 1 [WHERE t.date < '2019-11-11'];
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/tmp/delta_table")

deltaTable.updateExpr(            //使用SQL字串。
  "name = 'Robet'",
  Map("name" -> "'Robert'")

import org.apache.spark.sql.functions._
import spark.implicits._

deltaTable.update(                //使用SQL函數和隱式轉換。
  col("name") === "Robet"),
  Map("name" -> lit("Robert"));
  • 暫不支援帶有子查詢的WHERE條件。但如果子查詢為標量子查詢且使用SQL,可以設定spark.sql.uncorrelated.scalar.subquery.preexecution.enabledtrue後進行查詢,例如:

    UPDATE delta_table SET t.id = t.id + 1 WHERE t.date < (SELECT date FROM ref_table WHERE ....)
  • 如果要根據另一張表對目標表的匹配行進行更新(例如,UPDATE target SET target.col = ref.col ... WHERE target.col = ref.col ...),請使用Merge文法。

MERGE

該命令用於合并資料。樣本如下。

MERGE INTO target AS t
USING source AS s
ON t.date = s.date
WHEN MATCHED [AND t.name = 'should_update'] THEN UPDATE SET target.name = source.name
WHEN MATCHED [AND t.name = 'should_delete'] THEN DELETE
WHEN NOT MATCHED [AND s.name = 'should_insert'] THEN INSERT (t.date, t.name, t.id) VALUES (s.date, s.name.s.id)
import io.delta.tables._
import org.apache.spark.sql.functions._

val updatesDF = ...  // define the updates DataFrame[date, id, name]

DeltaTable.forPath(spark, "/tmp/delta_table")
  .as("target")
  .merge(updatesDF.as("source"), "target.id = source.id")
  .whenMatched("target.name = 'should_update'")
  .updateExpr(Map("target.name" -> "source.name"))
  .whenMatched("target.name = 'should_delete'")
  .delete()
  .whenNotMatched("source.name = 'shoulde_insert'")
  .insertExpr(
    Map(
      "date" -> "updates.date",
      "eventId" -> "updates.eventId",
      "data" -> "updates.data"))
  .execute()
  • UPDATE子句和INSERT子句支援*文法,如果設定為UPDATE SET *或者INSERT *,則會更新或插入所有欄位。

  • 暫不支援帶有子查詢的ON條件,但如果子查詢為標量子查詢的形式且使用SQL,可以設定spark.sql.uncorrelated.scalar.subquery.preexecution.enabledtrue後進行查詢。

ALTER TABLE

該命令用於更改現有表的結構和屬性。支援對錶進行以下操作:

  • ADD COLUMN:可以向表中添加新的列。

  • RENAME COLUMN(需要開啟Column Mapping):可以將表中的列更改為新的名稱。

  • DROP COLOMN(需要開啟Column Mapiing):可以從表中刪除指定的列。

  • SET/UNSET TBLPROPERTIES:可以設定表層級的屬性,如表的描述、表的儲存格式等。

  • RENAME TO:將表的名稱更改為新的名稱。

重要

對分區表執行ADD COLUMN操作,建議將新增欄位追加到分區欄位之前,以避免在查詢引擎如Hive中查詢Delta表時出現資料異常。

例如,在指定位置執行ADD COLUMN操作。

-- 假設delta_tbl表的Schema為(id IN, name STRING, pt STRING),其中pt為分區欄位。
-- 新增new_col欄位,並將其追加到name欄位後,pt欄位前。
ALTER TABLE dbName.tableName ADD COLUMN (new_col STRING AFTER name);

DESCRIBE HISTORY

該命令用於顯示Delta Lake的詳細操作歷史。

按照順序展示版本號碼、操作時間、使用者ID、使用者名稱、操作類型、巨集指令引數、作業資訊、Notebook資訊、叢集、操作基於的前置版本、隔離等級、是否直接追加和操作Metrics等資訊。

說明

通常大多數資訊顯示為Null。

樣本如下:

  • 顯示所有的操作記錄。

    DESC HISTORY dbName.tableName;
  • 顯示最新一條的操作記錄。

    DESC HISTORY dbName.tableName limit 1;

CONVERT

該命令用於將Parquet格式的錶轉成Delta表。

CONVERT遍曆指定路徑下的Parquet資料檔案,推測表的Schema,產生Delta表需要的中繼資料資訊。如果Parquet表本身是分區表,則需要額外指定分區欄位和類型。

樣本如下:

  • 轉換指定路徑下的Parquet資料檔案。

    CONVERT TO DELTA parquet.`oss://region/path/to/tbl_without_partition`;
  • 轉換指定路徑下的Parquet資料檔案,並按照dt和hour進行分區。

    CONVERT TO DELTA parquet.`oss://region/path/to/tbl_with_partition` PARTITIONED BY (dt string, hour int);

使用CONVERT後,僅將表路徑構建為Delta表所需的格式,尚未將其註冊為表,需要繼續使用CREATE TABLE命令。此時無需指定建表欄位和分區欄位。以下是具體樣本。

CREATE TABLE tbl_without_partition
USING delta
LOCATION "oss://region/path/to/tbl_without_partition";

OPTIMIZE

該命令通過合并小檔案或ZOrder排序最佳化Delta表的資料布局,提升查詢效率。OPTIMIZ命令支援如下操作:

  • 針對分區表,可以通過指定分區來進行最佳化。

  • 在進行正常Compact最佳化時,可以通過指定非分區欄位進行ZOrder排序,以調整資料布局。

程式碼範例如下。

set spark.databricks.delta.stats.skipping=true;
set spark.databricks.delta.stats.collect=true;

-- 對dbName.tableName表進行全域最佳化。
OPTIMIZE dbName.tableName;

-- 對2021-04-01之前的分區進行最佳化。
OPTIMIZE dbName.tableName WHERE date < '2021-04-01';

-- 對2021-04-01之前的分區進行最佳化,並使用col2, col3列進行排序。
OPTIMIZE dbName.tableName WHERE date < '2021-04-01' ZORDER BY (col2, col3);
說明
  • 對於Streaming入湖情境,通常每個batch較小,會導致小檔案較多,可以定期執行Optimize命令合并小檔案。

  • 對於查詢模式相對固定的情境,例如,除分區欄位外,僅指定幾個列作為查詢條件時,可以採用Zorder方式最佳化。

VACUUM

該命令可以刪除表路徑中不需要的,且超過指定時間的資料檔案。

EMR的Delta Lake定義資料檔案不需要包含以下兩部分:

  • 當前最新版本關聯到的資料檔案。

  • 執行過Savepoint的特定版本關聯到的資料檔案。

VACUUM命令可以通過兩種方式指定刪除多久前的資料檔案:

  • 通過參數delta.deletedFileRetentionDuration配置表屬性,預設值為1周。

  • 通過VACUUM命令指定,單位為小時。

  • 文法

    VACUUM (path=STRING | table=tableIdentifier) (RETAIN number HOURS)? (DRY RUN)?
  • 樣本

    -- 刪除資料檔案。
    VACUUM dbName.tableName; 
    -- 刪除24小時之前的資料檔案。
    VACUUM dbName.tableName RETAIN 24 HOURS; 
    -- 顯示待刪除24小時之前的資料檔案。
    VACUUM dbName.tableName RETAIN 24 HOURS DRY RUN; 
    說明
    • 根據您建立表的實際情況,可以定期執行VACUUM命令,節省儲存空間。

    • 實際執行VACUUM命令前,可以先通過DRY RUN命令,確認刪除內容。

SAVEPOINT

該命令可以永久儲存Delta Lake的歷史版本。

Delta Lake會在每次執行CheckPoint(固定版本間隔,由參數delta.checkpointInterval決定)時清理掉log中繼資料檔案(預設保留30天內的log中繼資料,由參數delta.logRetentionDuration決定)。通過VACUUM也會刪除歷史版本不再需要的資料檔案。執行SAVEPOINT命令,可以永久避免log中繼資料和資料檔案被刪除,同時配合time-travel的能力,可以讀取歷史版本資料。

樣本如下:

  • 儲存ID為0的版本。

    CREATE SAVEPOINT delta.`/path/to/delta_tbl` VERSION AS OF 0;
    說明

    /path/to/delta_tbl 為您實際的Delta表檔案系統路徑

  • 儲存指定時間之前最近的版本。

    CREATE SAVEPOINT dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";

刪除或查看SAVEPOINT操作記錄。樣本如下:

  • 刪除記錄

    --刪除特定版本的資料。
    DROP SAVEPOINT delta.`/path/to/delta_tbl` VERSION AS OF 0;
    --刪除特定時間戳記之前的資料。
    DROP SAVEPOINT dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";
  • 查看記錄

    可以顯示SAVEPOINT的版本號碼、版本提交時間、SAVEPOINT時間及其他資訊。

    SHOW SAVEPOINT delta.`/path/to/delta_tbl`;
    SHOW SAVEPOINT dbName.tableName;

ROLLBACK

該命令可以恢複到Delta Lake某個歷史版本。

如果指定要恢複到的歷史版本不可重建(即缺失log中繼資料或者對應的資料檔案),則拋出異常。樣本如下:

  • 復原到ID為0的版本。

    ROLLBACK delta.`/path/to/delta_tbl` VERSION AS OF 0;
  • 復原到指定時間之前最近的版本。

    ROLLBACK dbName.tableName TIMESTAMP AS OF "2021-04-01 10:00:00";