E-MapReduce:Iceberg connector

Last Updated: Mar 25, 2026

The Iceberg connector lets you query and write data in Apache Iceberg tables using Trino on EMR. Iceberg is an open table format for data lakes that supports ACID transactions, partition evolution, and snapshot-based time travel.

Prerequisites

Before you begin, ensure that you have:

  • A DataLake cluster or Hadoop cluster with the Trino (formerly named Presto) service enabled. See Create a cluster.

Limitations

  • Only DataLake clusters and Hadoop clusters running EMR V3.38.0 or later support the Iceberg connector.

  • If you selected DLF Unified Metadata when creating the cluster, you cannot write data to Iceberg tables.

Configure the Iceberg connector

For the general procedure to modify connector configurations, see Configure connectors.

Default configuration

Log on to the EMR console, go to the Configure tab of the Trino service page, and click iceberg.properties. The tab shows the hive.metastore.uri configuration item, which specifies the URI of the Hive Metastore service accessed over the Thrift protocol. Modify this value to match your environment.
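For reference, the entry in iceberg.properties takes the following form; the host name and port below are placeholders, not values from your cluster:

```properties
# URI of the Hive Metastore Thrift service (replace host and port with your own)
hive.metastore.uri=thrift://master-1-1.example-cluster:9083
```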

Add configuration items

On the Configure tab of the Trino service page, click iceberg.properties, then click Add Configuration Item.

| Configuration item | Description | Default |
| --- | --- | --- |
| iceberg.file-format | File format for storing Iceberg table data. Valid values: ORC, PARQUET. | ORC |
| iceberg.compression-codec | Compression codec used when writing files. Valid values: GZIP, ZSTD, LZ4, SNAPPY, NONE. | GZIP |
| iceberg.max-partitions-per-writer | Maximum number of partitions each writer can process. | 100 |
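As a sketch, the added items might look as follows in iceberg.properties; the PARQUET, ZSTD, and 256 values are example choices, not recommendations:

```properties
# Example: store new table data as Parquet compressed with ZSTD
iceberg.file-format=PARQUET
iceberg.compression-codec=ZSTD
# Example: allow each writer to hold up to 256 open partitions
iceberg.max-partitions-per-writer=256
```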

Query Iceberg tables

The following steps show how to create a schema and table, insert data, and query results using standard Trino SQL.

Steps

  1. Create a schema:

    CREATE SCHEMA iceberg.testdb;
  2. Create a table:

    CREATE TABLE iceberg.testdb.iceberg_test (id INT);
  3. Insert data:

    INSERT INTO iceberg.testdb.iceberg_test VALUES (1), (2);
  4. Query the table:

    SELECT * FROM iceberg.testdb.iceberg_test;

    Expected output:

     id
    ----
     1
     2

SQL syntax

The Iceberg connector supports reading and writing data and metadata in Iceberg tables. In addition to standard SQL, it supports the following statements:

| Statement | Reference |
| --- | --- |
| INSERT | INSERT in the Trino documentation |
| DELETE | Delete data by partition in this topic and DELETE in the Trino documentation |
| Schema and table management | Partition a table in this topic and Schema and table management in the Trino documentation |
| Materialized view management | Manage materialized views in this topic and Materialized view management in the Trino documentation |
| View management | View management in the Trino documentation |

Table properties

Use the WITH clause to set the following properties when creating an Iceberg table:

| Property | Description | Default |
| --- | --- | --- |
| format | File format for storing table data. Valid values: ORC, PARQUET. | ORC |
| partitioning | Partition key columns as an array. For example, ARRAY['c1', 'c2']. | Not set |
| location | URI of the file system location that stores the table. | Not set |

Example:

CREATE TABLE test_table (
    c1 INTEGER,
    c2 DATE,
    c3 DOUBLE)
WITH (
    format = 'PARQUET',
    partitioning = ARRAY['c1', 'c2'],
    location = '/var/my_tables/test_table');

Partition a table

The Iceberg connector supports function-based partitioning. Use the following functions in the partitioning property:

| Function | Description |
| --- | --- |
| year(ts) | Partitions by year. Returns the number of years between ts and January 1, 1970. |
| month(ts) | Partitions by month. Returns the number of months between ts and January 1, 1970. |
| day(ts) | Partitions by day. Returns the number of days between ts and January 1, 1970. |
| hour(ts) | Partitions by hour. Returns a truncated timestamp with the minute and second parts removed. |
| bucket(x, nbuckets) | Hash-partitions data into the specified number of buckets. Returns the hash value of x in the range [0, nbuckets). |
| truncate(s, nchars) | Returns the first nchars characters of s. |

Example: Partition the customer_orders table by order month, account number hash (10 buckets), and country:

CREATE TABLE iceberg.testdb.customer_orders (
    order_id BIGINT,
    order_date DATE,
    account_number BIGINT,
    customer VARCHAR,
    country VARCHAR)
WITH (partitioning = ARRAY['month(order_date)', 'bucket(account_number, 10)', 'country']);

Delete data by partition

For a partitioned table, if you include a WHERE clause in a DELETE statement to filter partitions, the Iceberg connector deletes the partitions that match the filter conditions. For example, the following statement deletes all partitions where country = 'US' from the customer_orders table:

DELETE FROM iceberg.testdb.customer_orders
WHERE country = 'US';

The Iceberg connector can delete data only by whole partitions. The following statement fails because its WHERE clause filters individual rows within partitions rather than complete partitions:

DELETE FROM iceberg.testdb.customer_orders
WHERE country = 'US' AND customer = 'Freds Foods';

Query system tables

The Iceberg connector exposes system tables that provide metadata about each Iceberg table.

Query partitions — includes the minimum and maximum values for each partition key column:

SELECT * FROM iceberg.testdb."customer_orders$partitions";

Query snapshots — lists all snapshots with their commit timestamps:

SELECT * FROM iceberg.testdb."customer_orders$snapshots"
ORDER BY committed_at DESC;

Roll back to a snapshot

Iceberg tables support snapshots. The Iceberg connector provides a system snapshot table for each Iceberg table. Snapshot IDs are of the BIGINT data type.

To revert a table to a previous state, first get the target snapshot ID, then call the rollback procedure.

  1. Get the latest snapshot ID:

    SELECT snapshot_id
    FROM iceberg.testdb."customer_orders$snapshots"
    ORDER BY committed_at DESC
    LIMIT 1;
  2. Roll back to the snapshot:

    CALL iceberg.system.rollback_to_snapshot('testdb', 'customer_orders', 895459706749342****);

    Replace 895459706749342**** with the actual snapshot ID (a BIGINT value).

Manage materialized views

A materialized view in the Iceberg connector consists of a view definition and a backing Iceberg table. The table name is stored as a materialized view property; data is stored in the Iceberg table.

| Statement | Description |
| --- | --- |
| CREATE MATERIALIZED VIEW | Creates a materialized view. Use the WITH clause to set Iceberg table properties such as format and partitioning. Example: WITH (format = 'ORC', partitioning = ARRAY['event_date']). |
| REFRESH MATERIALIZED VIEW | Updates the materialized view by deleting the backing table data and re-inserting the results of the defining query. |
| DROP MATERIALIZED VIEW | Deletes the materialized view definition and its backing Iceberg table. |
Important

A small time window exists between the delete and insert operations during refresh. If the insert fails, the materialized view will be empty until the next successful refresh.
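As an illustration, the following statements create, refresh, and drop a materialized view over the customer_orders table used earlier in this topic. The view name order_daily_totals and its defining query are hypothetical examples, not part of the product defaults:

```sql
-- Create a materialized view backed by an ORC Iceberg table
-- partitioned by order_date (example property values)
CREATE MATERIALIZED VIEW iceberg.testdb.order_daily_totals
WITH (format = 'ORC', partitioning = ARRAY['order_date'])
AS
SELECT order_date, count(*) AS order_count
FROM iceberg.testdb.customer_orders
GROUP BY order_date;

-- Re-run the defining query and replace the backing table data
REFRESH MATERIALIZED VIEW iceberg.testdb.order_daily_totals;

-- Remove both the view definition and the backing Iceberg table
DROP MATERIALIZED VIEW iceberg.testdb.order_daily_totals;
```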

What's next

  • Overview — learn about Iceberg concepts including snapshots, partitions, and table format versions.

  • Configure connectors — general guide for modifying connector configurations in EMR.