Iceberg is an open table format for data lakes. You can use the Iceberg connector to query data files in the Iceberg format.
Background information
For more information about Iceberg, see Overview.
Prerequisites
A data lake cluster or Hadoop cluster is created, and the Presto service is selected. For more information, see Create a cluster.
Limits
Only data lake clusters and Hadoop clusters of E-MapReduce (EMR) V3.38.0 and later versions support the Iceberg connector.
Configure the Iceberg connector
Modify the configurations of the Iceberg connector. For more information, see Configure connectors.
Default configurations of the Iceberg connector
Go to the Presto service page in the EMR console, click the Configure tab, and then click the iceberg.properties tab. The tab displays the hive.metastore.uri configuration item, which specifies the uniform resource identifier (URI) of the Hive metastore service that the connector accesses over the Thrift protocol. Modify this configuration item based on your business requirements.
Add configuration items
Go to the Presto service page in the EMR console, click the Configure tab, and then click the iceberg.properties tab. Click Add Configuration Item in the upper-left corner to add the configuration items that are described in the following table.
Configuration item | Description |
---|---|
iceberg.file-format | The file format in which data of Iceberg tables is stored. Valid values: PARQUET and ORC. |
iceberg.compression-codec | The compression format that is used when files are written. Valid values include GZIP, SNAPPY, LZ4, ZSTD, and NONE. |
iceberg.max-partitions-per-writer | The maximum number of partitions whose data can be processed by each writer. Default value: 100. |
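After the configuration takes effect, a quick check such as the following can confirm that the connector reaches the Hive metastore. This is a minimal sketch that assumes the catalog is named iceberg, as in the examples in this topic, and that a schema named testdb exists.

```sql
-- List the schemas that the metastore exposes through the Iceberg catalog.
-- Assumption: the catalog name is iceberg.
SHOW SCHEMAS FROM iceberg;

-- List the tables in one schema. testdb is the schema used in the examples of this topic.
SHOW TABLES FROM iceberg.testdb;
```

If the first statement fails, the value of hive.metastore.uri is a likely place to start troubleshooting.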
Example: Query data in an Iceberg table
You can use the basic SQL syntax of Presto to query data in an Iceberg table.
SQL statements
SQL statement | Description |
---|---|
INSERT | For more information, see INSERT in the official Presto documentation. |
DELETE | For more information, see the Delete data by partition section of this topic and DELETE in the official Presto documentation. |
Schema and table management | For more information, see the Partition a table section of this topic and Schema and table management in the official Presto documentation. |
Materialized views management | For more information, see the Manage materialized views section of this topic and Materialized views management in the official Presto documentation. |
Views management | For more information, see Views management in the official Presto documentation. |
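The statement types in the preceding table can be combined as in the following sketch. The table demo_events and the view demo_events_recent are hypothetical names that are used only for illustration, and the catalog name iceberg is an assumption.

```sql
-- Schema management: create the schema if it does not exist.
CREATE SCHEMA IF NOT EXISTS iceberg.testdb;

-- Table management: create a hypothetical table for this sketch.
CREATE TABLE iceberg.testdb.demo_events (
    event_id BIGINT,
    event_date DATE,
    payload VARCHAR
);

-- INSERT: write two rows.
INSERT INTO iceberg.testdb.demo_events
VALUES
    (1, DATE '2022-01-15', 'first'),
    (2, DATE '2022-01-16', 'second');

-- Views management: define a view on top of the table and query it.
CREATE VIEW iceberg.testdb.demo_events_recent AS
SELECT event_id, event_date
FROM iceberg.testdb.demo_events
WHERE event_date >= DATE '2022-01-16';

SELECT * FROM iceberg.testdb.demo_events_recent;

-- Clean up the objects that this sketch created.
DROP VIEW iceberg.testdb.demo_events_recent;
DROP TABLE iceberg.testdb.demo_events;
```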
Partition a table
Function | Description |
---|---|
year(ts) | Partitions a table by year. This function returns the difference in years between the value of ts and January 1, 1970. |
month(ts) | Partitions a table by month. This function returns the difference in months between the value of ts and January 1, 1970. |
day(ts) | Partitions a table by day. This function returns the difference in days between the value of ts and January 1, 1970. |
hour(ts) | Partitions a table by hour. This function returns a timestamp based on the value of ts. The minute and second parts of the value of ts are ignored. |
bucket(x, nbuckets) | Performs hash partitioning on data and allocates data to a specified number of buckets. This function returns the integral hash value of x. The integral hash value of x is within the range of [0, nbuckets - 1]. |
truncate(s, nchars) | Partitions a table by a prefix of s. This function returns the first nchars characters of s. |
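To get a feel for the values that the time-based and truncate functions produce, the descriptions in the preceding table can be approximated with ordinary Presto functions. This is only an illustration of the described values, not how the connector computes them internally. The bucket function is omitted because its hash cannot be reproduced with a simple expression.

```sql
SELECT
    -- year(ts): number of years since January 1, 1970
    date_diff('year', DATE '1970-01-01', DATE '2021-07-15') AS year_value,    -- 51
    -- month(ts): number of months since January 1, 1970
    date_diff('month', DATE '1970-01-01', DATE '2021-07-15') AS month_value,  -- 618
    -- day(ts): number of days since January 1, 1970
    date_diff('day', DATE '1970-01-01', DATE '2021-07-15') AS day_value,
    -- hour(ts): the timestamp with the minute and second parts dropped
    date_trunc('hour', TIMESTAMP '2021-07-15 13:45:10') AS hour_value,
    -- truncate(s, nchars): the first nchars characters of s
    substr('Freds Foods', 1, 5) AS truncate_value;                            -- 'Freds'
```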
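The following statement creates a table that is partitioned by the month of order_date, by a hash of account_number that is distributed across 10 buckets, and by country:
CREATE TABLE iceberg.testdb.customer_orders (
    order_id BIGINT,
    order_date DATE,
    account_number BIGINT,
    customer VARCHAR,
    country VARCHAR)
WITH (partitioning = ARRAY['month(order_date)', 'bucket(account_number, 10)', 'country'])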
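Once the customer_orders table exists, rows can be written and read with standard statements. The following sketch uses made-up sample values. The SELECT statement filters on order_date and country, which are both used in the partitioning of the table, so the engine may be able to skip partitions that do not match.

```sql
-- Insert two sample rows (the values are illustrative only).
INSERT INTO iceberg.testdb.customer_orders
VALUES
    (1001, DATE '2022-03-05', 42, 'Freds Foods', 'US'),
    (1002, DATE '2022-03-17', 43, 'Example Corp', 'DE');

-- Read the US orders that were placed in March 2022.
SELECT order_id, customer
FROM iceberg.testdb.customer_orders
WHERE country = 'US'
  AND order_date >= DATE '2022-03-01'
  AND order_date < DATE '2022-04-01';
```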
Delete data by partition
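The following statement deletes all partitions in which country is US. The filter references only the partition key column country, so it matches entire partitions:
DELETE FROM iceberg.testdb.customer_orders
WHERE country = 'US'
The following statement also filters on customer, which is not a partition key column, so the filter selects only some of the rows in a partition. On connector versions that support deletion only by partition, this statement fails:
DELETE FROM iceberg.testdb.customer_orders
WHERE country = 'US' AND customer = 'Freds Foods'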
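Because a partition-level delete removes every row that matches the filter, it can be useful to preview the affected rows first. The following sketch reuses the same predicate in a SELECT statement.

```sql
-- Count the rows that a delete with the same filter would remove.
SELECT count(*) AS rows_to_delete
FROM iceberg.testdb.customer_orders
WHERE country = 'US';
```

Running the same query after the delete should return 0 if the delete succeeded.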
Roll back to a snapshot
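Iceberg tables support snapshots, and you can roll a table back to an earlier snapshot. Query the $snapshots system table to obtain the ID of the most recent snapshot:
SELECT snapshot_id FROM iceberg.testdb."customer_orders$snapshots" ORDER BY committed_at DESC LIMIT 1
Then call the rollback_to_snapshot procedure with the schema name, the table name, and the snapshot ID:
CALL iceberg.system.rollback_to_snapshot('testdb', 'customer_orders', 895459706749342****)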
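To choose an earlier snapshot instead of the most recent one, the full snapshot list can be inspected first. This sketch assumes that the $snapshots table also exposes the parent_id and operation columns, which may differ between connector versions.

```sql
-- List all snapshots from oldest to newest.
-- Assumption: parent_id and operation are available in this connector version.
SELECT committed_at, snapshot_id, parent_id, operation
FROM iceberg.testdb."customer_orders$snapshots"
ORDER BY committed_at;
```

Pass the snapshot_id of the chosen row as the third argument of rollback_to_snapshot.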
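Query the partitions of a table by using a system table
The following statement queries the $partitions system table to view the partitions of the customer_orders table:
SELECT * FROM iceberg.testdb."customer_orders$partitions"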
Properties of Iceberg tables
Property | Description |
---|---|
format | Specifies the file format in which data of Iceberg tables is stored. Valid values: PARQUET and ORC. |
partitioning | Specifies partition key columns.
For example, if a table contains the partition key columns c1 and c2, this property is set to ARRAY['c1', 'c2']. |
location | Specifies the URI of the file system location in which the table is stored. |
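The following statement creates a table that stores data in Parquet files, is partitioned by c1 and c2, and is stored in the /var/my_tables/test_table directory:
CREATE TABLE test_table (
    c1 integer,
    c2 date,
    c3 double)
WITH (
    format = 'PARQUET',
    partitioning = ARRAY['c1', 'c2'],
    location = '/var/my_tables/test_table')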
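To confirm which properties a table was created with, the table definition can be displayed. The following sketch also shows the same properties in a CREATE TABLE AS statement. The table name test_table_orc is hypothetical, and both statements assume that the session catalog and schema are already set, as in the preceding example.

```sql
-- Display the definition of the table, including the WITH properties.
SHOW CREATE TABLE test_table;

-- Copy the data into a hypothetical ORC table that is partitioned by c2 only.
CREATE TABLE test_table_orc
WITH (
    format = 'ORC',
    partitioning = ARRAY['c2'])
AS
SELECT c1, c2, c3
FROM test_table;
```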
Manage materialized views
The Iceberg connector supports materialized views. Each materialized view consists of a view definition and an Iceberg table. The name of the Iceberg table is stored as a property of the materialized view, and the data of the materialized view is stored in the Iceberg table.
Statement | Description |
---|---|
CREATE MATERIALIZED VIEW | Creates a materialized view. You can use the properties of the Iceberg table to determine the storage format of the Iceberg table. For example, set the format property to ORC and the partitioning property to ARRAY['event_date'] in the WITH clause to store data of the Iceberg table in ORC files and partition the table by day. |
REFRESH MATERIALIZED VIEW | Updates data in a materialized view. After you execute the statement, the data of the Iceberg table is deleted, and then the results of the query that defines the materialized view are inserted into the Iceberg table. Important: A short time window exists between the delete and insert operations, during which the materialized view is empty. If the insert operation fails, the materialized view remains empty. |
DROP MATERIALIZED VIEW | Deletes the definition and the Iceberg table of a materialized view. |
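The life cycle of a materialized view can be sketched as follows. The view name orders_by_country is hypothetical, and the example assumes that the customer_orders table from the Partition a table section exists.

```sql
-- Create a materialized view whose Iceberg table is stored in ORC files
-- and is partitioned by country.
CREATE MATERIALIZED VIEW iceberg.testdb.orders_by_country
WITH (format = 'ORC', partitioning = ARRAY['country'])
AS
SELECT country, count(*) AS order_count
FROM iceberg.testdb.customer_orders
GROUP BY country;

-- Populate or update the data of the materialized view.
REFRESH MATERIALIZED VIEW iceberg.testdb.orders_by_country;

-- Query the materialized view like a regular table.
SELECT country, order_count
FROM iceberg.testdb.orders_by_country
ORDER BY order_count DESC;

-- Remove the definition and the underlying Iceberg table.
DROP MATERIALIZED VIEW iceberg.testdb.orders_by_country;
```

Because a refresh deletes the existing data before it inserts the new results, it is worth checking that the refresh completed before you rely on the view.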