本文为您介绍如何基于AnalyticDB for MySQL的Spark SQL及Delta Lake表格式,构建企业级数据湖并对数据进行生命周期管理。
贴源层入湖(Bronze层)
通过CREATE TABLE AS SELECT(CTAS)快速将外部数据导入Delta Lake,暂不定义Schema与分区。
CREATE DATABASE IF NOT EXISTS deltalake_db LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/';-- 1. 清理元数据
DROP TABLE IF EXISTS deltalake_db.orders_staging;
-- 2. CTAS 快速入湖 (无分区,纯全量复制)
CREATE OR REPLACE TABLE deltalake_db.orders_staging
USING DELTA
COMMENT 'Bronze Layer: Raw Staging Table'
AS
SELECT * FROM ADB_External_TPCH_10GB.external_orders;
-- 验证:数据已落地,但此时全表扫描性能较差(因为没分区)
SELECT count(*) FROM deltalake_db.orders_staging;构建分区表
创建按“年”物理分区的生产表,将数据从orders_staging表清洗并加载到生产表,解决小文件问题并提升查询效率。
-- 1. 清理元数据
DROP TABLE IF EXISTS deltalake_db.orders_prod;
-- 2. 定义生产表结构 (包含显式的 o_order_year 列)
CREATE TABLE IF NOT EXISTS deltalake_db.orders_prod (
o_orderkey LONG,
o_custkey LONG,
o_orderstatus STRING,
o_totalprice DECIMAL(18,2),
o_orderdate DATE,
o_orderpriority STRING,
o_clerk STRING,
o_shippriority INT,
o_comment STRING,
o_order_year INT -- 【显式分区列】
)
USING DELTA
PARTITIONED BY (o_order_year) -- 按年物理隔离
COMMENT 'Silver Layer: Partitioned Production Table';
-- 3. 数据聚合/清洗写入
-- 从 Staging 表读取,计算出年份,写入 Prod 表
INSERT INTO deltalake_db.orders_prod
SELECT
-- 显式列出源表的原始字段 (不使用 *)
o_orderkey,
o_custkey,
o_orderstatus,
o_totalprice,
o_orderdate,
o_orderpriority,
o_clerk,
o_shippriority,
o_comment,
-- 计算分区字段
YEAR(o_orderdate) as o_order_year
FROM deltalake_db.orders_staging;
-- 验证:查看分区信息
DESCRIBE DETAIL deltalake_db.orders_prod;业务变更与事务 (Mutation)
在生产表orders_prod上进行日常的数据修正操作。
-- 1. 更新操作 (ACID)
-- 场景:修正 1993 年某位 clerk 的所有订单状态
UPDATE deltalake_db.orders_prod
SET o_orderstatus = 'P'
WHERE o_order_year = 1993 -- 利用分区裁剪加速找到文件
AND o_clerk = 'Clerk#000000001';
-- 2. 删除操作
DELETE FROM deltalake_db.orders_prod
WHERE o_orderpriority = '5-LOW' AND o_order_year = 1992;增量合并 (Upsert)
处理T+1增量数据。这里构造一个包含新数据的临时视图,使用MERGE将增量数据合并到生产表。
-- 模拟增量数据流
MERGE INTO deltalake_db.orders_prod AS target
USING (
SELECT
999999999 AS o_orderkey,
12345 AS o_custkey,
'O' AS o_orderstatus,
100.00 AS o_totalprice,
cast('2025-05-20' as date) AS o_orderdate,
'1-URGENT' AS o_orderpriority,
'Clerk#Auto' AS o_clerk,
0 AS o_shippriority,
'New Order' AS o_comment,
2025 AS o_order_year -- 必须补齐分区列
) AS source
ON target.o_orderkey = source.o_orderkey
WHEN MATCHED THEN UPDATE SET target.o_totalprice = source.o_totalprice
WHEN NOT MATCHED THEN INSERT *;历史回溯
利用数据版本管理能力实现数据历史追溯。
-- 1. 查看操作历史
DESCRIBE HISTORY deltalake_db.orders_prod;
-- 2. 穿越回刚建表完成时的状态 (Version 0 是建表,Version 1 是 INSERT)
-- 假设 Version 1 是刚从 Staging 表导完数据的状态
SELECT COUNT(*) FROM deltalake_db.orders_prod VERSION AS OF 1;性能优化 (Maintenance)
对生产表进行物理优化。这是Delta Lake区别于Parquet表的核心优势。
-- 1. 小文件合并 (Compaction)
-- 会将同一个分区内零散的小文件合并为大文件
OPTIMIZE deltalake_db.orders_prod;
-- 2. Z-Order 多维聚类 (Data Skipping)
-- 将相同 o_custkey 的数据物理排布在一起,极大加速 "WHERE o_custkey = ..." 的查询
OPTIMIZE deltalake_db.orders_prod ZORDER BY (o_custkey);
-- 3. 物理清理 (Vacuum)
-- 删除不再需要的孤儿文件,释放 OSS 空间
SET spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM deltalake_db.orders_prod RETAIN 0 HOURS; -- 仅供测试,生产建议 168 HOURS删除表定义和数据
说明
如果希望DROP TABLE PURGE可以顺利触发物理文件删除,在启动Spark前需要配置如下参数,配置后需重启。
spark.hive.purgeExternalTableData.enabled=true;spark.hive.purgeManagedTableData.enabled=true;
DROP TABLE IF EXISTS deltalake_db.orders_prod purge;