本文为您介绍如何基于AnalyticDB for MySQL的Spark SQL及Apache Iceberg表格式,构建企业级数据湖并对数据进行生命周期管理。
AnalyticDB for MySQLSpark内置名为iceberg的Iceberg Catalog,该Catalog会使用AnalyticDB for MySQL的元数据服务。
数据批量入湖
通过Iceberg的CREATE TABLE AS SELECT(CTAS)快速落地原始数据,实现建表即写入操作。
CREATE DATABASE IF NOT EXISTS iceberg_lakehouse
LOCATION 'oss://<YOUR_OSS_BUCKET>/PATH/'
WITH DBPROPERTIES (catalog = 'mix');
-- 1. 清理
DROP TABLE IF EXISTS iceberg_lakehouse.iceberg_orders_staging;
-- 2. CTAS 快速入湖
CREATE TABLE iceberg_lakehouse.iceberg_orders_staging
USING iceberg
COMMENT 'Bronze Layer: Iceberg Staging'
AS
SELECT * FROM ADB_External_TPCH_10GB.external_orders;构建隐式分区表
隐式分区功能允许您基于原列转换定义分区(例如以下DDL中的 PARTITIONED BY (years(o_orderdate))),而无需创建额外的分区列。
-- 1. 清理
DROP TABLE IF EXISTS iceberg_lakehouse.iceberg_orders_prod;
-- 2. 定义生产表 (利用 Iceberg 变换分区)
CREATE TABLE iceberg_lakehouse.iceberg_orders_prod (
o_orderkey LONG,
o_custkey LONG,
o_orderstatus STRING,
o_totalprice DECIMAL(18,2),
o_orderdate DATE,
o_orderpriority STRING,
o_clerk STRING,
o_shippriority INT,
o_comment STRING
)
USING iceberg
-- 【核心】Iceberg 特有的分区变换语法
-- 不需要额外的 o_order_year 列,直接对日期列做 years 变换
PARTITIONED BY (years(o_orderdate))
COMMENT 'Silver Layer: Iceberg Hidden Partitioning';
-- 3. 写入数据
-- 即使只 SELECT 原始列,Iceberg 也会在底层自动计算分区并归档
INSERT INTO
iceberg_lakehouse.iceberg_orders_prod
SELECT
o_orderkey, o_custkey, o_orderstatus,
o_totalprice, o_orderdate, o_orderpriority,
o_clerk, o_shippriority, o_comment
FROM
iceberg_lakehouse.iceberg_orders_staging;
-- 4. 验证分区
-- Iceberg 的元数据表不仅能查看分区,还能看具体物理文件分布
SELECT partition, file_path, record_count
FROM iceberg_lakehouse.iceberg_orders_prod.files;行级变更 (Row-level Mutation)
Iceberg支持Copy-on-Write (CoW)或Merge-on-Read(MoR),通常默认是CoW。
-- 1. Update 操作
-- 场景:业务修正
UPDATE iceberg_lakehouse.iceberg_orders_prod
SET o_orderstatus = 'X'
WHERE o_orderdate = '1993-01-01' AND o_custkey = 12345;
-- ^ 注意:这里直接用 o_orderdate 过滤,Iceberg 会自动定位到 1993 年的分区,无需人工干预
-- 2. Delete 操作
DELETE FROM iceberg_lakehouse.iceberg_orders_prod
WHERE o_clerk = 'Clerk#000000001';模式演进
Iceberg在Schema变更方面比Delta更激进,它支持Full Schema Evolution(例如列重命名、类型升级),仅需修改元数据映射,而无需重写数据文件。
-- 1. 修改列名 (Rename) - 这是 Parquet/Hive 很难做到的
ALTER TABLE iceberg_lakehouse.iceberg_orders_prod
RENAME COLUMN o_totalprice TO o_final_price;
-- 2. 增加列
ALTER TABLE iceberg_lakehouse.iceberg_orders_prod
ADD COLUMN etl_ts timestamp;
-- 3. 变更列位置 (Reorder) - 把新列移到中间
ALTER TABLE iceberg_lakehouse.iceberg_orders_prod
ALTER COLUMN etl_ts AFTER o_orderdate;时间旅行
您可以查询Iceberg表的历史快照,并基于快照ID或时间戳访问对应历史版本的数据。
-- 1. 查看快照历史 (Snapshots),然后获取 snapshot_id
-- Iceberg 使用 .history 或 .snapshots 元数据表
SELECT * FROM iceberg_lakehouse.iceberg_orders_prod.history;
-- 2. 穿越查询 (By Snapshot ID)
-- 注意:Iceberg 推荐使用 SYSTEM_VERSION
SELECT count(*)
FROM iceberg_lakehouse.iceberg_orders_prod
FOR SYSTEM_VERSION AS OF 123456789; -- 请替换为实际 ID
-- 3. 穿越查询 (By Timestamp)
SELECT count(*)
FROM iceberg_lakehouse.iceberg_orders_prod
TIMESTAMP AS OF (current_timestamp() - INTERVAL 10 MINUTES); -- 请根据实际情况修改时间间隔表文件管理
清理过期快照 (Expire Snapshots)
Iceberg采用多版本并发控制(MVCC),删除数据只是标记,您需要通过expire_snapshots清理“过期”旧快照,物理文件才会被清理。常用的清理方式示例如下。
清理指定时间戳T之前的快照。
CALL system.expire_snapshots( table => 'iceberg_lakehouse.iceberg_orders_prod', older_than => TIMESTAMP '2023-10-01 00:00:00.000' -- 请替换为实际时间,通常传入毫秒时间戳 )保留最近的N个快照。
保留策略:以
older_than = T(时间戳,默认5天前) 作为过期时间截断点,只清理早于T的快照;同时以retain_last = N(默认 1) 作为最小保留量,即便早于T,也会保留最近的N个快照不被过期。特殊用法:您可以设置T为未来时间点,并指定
retain_last,实现保留最近的N个快照,示例如下。-- cell中运行时需要删除最后的; CALL iceberg.system.expire_snapshots( table => 'iceberg_lakehouse.iceberg_orders_prod', older_than => TIMESTAMP '2027-06-30 00:00:00.000', retain_last => 1 )
删除孤儿文件 (Remove Orphan Files)
当写入任务失败,或者DROP TABLE时未添加PURGE关键字,可能导致路径下产生很多未被引用的孤立文件,可以使用以下命令扫描并清理。
CALL system.remove_orphan_files(
table => 'iceberg_lakehouse.iceberg_orders_prod'
)删除Iceberg表
删除表定义,但不删除表的物理文件。
DROP TABLE iceberg_lakehouse.iceberg_orders_prod;如删除表元数据时需要删除数据,需在启动Spark Session时添加参数
spark.hive.purgeExternalTableData.enabled=true;
spark.hive.purgeManagedTableData.enabled=true;
删除表的同时删除文件。
DROP TABLE iceberg_lakehouse.iceberg_orders_prod PURGE;