Delta Lake是一個開源儲存架構,旨在資料湖之上構建LakeHouse架構。Delta Lake提供了ACID事務支援、可擴充的中繼資料處理功能,並能夠在現有的資料湖(如OSS、Amazon S3和HDFS)上整合流處理與批處理。此外,Delta Lake還支援多種引擎,如Spark、PrestoDB和Flink,以及多種程式設計語言的API,包括Scala、Java、Rust和Python,以便於訪問。
前提條件
已建立工作空間,詳情請參見建立工作空間。
操作流程
步驟一:建立SQL會話
進入會話管理頁面。
在左側導覽列,選擇。
在Spark頁面,單擊目標工作空間名稱。
在EMR Serverless Spark頁面,單擊左側導覽列中的會話管理。
在SQL會話頁面,單擊建立SQL會話。
在建立SQL會話頁面的Spark配置地區,配置以下資訊,單擊建立。詳情請參見管理SQL會話。
中繼資料是當前工作空間的預設Catalog。如果您希望將預設Catalog修改為外部的Hive Metastore,可以參見串連外部Hive Metastore Service。
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
步驟二:讀寫Delta Lake表
進入SQL開發頁面。
在EMR Serverless Spark頁面,單擊左側導覽列中的開發。
在開發目錄頁簽下,單擊
表徵圖。在建立對話方塊中,輸入名稱(例如users_task),類型使用預設的,然後單擊確定。
拷貝如下代碼到新增的SparkSQL頁簽(users_task)中。
CREATE DATABASE IF NOT EXISTS ss_delta_db; CREATE TABLE ss_delta_db.delta_tbl (id INT, name STRING) USING delta; INSERT INTO ss_delta_db.delta_tbl VALUES (1, "a"), (2, "b"); SELECT id, name FROM ss_delta_db.delta_tbl ORDER BY id;在資料庫下拉式清單中選擇一個資料庫,在會話下拉式清單中選擇剛剛建立的SQL會話。
單擊運行,執行任務。返回資訊如下所示。

步驟三:更新操作
Delta Lake支援多種資料更新操作,包括Update、Delete和Merge Into。以下是具體樣本。
-- Update操作
UPDATE ss_delta_db.delta_tbl SET name = "a_v2" WHERE id = 1;
-- Delete操作
DELETE FROM ss_delta_db.delta_tbl WHERE id = 2;
-- Merge Into操作
-- 建立暫存資料表並插入資料
CREATE TABLE ss_delta_db.tmp_tbl(id INT, name STRING) USING delta;
INSERT INTO ss_delta_db.tmp_tbl VALUES (1, "a_v3"), (3, "c");
-- 執行Merge Into操作
MERGE INTO ss_delta_db.delta_tbl AS target
USING ss_delta_db.tmp_tbl AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
--驗證結果
SELECT * FROM ss_delta_db.delta_tbl ORDER BY id;返回資訊如下所示。

步驟四:清理操作
測試完成後,建議清理測試資源以避免佔用儲存空間。執行以下命令如下所示。
DROP TABLE ss_delta_db.delta_tbl;
DROP TABLE ss_delta_db.tmp_tbl;
DROP DATABASE ss_delta_db;上述操作將永久刪除表和資料庫,請確保資料已備份或不再需要。
相關文檔
SQL任務和任務編排完整的開發流程樣本,請參見SparkSQL開發快速入門。
更多Delta Lake相關用法和配置,參見Delta Lake官方文檔。