Iceberg is an open table format for data lakes. You can use the Iceberg connector to query data files in the Iceberg format.

Background information

For more information about Iceberg, see Overview.

Prerequisites

A data lake cluster or Hadoop cluster is created, and the Presto service is selected. For more information, see Create a cluster.

Limits

Only data lake clusters and Hadoop clusters of E-MapReduce (EMR) V3.38.0 and later versions support the Iceberg connector.

Configure the Iceberg connector

Modify the configurations of the Iceberg connector. For more information, see Configure connectors.

Default configurations of the Iceberg connector

Go to the Presto service page in the EMR console, click the Configure tab, and then click the iceberg.properties tab. The hive.metastore.uri configuration item is displayed on this tab. This configuration item specifies the uniform resource identifier (URI) of the Hive metastore service, which is accessed over the Thrift protocol. Modify this configuration item based on your business requirements.
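After the configuration is complete, the iceberg.properties file might resemble the following minimal sketch. The connector.name entry is the standard Presto catalog setting for the Iceberg connector, and the metastore host and port are placeholders that you must replace with the values of your own cluster.

connector.name=iceberg
hive.metastore.uri=thrift://<metastore-host>:9083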

Custom configurations of the Iceberg connector

Go to the Presto service page in the EMR console, click the Configure tab, and then click the iceberg.properties tab. Click Add Configuration Item in the upper-left corner to add the configuration items that are described in the following table.

Configuration item Description
iceberg.file-format The file format in which data of Iceberg tables is stored. Valid values:
  • ORC: This is the default value.
  • PARQUET
iceberg.compression-codec The compression format that is used when files are written. Valid values:
  • GZIP: This is the default value.
  • ZSTD
  • LZ4
  • SNAPPY
  • NONE
iceberg.max-partitions-per-writer The maximum number of partitions whose data can be processed by each writer. Default value: 100.
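For reference, an iceberg.properties file with these items added might contain entries such as the following. The values shown here are only illustrative assumptions; choose them based on your workload.

iceberg.file-format=PARQUET
iceberg.compression-codec=ZSTD
iceberg.max-partitions-per-writer=100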

Example: Query data in an Iceberg table

You can use the basic SQL syntax of Presto to query data in an Iceberg table.

  1. Log on to the master node of your cluster in SSH mode. For more information, see Log on to a cluster.
  2. Connect to the Presto client. For more information, see Access Presto by running commands.
  3. Run the following command to create a table named iceberg_test:
    create table iceberg_test(id int);
  4. Run the following command to insert data into the iceberg_test table:
    insert into iceberg_test values(1),(2);
  5. Run the following command to query data in the iceberg_test table:
    select * from iceberg_test;
    The following information is returned:
     id
    ----
     1
     2

SQL statements

The Iceberg connector can be used to read data from or write data to an Iceberg table and read or write table metadata. In addition to basic SQL statements, the Iceberg connector also supports the SQL statements that are described in the following table.
SQL statement Description
INSERT For more information, see INSERT in the official Presto documentation.
DELETE For more information, see the Delete data by partition section of this topic and DELETE in the official Presto documentation.
Schema and table management For more information, see the Partition a table section of this topic and Schema and table management in the official Presto documentation.
Materialized views management For more information, see the Manage materialized views section of this topic and Materialized views management in the official Presto documentation.
Views management For more information, see Views management in the official Presto documentation.

Partition a table

The Iceberg connector can use the functions that are described in the following table to partition a table.
Function Description
year(ts) Partitions a table by year. This function returns the difference in years between the value of ts and January 1, 1970.
month(ts) Partitions a table by month. This function returns the difference in months between the value of ts and January 1, 1970.
day(ts) Partitions a table by day. This function returns the difference in days between the value of ts and January 1, 1970.
hour(ts) Partitions a table by hour. This function returns the value of ts as a timestamp with the minutes and seconds set to zero.
bucket(x, nbuckets) Performs hash partitioning on data and distributes the data across the specified number of buckets. This function returns the integral hash value of x, which is within the range of [0, nbuckets - 1].
truncate(s, nchars) Returns the first nchars characters of s.
Example: Partition a table named customer_orders by the month of the order_date value, the hash value of account_number (10 buckets), and country.
CREATE TABLE iceberg.testdb.customer_orders (
    order_id BIGINT,
    order_date DATE,
    account_number BIGINT,
    customer VARCHAR,
    country VARCHAR)
WITH (partitioning = ARRAY['month(order_date)', 'bucket(account_number, 10)', 'country'])
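The other partition functions are used in the same way. The following sketch assumes a hypothetical page_views table and partitions it by the day of a timestamp column, a hash of user_id (16 buckets), and the first 20 characters of page_url:
CREATE TABLE iceberg.testdb.page_views (
    view_time TIMESTAMP,
    user_id BIGINT,
    page_url VARCHAR)
WITH (partitioning = ARRAY['day(view_time)', 'bucket(user_id, 16)', 'truncate(page_url, 20)'])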

Delete data by partition

For a partitioned table, if you include a WHERE clause in a DELETE statement to filter partitions, the Iceberg connector deletes the partitions that meet the filter conditions. For example, execute the following statement to delete all partitions that meet the filter condition country = 'US' from the customer_orders table:
DELETE FROM iceberg.testdb.customer_orders
WHERE country = 'US'
You can use the Iceberg connector to delete data only by partition. For example, the following statement fails to be executed because the WHERE clause in the statement is used to filter specific rows in partitions:
DELETE FROM iceberg.testdb.customer_orders
WHERE country = 'US' AND customer = 'Freds Foods'

Roll back to a snapshot

Snapshots are supported for Iceberg tables.

The Iceberg connector provides a system snapshot table for each Iceberg table. Snapshots of each Iceberg table are identified by snapshot IDs of the BIGINT data type. For example, you can execute the following statement to query the ID of the latest snapshot of the customer_orders table:
SELECT snapshot_id FROM iceberg.testdb."customer_orders$snapshots" ORDER BY committed_at DESC LIMIT 1
You can execute the following statement to roll back the state of the table to a snapshot based on the specified snapshot ID:
CALL iceberg.system.rollback_to_snapshot('testdb', 'customer_orders', 895459706749342****)
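If you want to roll back to an earlier snapshot instead of the latest one, you can first list all snapshots together with their commit time, and then pass the ID of the chosen snapshot to rollback_to_snapshot. The following sketch assumes the customer_orders table from the preceding examples; the exact set of columns in the $snapshots table depends on your Presto version.

SELECT committed_at, snapshot_id, operation
FROM iceberg.testdb."customer_orders$snapshots"
ORDER BY committed_at DESC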

Query the partitions of a table

Each Iceberg table has a $partitions system table that you can query to view the partitions of the table. For example, you can execute the following statement to query the partitions of the customer_orders table. The returned information includes the maximum value and minimum value of each partition key column.
SELECT * FROM iceberg.testdb."customer_orders$partitions"

Properties of Iceberg tables

The following table describes the properties of Iceberg tables.
Property Description
format Specifies the file format in which data of Iceberg tables is stored. Valid values:
  • ORC: This is the default value.
  • PARQUET
partitioning Specifies the partition key columns. For example, if a table is partitioned by the columns c1 and c2, set this property to ARRAY['c1', 'c2'].
location Specifies the file system location (URI) in which the table data is stored.
In the following sample statement, the format property is set to PARQUET, the partitioning property is set to ARRAY['c1', 'c2'], and the location property is set to /var/my_tables/test_table:
CREATE TABLE test_table (
    c1 integer,
    c2 date,
    c3 double)
WITH (
    format = 'PARQUET',
    partitioning = ARRAY['c1', 'c2'],
    location = '/var/my_tables/test_table')

Manage materialized views

The Iceberg connector supports materialized views. Each materialized view consists of a view definition and an Iceberg storage table. The name of the storage table is stored as a materialized view property, and the data of the materialized view is stored in that Iceberg table.

The following table describes the statements that can be executed on materialized views.
Statement Description
CREATE MATERIALIZED VIEW Creates a materialized view. You can use Iceberg table properties in the WITH clause to control how the data of the storage table is stored. For example, set the format property to ORC and the partitioning property to ARRAY['event_date'] to store the data of the Iceberg table in ORC files and partition the table by day based on the event_date column. A complete example is provided after this table.
WITH ( format = 'ORC', partitioning = ARRAY['event_date'] )
REFRESH MATERIALIZED VIEW Updates the data in a materialized view. After you execute the statement, the data in the Iceberg table is deleted, and the results of the query that defines the materialized view are inserted into the Iceberg table.
Important A small time window exists between the delete and insert operations, during which the materialized view is empty. If the insert operation fails, the materialized view remains empty.
DROP MATERIALIZED VIEW Deletes the definition of a materialized view and its Iceberg storage table.
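The following sketch shows a complete workflow with hypothetical names: it creates a materialized view named daily_order_counts over the customer_orders table from the preceding examples and then refreshes it. Adjust the catalog, schema, and defining query to your own environment.

CREATE MATERIALIZED VIEW iceberg.testdb.daily_order_counts
WITH ( format = 'ORC', partitioning = ARRAY['order_date'] )
AS
SELECT order_date, country, count(*) AS order_count
FROM iceberg.testdb.customer_orders
GROUP BY order_date, country;

REFRESH MATERIALIZED VIEW iceberg.testdb.daily_order_counts;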