全部产品
Search
文档中心

云原生数据仓库AnalyticDB:基于Spark SQL管理Delta Lake表的生命周期

更新时间:Jan 14, 2026

本文为您介绍如何基于AnalyticDB for MySQL的Spark SQL及Delta Lake表格式,构建企业级数据湖并对数据进行生命周期管理。

贴源层入湖(Bronze层)

通过CREATE TABLE AS SELECT(CTAS)快速将外部数据导入Delta Lake,暂不定义Schema与分区。

CREATE DATABASE IF NOT EXISTS deltalake_db LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/';
-- 1. 清理元数据
DROP TABLE IF EXISTS deltalake_db.orders_staging;

-- 2. CTAS 快速入湖 (无分区,纯全量复制)
CREATE OR REPLACE TABLE  deltalake_db.orders_staging
USING DELTA
COMMENT 'Bronze Layer: Raw Staging Table'
AS 
SELECT * FROM ADB_External_TPCH_10GB.external_orders;

-- 验证:数据已落地,但此时全表扫描性能较差(因为没分区)
SELECT count(*) FROM deltalake_db.orders_staging;

构建分区表

创建按“年”物理分区的生产表,将数据从orders_staging表清洗并加载到生产表,解决小文件问题并提升查询效率。

-- 1. 清理元数据
DROP TABLE IF EXISTS deltalake_db.orders_prod;

-- 2. 定义生产表结构 (包含显式的 o_order_year 列)
CREATE TABLE IF NOT EXISTS deltalake_db.orders_prod (
    o_orderkey LONG,
    o_custkey LONG,
    o_orderstatus STRING,
    o_totalprice DECIMAL(18,2),
    o_orderdate DATE,
    o_orderpriority STRING,
    o_clerk STRING,
    o_shippriority INT,
    o_comment STRING,
    o_order_year INT -- 【显式分区列】
)
USING DELTA
PARTITIONED BY (o_order_year) -- 按年物理隔离
COMMENT 'Silver Layer: Partitioned Production Table';

-- 3. 数据聚合/清洗写入
-- 从 Staging 表读取,计算出年份,写入 Prod 表
INSERT INTO deltalake_db.orders_prod
SELECT 
    -- 显式列出源表的原始字段 (不使用 *)
    o_orderkey, 
    o_custkey, 
    o_orderstatus, 
    o_totalprice, 
    o_orderdate, 
    o_orderpriority, 
    o_clerk, 
    o_shippriority, 
    o_comment,
    -- 计算分区字段
    YEAR(o_orderdate) as o_order_year
FROM deltalake_db.orders_staging;

-- 验证:查看分区信息
DESCRIBE DETAIL deltalake_db.orders_prod;

业务变更与事务 (Mutation)

在生产表orders_prod上进行日常的数据修正操作。

-- 1. 更新操作 (ACID)
-- 场景:修正 1993 年某位 clerk 的所有订单状态
UPDATE deltalake_db.orders_prod 
SET o_orderstatus = 'P' 
WHERE o_order_year = 1993 -- 利用分区裁剪加速找到文件
  AND o_clerk = 'Clerk#000000001';

-- 2. 删除操作
DELETE FROM deltalake_db.orders_prod 
WHERE o_orderpriority = '5-LOW' AND o_order_year = 1992;

增量合并 (Upsert)

处理T+1增量数据。这里构造一个包含新数据的临时视图,使用MERGE将增量数据合并到生产表。

-- 模拟增量数据流
MERGE INTO deltalake_db.orders_prod AS target
USING (
    SELECT 
        999999999 AS o_orderkey, 
        12345 AS o_custkey, 
        'O' AS o_orderstatus, 
        100.00 AS o_totalprice, 
        cast('2025-05-20' as date) AS o_orderdate, 
        '1-URGENT' AS o_orderpriority, 
        'Clerk#Auto' AS o_clerk, 
        0 AS o_shippriority, 
        'New Order' AS o_comment,
        2025 AS o_order_year -- 必须补齐分区列
) AS source
ON target.o_orderkey = source.o_orderkey
WHEN MATCHED THEN UPDATE SET target.o_totalprice = source.o_totalprice
WHEN NOT MATCHED THEN INSERT *;

历史回溯

利用数据版本管理能力实现数据历史追溯。

-- 1. 查看操作历史
DESCRIBE HISTORY deltalake_db.orders_prod;

-- 2. 穿越回刚建表完成时的状态 (Version 0 是建表,Version 1 是 INSERT)
-- 假设 Version 1 是刚从 Staging 表导完数据的状态
SELECT COUNT(*) FROM deltalake_db.orders_prod VERSION AS OF 1;

性能优化 (Maintenance)

对生产表进行物理优化。这是Delta Lake区别于Parquet表的核心优势。

-- 1. 小文件合并 (Compaction)
-- 会将同一个分区内零散的小文件合并为大文件
OPTIMIZE deltalake_db.orders_prod;

-- 2. Z-Order 多维聚类 (Data Skipping)
-- 将相同 o_custkey 的数据物理排布在一起,极大加速 "WHERE o_custkey = ..." 的查询
OPTIMIZE deltalake_db.orders_prod ZORDER BY (o_custkey);

-- 3. 物理清理 (Vacuum)
-- 删除不再需要的孤儿文件,释放 OSS 空间
SET spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM deltalake_db.orders_prod RETAIN 0 HOURS; -- 仅供测试,生产建议 168 HOURS

删除表定义和数据

说明

如果希望DROP TABLE PURGE可以顺利触发物理文件删除,在启动Spark前需要配置如下参数,配置后需重启。

  • spark.hive.purgeExternalTableData.enabled=true;

  • spark.hive.purgeManagedTableData.enabled=true;

DROP TABLE IF EXISTS deltalake_db.orders_prod purge;