The Iceberg connector lets you query and write data in Apache Iceberg tables using Trino on EMR. Iceberg is an open table format for data lakes that supports ACID transactions, partition evolution, and snapshot-based time travel.
Prerequisites
Before you begin, ensure that you have:
A DataLake cluster or Hadoop cluster with the Trino (formerly Presto) service enabled. See Create a cluster.
Limitations
Only DataLake clusters and Hadoop clusters running EMR V3.38.0 or later support the Iceberg connector.
If you selected DLF Unified Metadata when creating the cluster, you cannot write data to Iceberg tables.
Configure the Iceberg connector
For the general procedure to modify connector configurations, see Configure connectors.
Default configuration
Log on to the EMR console, go to the Configure tab of the Trino service page, and click iceberg.properties. The tab shows the hive.metastore.uri configuration item, which specifies the URI of Hive Metastore accessible via the Thrift protocol. Modify this value to match your environment.
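For reference, a minimal iceberg.properties might look like the following sketch. The hostname and port are placeholders; substitute the Thrift address of your own Hive Metastore:

```properties
# Catalog connector name; identifies this catalog as an Iceberg catalog
connector.name=iceberg
# Thrift URI of the Hive Metastore (hostname and port are placeholders)
hive.metastore.uri=thrift://master-1-1.example.com:9083
```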
Add configuration items
On the Configure tab of the Trino service page, click iceberg.properties, then click Add Configuration Item.
| Configuration item | Description | Default |
|---|---|---|
| iceberg.file-format | File format for storing Iceberg table data. Valid values: ORC, PARQUET. | ORC |
| iceberg.compression-codec | Compression codec used when writing files. Valid values: GZIP, ZSTD, LZ4, SNAPPY, NONE. | GZIP |
| iceberg.max-partitions-per-writer | Maximum number of partitions each writer can process. | 100 |
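As an illustrative sketch, adding these items to iceberg.properties might look like the following. The values shown are examples, not recommendations; choose values that match your workload:

```properties
# Write new table data as Parquet instead of the default ORC
iceberg.file-format=PARQUET
# Use ZSTD compression instead of the default GZIP
iceberg.compression-codec=ZSTD
# Allow each writer to handle up to 200 partitions
iceberg.max-partitions-per-writer=200
```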
Query Iceberg tables
The following steps show how to create a schema and table, insert data, and query results using standard Trino SQL.
Prerequisites
Log on to your cluster in SSH mode. See Log on to a cluster.
Connect to the Trino client. See Use the CLI to connect to Trino.
Steps
Create a schema:

```sql
CREATE SCHEMA iceberg.testdb;
```

Create a table:

```sql
CREATE TABLE iceberg.testdb.iceberg_test (id INT);
```

Insert data:

```sql
INSERT INTO iceberg.testdb.iceberg_test VALUES (1), (2);
```

Query the table:

```sql
SELECT * FROM iceberg.testdb.iceberg_test;
```

Expected output:

```
 id
----
  1
  2
```
SQL syntax
The Iceberg connector supports reading and writing data and metadata in Iceberg tables. In addition to standard SQL, it supports the following statements:
| Statement | Reference |
|---|---|
| INSERT | INSERT in the Trino documentation |
| DELETE | Delete data by partition in this topic and DELETE in the Trino documentation |
| Schema and table management | Partition a table in this topic and Schema and table management in the Trino documentation |
| Materialized view management | Manage materialized views in this topic and Materialized view management in the Trino documentation |
| View management | View management in the Trino documentation |
Table properties
Use the WITH clause to set the following properties when creating an Iceberg table:
| Property | Description | Default |
|---|---|---|
| format | File format for storing table data. Valid values: ORC, PARQUET. | ORC |
| partitioning | Partition key columns as an array. For example, ARRAY['c1', 'c2']. | — |
| location | URI of the file system location that stores the table. | — |
Example:
```sql
CREATE TABLE test_table (
  c1 INTEGER,
  c2 DATE,
  c3 DOUBLE)
WITH (
  format = 'PARQUET',
  partitioning = ARRAY['c1', 'c2'],
  location = '/var/my_tables/test_table');
```

Partition a table
The Iceberg connector supports function-based partitioning. Use the following functions in the partitioning property:
| Function | Description |
|---|---|
| year(ts) | Partitions by year. Returns the number of years between ts and January 1, 1970. |
| month(ts) | Partitions by month. Returns the number of months between ts and January 1, 1970. |
| day(ts) | Partitions by day. Returns the number of days between ts and January 1, 1970. |
| hour(ts) | Partitions by hour. Returns the timestamp truncated to the hour, with the minute and second parts removed. |
| bucket(x, nbuckets) | Hash-partitions data into the specified number of buckets. Returns the hash value of x in the range [0, nbuckets). |
| truncate(s, nchars) | Partitions by the first nchars characters of s. |
Example: Partition the customer_orders table by order month, account number hash (10 buckets), and country:
```sql
CREATE TABLE iceberg.testdb.customer_orders (
  order_id BIGINT,
  order_date DATE,
  account_number BIGINT,
  customer VARCHAR,
  country VARCHAR)
WITH (partitioning = ARRAY['month(order_date)', 'bucket(account_number, 10)', 'country']);
```

Delete data by partition
For a partitioned table, if you include a WHERE clause in a DELETE statement to filter partitions, the Iceberg connector deletes the partitions that match the filter conditions. For example, the following statement deletes all partitions where country = 'US' from the customer_orders table:
```sql
DELETE FROM iceberg.testdb.customer_orders
WHERE country = 'US';
```

You can use the Iceberg connector to delete data only by partition. The following statement fails because the WHERE clause filters specific rows within partitions rather than complete partitions:

```sql
DELETE FROM iceberg.testdb.customer_orders
WHERE country = 'US' AND customer = 'Freds Foods';
```

Query system tables
The Iceberg connector exposes system tables that provide metadata about each Iceberg table.
Query partitions — includes the minimum and maximum values for each partition key column:
```sql
SELECT * FROM iceberg.testdb."customer_orders$partitions";
```

Query snapshots — lists all snapshots with their commit timestamps:

```sql
SELECT * FROM iceberg.testdb."customer_orders$snapshots"
ORDER BY committed_at DESC;
```

Roll back to a snapshot
Iceberg tables support snapshots. The Iceberg connector provides a system snapshot table for each Iceberg table. Snapshot IDs are of the BIGINT data type.
To revert a table to a previous state, first get the target snapshot ID, then call the rollback procedure.
Get the latest snapshot ID:

```sql
SELECT snapshot_id
FROM iceberg.testdb."customer_orders$snapshots"
ORDER BY committed_at DESC
LIMIT 1;
```

Roll back to the snapshot:

```sql
CALL iceberg.system.rollback_to_snapshot('testdb', 'customer_orders', 895459706749342****);
```

Replace 895459706749342**** with the actual snapshot ID (a BIGINT value).
Manage materialized views
A materialized view in the Iceberg connector consists of a view definition and a backing Iceberg table. The table name is stored as a materialized view property; data is stored in the Iceberg table.
| Statement | Description |
|---|---|
| CREATE MATERIALIZED VIEW | Creates a materialized view. Use the WITH clause to set Iceberg table properties such as format and partitioning. Example: WITH (format = 'ORC', partitioning = ARRAY['event_date']). |
| REFRESH MATERIALIZED VIEW | Updates the materialized view by deleting the backing table data and re-inserting the query results. |
| DROP MATERIALIZED VIEW | Deletes the materialized view definition and its backing Iceberg table. |
A small time window exists between the delete and insert operations during refresh. If the insert fails, the materialized view will be empty until the next successful refresh.
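As an illustrative sketch, a materialized view over the customer_orders table from the partitioning example might be created and refreshed as follows. The view name and aggregation are hypothetical:

```sql
-- Create a materialized view backed by a partitioned Iceberg table
CREATE MATERIALIZED VIEW iceberg.testdb.orders_by_country
WITH (format = 'ORC', partitioning = ARRAY['country'])
AS SELECT country, count(*) AS order_count
FROM iceberg.testdb.customer_orders
GROUP BY country;

-- Re-run the defining query and replace the backing table data
REFRESH MATERIALIZED VIEW iceberg.testdb.orders_by_country;
```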
What's next
Overview — learn about Iceberg concepts including snapshots, partitions, and table format versions.
Configure connectors — general guide for modifying connector configurations in EMR.