本文将介绍如何利用Spark执行存储治理任务,清理孤立文件、过期快照以及合并小文件,可以有效降低存储成本并提升查询性能。
前提条件
请先根据需求完成Catalog授权:
下载依赖配置
依赖配置
Serverless Spark
将四个依赖包上传到OSS,并在session中添加额外的Spark 配置。
spark.emr.serverless.excludedModules:固定值为iceberg。spark.emr.serverless.user.defined.jars:实际JAR包的存放路径。
spark.emr.serverless.excludedModules iceberg
spark.emr.serverless.user.defined.jars oss://${bucket}/iceberg-spark-runtime-3.5_2.12-1.10.1.jar,oss://${bucket}/iceberg-aws-bundle-1.10.1.jar,oss://${bucket}/bennett-iceberg-plugin-2.8.0.jar,oss://${bucket}/guava-31.1-jre.jarSpark Procedures
Iceberg 提供了多种 Spark 存储过程用于表维护。您可以通过 CALL 语法直接在 Spark SQL 中执行这些操作。
清理孤立文件 (Remove Orphan Files)
孤立文件是指不再被任何 Iceberg 表元数据(Metadata)引用的物理文件。这些文件通常产生于删除表操作或失败的 ETL 任务中。定期清理孤立文件是释放存储空间的关键步骤。
说明
只需替换table参数的值为实际库表名,如未备注,无需替换。
-- 1. 干运行模式 (Dry Run)
-- 仅列出将被删除的文件,不执行实际删除。用于确认清理范围。
CALL iceberg_catalog.system.remove_orphan_files(
table => 'db.table_name',
prefix_listing => true, --使用对象存储的前缀列表功能。这能显著提高在OSS/S3等对象存储上的扫描性能。
dry_run => true --仅模拟执行并返回拟删除文件列表。
);
-- 2. 默认清理
-- 删除 3 天前产生的孤立文件。
CALL iceberg_catalog.system.remove_orphan_files(
table => 'db.table_name',
prefix_listing => true
);
-- 3. 指定时间与并发
-- 删除指定时间戳之前的孤立文件,并开启并行删除。
CALL iceberg_catalog.system.remove_orphan_files(
table => 'db.table_name',
older_than => TIMESTAMP '2024-01-01 00:00:00', --替换指定时间
dry_run => false,
max_concurrent_deletes => 4, --执行文件删除操作的并发线程数。
prefix_listing => true
);更多详情请参阅 Iceberg Remove Orphan Files。
清理过期快照 (Expire Snapshots)
Iceberg 通过快照(Snapshots)跟踪表的所有变更历史。随着时间推移,过多的快照会导致元数据膨胀并占用大量存储空间。该过程用于删除不再需要的旧快照及其关联的数据文件。
-- 1. 基于时间清理
-- 删除早于指定时间戳的快照。
CALL iceberg_catalog.system.expire_snapshots(
table => 'db.table_name',
older_than => TIMESTAMP '2024-01-01 00:00:00' --替换指定时间
);
-- 2. 保留最新快照
-- 无论时间如何,始终保留最近的 N 个快照。
CALL iceberg_catalog.system.expire_snapshots(
table => 'db.table_name',
retain_last => 5
);
-- 3. 删除特定快照
-- 仅删除 ID 列表中指定的快照。
CALL iceberg_catalog.system.expire_snapshots(
table => 'db.table_name',
snapshot_ids => ARRAY(123456789, 987654321) --替换指定ID
);更多详情请参阅Iceberg Expire Snapshots
合并小文件 (Rewrite Data Files)
频繁的小批量写入会产生大量小文件,严重影响读取性能。该过程通过重写数据文件,将其合并为符合目标大小的较大文件,从而优化查询效率。
-- 1. 默认合并
-- 使用默认的 binpack 策略合并文件。
CALL iceberg_catalog.system.rewrite_data_files('db.table_name');
-- 2. 指定 Binpack 策略(通过简单的装箱算法将小文件合并为大文件,开销最小,速度最快)
CALL iceberg_catalog.system.rewrite_data_files(
table => 'db.table_name',
strategy => 'binpack'
);
-- 3. 带过滤条件的合并
-- 仅重写符合特定分区或条件的数据。
CALL iceberg_catalog.system.rewrite_data_files(
table => 'db.table_name',
where => 'date >= "2024-01-01"' --date为表中实际字段名,请根据实际修改过滤条件。
);
-- 4. 自定义文件大小选项
-- 显式设置目标文件大小 (512MB) 和最小合并阈值 (128MB)。
CALL iceberg_catalog.system.rewrite_data_files(
table => 'db.table_name',
options => map(
'target-file-size-bytes', '536870912',
'min-file-size-bytes', '134217728'
)
);更多详情请参阅Iceberg Rewrite Data Files。