This topic describes how to use Spark to perform storage administration tasks. You can remove orphan files, expire old snapshots, and merge small files to reduce storage costs and improve query performance.
Prerequisites
Grant catalog permissions as needed:
Access DLF from EMR on ECS Spark
Download required dependencies
Dependency configuration
Serverless Spark
Upload the four dependency packages to Object Storage Service (OSS). Then, add the extra Spark configurations to the session.
spark.emr.serverless.excludedModules: Set the value toiceberg.spark.emr.serverless.user.defined.jars: The path where the JAR packages are stored.
spark.emr.serverless.excludedModules iceberg
spark.emr.serverless.user.defined.jars oss://${bucket}/iceberg-spark-runtime-3.5_2.12-1.10.1.jar,oss://${bucket}/iceberg-aws-bundle-1.10.1.jar,oss://${bucket}/bennett-iceberg-plugin-2.8.0.jar,oss://${bucket}/guava-31.1-jre.jarSpark procedures
Iceberg provides several Spark stored procedures for table maintenance. You can run these operations directly in Spark SQL using the `CALL` syntax.
Remove orphan files
Orphan files are physical files that are no longer referenced by Iceberg table metadata. These files are often created during table deletion operations or from failed Extract, Transform, and Load (ETL) tasks. Regularly removing orphan files is a key step to free up storage space.
Replace the value of the table parameter with the actual database and table name. Do not replace other parameters unless otherwise specified.
-- 1. Dry run mode
-- Lists the files to be deleted without actually deleting them. Use this to confirm the scope of the cleanup.
CALL iceberg_catalog.system.remove_orphan_files(
table => 'db.table_name',
prefix_listing => true, -- Uses the prefix list feature of object storage. This significantly improves scan performance on object storage services like OSS or S3.
dry_run => true -- Simulates the execution and returns a list of files to be deleted.
);
-- 2. Default cleanup
-- Deletes orphan files created more than three days ago.
CALL iceberg_catalog.system.remove_orphan_files(
table => 'db.table_name',
prefix_listing => true
);
-- 3. Specify time and concurrency
-- Deletes orphan files created before a specific timestamp and enables parallel deletion.
CALL iceberg_catalog.system.remove_orphan_files(
table => 'db.table_name',
older_than => TIMESTAMP '2024-01-01 00:00:00', -- Replace with a specific time.
dry_run => false,
max_concurrent_deletes => 4, -- The number of concurrent threads for file deletion operations.
prefix_listing => true
);For more information, see Iceberg Remove Orphan Files.
Expire snapshots
Iceberg uses snapshots to track the entire change history of a table. Over time, an excessive number of snapshots can lead to metadata bloat and consume significant storage space. This procedure removes old, unneeded snapshots and their associated data files.
-- 1. Clean up based on time
-- Deletes snapshots older than a specific timestamp.
CALL iceberg_catalog.system.expire_snapshots(
table => 'db.table_name',
older_than => TIMESTAMP '2024-01-01 00:00:00' -- Replace with a specific time.
);
-- 2. Retain the latest snapshots
-- Always retains the last N snapshots, regardless of their age.
CALL iceberg_catalog.system.expire_snapshots(
table => 'db.table_name',
retain_last => 5
);
-- 3. Delete specific snapshots
-- Deletes only the snapshots specified in the ID list.
CALL iceberg_catalog.system.expire_snapshots(
table => 'db.table_name',
snapshot_ids => ARRAY(123456789, 987654321) -- Replace with specific IDs.
);For more information, see Iceberg Expire Snapshots.
Rewrite data files
Frequent small batch writes create many small files, which can severely impact read performance. This procedure optimizes query efficiency by rewriting data files. It merges small files into larger files to meet a target size.
-- 1. Default merge
-- Merges files using the default binpack strategy.
CALL iceberg_catalog.system.rewrite_data_files('db.table_name');
-- 2. Specify the binpack strategy (merges small files into large files using a simple bin-packing algorithm, which has the lowest overhead and is the fastest)
CALL iceberg_catalog.system.rewrite_data_files(
table => 'db.table_name',
strategy => 'binpack'
);
-- 3. Merge with a filter condition
-- Rewrites only the data that matches a specific partition or condition.
CALL iceberg_catalog.system.rewrite_data_files(
table => 'db.table_name',
where => 'date >= "2024-01-01"' -- 'date' is an actual field name in the table. Modify the filter condition as needed.
);
-- 4. Custom file size options
-- Explicitly sets the target file size (512 MB) and the minimum merge threshold (128 MB).
CALL iceberg_catalog.system.rewrite_data_files(
table => 'db.table_name',
options => map(
'target-file-size-bytes', '536870912',
'min-file-size-bytes', '134217728'
)
);For more information, see Iceberg Rewrite Data Files.