The Delta connector lets you query Delta Lake tables directly from E-MapReduce (EMR) Trino. It supports all open source Delta Lake features and adds enhanced capabilities such as time travel and Z-Ordering-based query acceleration.
Background information
Delta Lake is a data lake solution developed by Databricks. It provides features for writing data to, managing, querying, and reading data from data lakes. For more information, see Overview.
Prerequisites
Before you begin, ensure that you have:
A DataLake cluster or Custom cluster with the Trino service enabled, or a Hadoop cluster with the Presto service enabled
To create a cluster, see Create a cluster.
Supported versions
The Delta connector is supported on:
Hadoop clusters running EMR V3.39.1, EMR V5.5.0, or a later minor version
DataLake clusters and Custom clusters (no version restriction)
Configure the Delta connector
Delta connector settings live in the delta.properties tab on the Configure tab of the Trino service in the EMR console. To change these settings, follow the instructions in Modify the configurations of a built-in connector.
| Configuration item | Description |
|---|---|
| hive.metastore.uri | The URI for accessing the Hive metastore over the Thrift protocol. Default format: thrift://master-1-1.cluster-24****:9083 |
| hive.config.resources | The path to the resource file used by the Hive metastore |
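For reference, the corresponding entries in delta.properties look roughly like the following. This is a minimal sketch: the metastore URI is the default format shown above, and the hive-site.xml path is a placeholder you should replace with your cluster's actual path.

```properties
# URI of the Hive metastore Thrift service (default format from the table above)
hive.metastore.uri=thrift://master-1-1.cluster-24****:9083
# Placeholder path to the Hive resource file; adjust to your cluster
hive.config.resources=/etc/hive-conf/hive-site.xml
```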
Query Delta Lake tables
Trino can read Delta Lake tables but cannot create or modify them. Use Spark SQL to create tables and write data. For details, see Use Delta Lake.
The following example shows how to create a table with Spark SQL and then query it from Trino.
Step 1: Create a table and insert data
1. Open the Spark SQL CLI:

   ```shell
   spark-sql
   ```

2. Create a Delta Lake table named delta_table:

   ```sql
   CREATE TABLE delta_table (id INT) USING delta;
   ```

3. Insert data into the table:

   ```sql
   INSERT INTO delta_table VALUES 0,1,2,3,4;
   ```
Step 2: Query data from Trino
Log on to the Trino console. For instructions, see Log on to the Trino console by running commands.
Run the following query:
```sql
SELECT * FROM delta_table;
```

Expected output:

```
 id
----
  0
  1
  2
  3
  4
(5 rows)
```
Advanced features
The following features are supported only in EMR V3.39.1, EMR V5.5.0, or a later minor version.
Time travel
Time travel lets you query a historical snapshot of a Delta Lake table by version number or timestamp. EMR Trino uses the FOR ... AS OF syntax:
```sql
-- Query by version number
SELECT * FROM <table> FOR VERSION AS OF <version>;

-- Query by timestamp
SELECT * FROM <table> FOR TIMESTAMP AS OF <timestamp>;
```

EMR Trino's time travel syntax includes the keyword FOR. This differs from open source Spark SQL, which does not use FOR in its time travel syntax.
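For comparison, recent open source Delta Lake releases on Spark SQL accept time travel clauses without FOR. The following is a minimal sketch, assuming a Delta Lake and Spark combination that supports SQL time travel:

```sql
-- Open source Spark SQL (Delta Lake), no FOR keyword
SELECT * FROM delta_table VERSION AS OF 1;
SELECT * FROM delta_table TIMESTAMP AS OF '2022-02-15 12:00:00';
```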
Query by version
Version numbers are integers that start at 1 after the first INSERT and increment by 1 with each modification.
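If you are unsure which versions a table has, Delta Lake's history command in Spark SQL lists them. This is a minimal sketch, assuming your EMR Delta Lake release exposes DESCRIBE HISTORY:

```sql
-- Run in Spark SQL; each row describes one table version and the operation that produced it
DESCRIBE HISTORY delta_table;
```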
The following example overwrites delta_table and then queries an earlier version using time travel.
1. Overwrite the data in Spark SQL:

   ```sql
   INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
   ```

2. Confirm the current state from Trino:

   ```sql
   SELECT * FROM delta_table;
   ```

   Output:

   ```
    id
   ----
     5
     6
     7
     8
     9
   (5 rows)
   ```

3. Query the data as it was at version 1:

   ```sql
   SELECT * FROM delta_table FOR VERSION AS OF 1;
   ```

   Output:

   ```
    id
   ----
     2
     1
     3
     4
     0
   (5 rows)
   ```
Query by timestamp
Three timestamp types are supported: DATE, TIMESTAMP, and TIMESTAMP WITH TIME ZONE.
If you query by a value of the DATE type, the snapshot as of 00:00:00 UTC on that date is read.
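For example, a minimal sketch using a DATE literal (the date here is illustrative):

```sql
SELECT * FROM delta_table FOR TIMESTAMP AS OF DATE '2022-02-16';
```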
If you query by a value of the TIMESTAMP type, the value is interpreted in UTC and the snapshot as of that point in time is read. For example, to query the data as it was at 20:00:00 on February 15, 2022 (UTC+8), specify the equivalent UTC time:

Note: In this form, the first TIMESTAMP keyword selects the time travel mode and the second TIMESTAMP is the literal type keyword.

```sql
SELECT * FROM delta_table FOR TIMESTAMP AS OF TIMESTAMP '2022-02-15 12:00:00';
```

The following output is returned:

```
 id
----
  2
  0
  3
  4
  1
(5 rows)
```

If you query by a value of the TIMESTAMP WITH TIME ZONE type, the value is implicitly converted before the snapshot is read:

```sql
SELECT * FROM delta_table FOR TIMESTAMP AS OF CAST('2022-02-15 20:00:00 +0800' AS TIMESTAMP WITH TIME ZONE);
```
Z-Ordering
Z-Ordering is a data layout optimization technique that co-locates related data in the same set of files. It works together with Parquet-based query optimization and data skipping. After a table is Z-Ordered, Delta Lake collects min/max statistics for each column at file granularity and uses them to skip irrelevant files during queries. The Delta connector reads these statistics, so Trino can skip data files and accelerate queries by up to dozens of times.
Optimize a table
Run the OPTIMIZE command with ZORDER BY in Spark to rewrite and sort the table data. The following example optimizes the conn_zorder table, which has columns src_ip, src_port, dst_ip, and dst_port:
```sql
OPTIMIZE conn_zorder ZORDER BY (src_ip, src_port, dst_ip, dst_port);
```

The column order in the ZORDER BY clause determines the Z-Ordering priority. The time to complete an OPTIMIZE operation scales with the amount of data.
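For reference, a table like conn_zorder could be created in Spark SQL as follows. This is a minimal sketch in which the column types are assumptions; adjust them to your data:

```sql
-- Hypothetical schema for the conn_zorder example
CREATE TABLE conn_zorder (
  src_ip   STRING,
  src_port INT,
  dst_ip   STRING,
  dst_port INT
) USING delta;
```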
Supported column types
Z-Ordering supports the following column data types: INT, LONG, DOUBLE, FLOAT, BINARY, BOOLEAN, STRING, and ARRAY.
Supported predicates for data skipping
When querying Z-Ordered data, the data skipping feature applies file-level filtering for these predicates: =, <, <=, >, and >=.
LIKE and IN predicates do not trigger full data skipping. However, queries using LIKE or IN can still benefit from partial ordering when the Z-Ordered columns narrow down the relevant file range.
Query examples
After optimization, queries that filter on Z-Ordered columns run significantly faster:
```sql
-- Filter on a single Z-Ordered column
SELECT COUNT(*) FROM conn_zorder WHERE src_ip > '64.';

-- Filter on multiple Z-Ordered columns
SELECT COUNT(*) FROM conn_zorder WHERE src_ip >= '64.' AND dst_ip < '192.' AND src_port < 1000 AND dst_port > 50000;
```

What's next
Use Delta Lake — create and manage Delta Lake tables with Spark SQL
Log on to the Trino console by running commands — connect to Trino to run queries
Modify the configurations of a built-in connector — customize connector settings