全部产品
Search
文档中心

云原生数据仓库AnalyticDB:基于Spark SQL管理Apache Iceberg表的生命周期

更新时间:Jan 14, 2026

本文为您介绍如何基于AnalyticDB for MySQL的Spark SQL及Apache Iceberg表格式,构建企业级数据湖并对数据进行生命周期管理。

说明

AnalyticDB for MySQLSpark内置名为iceberg的Iceberg Catalog,该Catalog会使用AnalyticDB for MySQL的元数据服务。

数据批量入湖

通过Iceberg的CREATE TABLE AS SELECT(CTAS)快速落地原始数据,实现建表即写入操作。

CREATE DATABASE IF NOT EXISTS iceberg_lakehouse 
LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/'
WITH DBPROPERTIES (catalog = 'mix');

-- 1. 清理
DROP TABLE IF EXISTS iceberg_lakehouse.iceberg_orders_staging;

-- 2. CTAS 快速入湖
CREATE TABLE iceberg_lakehouse.iceberg_orders_staging
USING iceberg
COMMENT 'Bronze Layer: Iceberg Staging'
AS 
SELECT * FROM ADB_External_TPCH_10GB.external_orders;

构建隐式分区表

隐式分区功能允许您基于原列转换定义分区(例如以下DDL中的 PARTITIONED BY (years(o_orderdate))),而无需创建额外的分区列。

-- 1. 清理
DROP TABLE IF EXISTS iceberg_lakehouse.iceberg_orders_prod;

-- 2. 定义生产表 (利用 Iceberg 变换分区)
CREATE TABLE iceberg_lakehouse.iceberg_orders_prod (
    o_orderkey LONG,
    o_custkey LONG,
    o_orderstatus STRING,
    o_totalprice DECIMAL(18,2),
    o_orderdate DATE,
    o_orderpriority STRING,
    o_clerk STRING,
    o_shippriority INT,
    o_comment STRING
)
USING iceberg
-- 【核心】Iceberg 特有的分区变换语法
-- 不需要额外的 o_order_year 列,直接对日期列做 years 变换
PARTITIONED BY (years(o_orderdate)) 
COMMENT 'Silver Layer: Iceberg Hidden Partitioning';

-- 3. 写入数据
-- 即使只 SELECT 原始列,Iceberg 也会在底层自动计算分区并归档
INSERT INTO 
  iceberg_lakehouse.iceberg_orders_prod
SELECT 
o_orderkey, o_custkey, o_orderstatus, 
o_totalprice, o_orderdate, o_orderpriority, 
o_clerk, o_shippriority, o_comment 
FROM 
iceberg_lakehouse.iceberg_orders_staging;

-- 4. 验证分区
-- Iceberg 的元数据表不仅能查看分区,还能看具体物理文件分布
SELECT partition, file_path, record_count 
FROM iceberg_lakehouse.iceberg_orders_prod.files;

行级变更 (Row-level Mutation)

Iceberg支持Copy-on-Write (CoW)或Merge-on-Read(MoR),通常默认是CoW。

-- 1. Update 操作
-- 场景:业务修正
UPDATE iceberg_lakehouse.iceberg_orders_prod
SET o_orderstatus = 'X'
WHERE o_orderdate = '1993-01-01' AND o_custkey = 12345; 
-- ^ 注意:这里直接用 o_orderdate 过滤,Iceberg 会自动定位到 1993 年的分区,无需人工干预

-- 2. Delete 操作
DELETE FROM iceberg_lakehouse.iceberg_orders_prod
WHERE o_clerk = 'Clerk#000000001';

模式演进

Iceberg在Schema变更方面比Delta更激进,它支持Full Schema Evolution(例如列重命名、类型升级),仅需修改元数据映射,而无需重写数据文件。

-- 1. 修改列名 (Rename) - 这是 Parquet/Hive 很难做到的
ALTER TABLE iceberg_lakehouse.iceberg_orders_prod 
RENAME COLUMN o_totalprice TO o_final_price;

-- 2. 增加列
ALTER TABLE iceberg_lakehouse.iceberg_orders_prod 
ADD COLUMN etl_ts timestamp;

-- 3. 变更列位置 (Reorder) - 把新列移到中间
ALTER TABLE iceberg_lakehouse.iceberg_orders_prod 
ALTER COLUMN etl_ts AFTER o_orderdate;

时间旅行

您可以查询Iceberg表的历史快照,并基于快照ID或时间戳访问对应历史版本的数据。

-- 1. 查看快照历史 (Snapshots),然后获取 snapshot_id
-- Iceberg 使用 .history 或 .snapshots 元数据表
SELECT * FROM iceberg_lakehouse.iceberg_orders_prod.history;

-- 2. 穿越查询 (By Snapshot ID)
-- 注意:Iceberg 推荐使用 SYSTEM_VERSION
SELECT count(*) 
FROM iceberg_lakehouse.iceberg_orders_prod 
FOR SYSTEM_VERSION AS OF 123456789; -- 请替换为实际 ID

-- 3. 穿越查询 (By Timestamp)
SELECT count(*) 
FROM iceberg_lakehouse.iceberg_orders_prod 
TIMESTAMP AS OF (current_timestamp() - INTERVAL 10 MINUTES); -- 请根据实际情况修改时间间隔

表文件管理

清理过期快照 (Expire Snapshots)

Iceberg采用多版本并发控制(MVCC),删除数据只是标记,您需要通过expire_snapshots清理“过期”旧快照,物理文件才会被清理。常用的清理方式示例如下。

  • 清理指定时间戳T之前的快照。

    CALL system.expire_snapshots(
     table => 'iceberg_lakehouse.iceberg_orders_prod',
     older_than => TIMESTAMP '2023-10-01 00:00:00.000' -- 请替换为实际时间,通常传入毫秒时间戳
    )
  • 保留最近的N个快照。

    保留策略:以older_than = T(时间戳,默认5天前) 作为过期时间截断点,只清理早于T的快照;同时以retain_last = N(默认 1) 作为最小保留量,即便早于T,也会保留最近的N个快照不被过期。

    特殊用法:您可以设置T为未来时间点,并指定retain_last,实现保留最近的N个快照,示例如下。

    -- cell中运行时需要删除最后的;
    CALL iceberg.system.expire_snapshots(
      table => 'iceberg_lakehouse.iceberg_orders_prod', 
      older_than => TIMESTAMP '2027-06-30 00:00:00.000', 
      retain_last => 1
    )

删除孤儿文件 (Remove Orphan Files)

当写入任务失败,或者DROP TABLE时未添加PURGE关键字,可能导致路径下产生很多未被引用的孤立文件,可以使用以下命令扫描并清理。

CALL system.remove_orphan_files(
    table => 'iceberg_lakehouse.iceberg_orders_prod'
)

删除Iceberg表

  • 删除表定义,但不删除表的物理文件。

    DROP TABLE iceberg_lakehouse.iceberg_orders_prod;

    如删除表元数据时需要删除数据,需在启动Spark Session时添加参数

    • spark.hive.purgeExternalTableData.enabled=true;

    • spark.hive.purgeManagedTableData.enabled=true;

  • 删除表的同时删除文件。

    DROP TABLE iceberg_lakehouse.iceberg_orders_prod PURGE;