This document describes how to use Spark for Iceberg table maintenance tasks, such as removing orphan files, expiring snapshots, and rewriting data files. These operations reduce storage costs and improve query performance.
Prerequisites
Version requirements
Spark 4.7.0 or later.
Complete the required catalog authorization:
Download dependencies
Configure dependencies
Serverless Spark
Upload the four dependency JAR packages to Object Storage Service (OSS) and add the following Spark configurations to your session.
spark.emr.serverless.excludedModules: Must beiceberg.spark.emr.serverless.user.defined.jars: The OSS path to your dependency JARs.spark.sql.extensions: Set it toorg.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions.
spark.emr.serverless.excludedModules iceberg
spark.emr.serverless.user.defined.jars oss://${bucket}/iceberg-spark-runtime-3.5_2.12-1.10.1.jar,oss://${bucket}/iceberg-aws-bundle-1.10.1.jar,oss://${bucket}/bennett-iceberg-plugin-2.8.0.jar,oss://${bucket}/guava-31.1-jre.jar
spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensionsUse Spark procedures
Iceberg provides several Spark procedures for table maintenance. You can run these procedures in Spark SQL with the CALL statement.
When you use a DLF catalog (for example, dlfCatalogName) in the compute engine, enable the native Iceberg mode: dlfCatalogName_iceberg.
To use a native Iceberg client, suffix your catalog name with _iceberg. This enables you to call Iceberg stored procedures, such as remove_orphan_files and expire_snapshots.
Remove orphan files
Orphan files are physical files no longer referenced by any Iceberg table metadata. Regularly removing orphan files frees up storage space.
In the following code, replace the value of the table parameter with your actual database and table name. Unless otherwise noted, no other values need to be replaced.
-- 1. Dry run
-- Lists files to be deleted without actually deleting them.
CALL dlfCatalogName_iceberg.system.remove_orphan_files(
table => 'db.table_name',
prefix_listing => true, -- Use the prefix listing feature. This significantly improves scan performance on object storage services like OSS and S3.
dry_run => true -- Dries run the deletion and returns a list of files that would be removed.
);
-- 2. Default cleanup
-- Deletes orphan files older than three days.
CALL dlfCatalogName_iceberg.system.remove_orphan_files(
table => 'db.table_name',
prefix_listing => true
);
-- 3. Set time and concurrency
-- Deletes orphan files older than the specified timestamp and enables parallel deletion.
CALL dlfCatalogName_iceberg.system.remove_orphan_files(
table => 'db.table_name',
older_than => TIMESTAMP '2024-01-01 00:00:00', -- Replace with the desired timestamp.
dry_run => false,
max_concurrent_deletes => 4, -- Number of concurrent deletion threads.
prefix_listing => true
);Learn more about the Spark procedure remove-orphan-files in Iceberg documentation.
Remove expire snapshots
Iceberg uses snapshots to track the entire history of table changes. Over time, an excessive number of snapshots can bloat metadata and consume significant storage space. This procedure removes old, unnecessary snapshots and their associated data files.
-- 1. Time-based cleanup
-- Deletes snapshots older than the specified timestamp.
CALL dlfCatalogName_iceberg.system.expire_snapshots(
table => 'db.table_name',
older_than => TIMESTAMP '2024-01-01 00:00:00' -- Replace with the desired timestamp.
);
-- 2. Retain the latest snapshots
-- Retains the N most recent snapshots, regardless of their age.
CALL dlfCatalogName_iceberg.system.expire_snapshots(
table => 'db.table_name',
retain_last => 5
);
-- 3. Delete specific snapshots
-- Deletes only the snapshots specified in the ID list.
CALL dlfCatalogName_iceberg.system.expire_snapshots(
table => 'db.table_name',
snapshot_ids => ARRAY(123456789, 987654321) --Replace with the desired snapshot IDs.
);Learn more about the Spark procedure expire_snapshots in Iceberg documentation.
Compact data files
Frequent, small batch writes can create a large number of small files, which severely impacts read performance. This procedure rewrites data files by compacting them into larger files of a target size, optimizing query efficiency.
-- 1. Default rewrite
-- Rewrites files by using the default binpack strategy.
CALL dlfCatalogName_iceberg.system.rewrite_data_files('db.table_name');
-- 2. Specify the Binpack strategy (Compacts small files into larger ones with a simple bin-packing algorithm, which offers the lowest overhead and fastest performance.)
CALL dlfCatalogName_iceberg.system.rewrite_data_files(
table => 'db.table_name',
strategy => 'binpack'
);
-- 3. Rewrite with a filter condition
-- Rewrites only the data that matches a specific partition or condition.
CALL dlfCatalogName_iceberg.system.rewrite_data_files(
table => 'db.table_name',
where => 'date >= "2024-01-01"' --'date' is an example column name in the table. Modify the filter condition as needed.
);
-- 4. Custom file size options
-- Explicitly sets the target file size (512 MB) and the minimum input file size threshold for rewriting (128 MB).
CALL dlfCatalogName_iceberg.system.rewrite_data_files(
table => 'db.table_name',
options => map(
'target-file-size-bytes', '536870912',
'min-file-size-bytes', '134217728'
)
);Learn more about the Spark procedure rewrite_data_files in Iceberg documentation.