
E-MapReduce:Delta connector

Last Updated: Mar 26, 2026

The Delta connector lets you query Delta Lake tables directly from E-MapReduce (EMR) Trino. It supports all open source Delta Lake features and adds enhanced capabilities such as time travel and Z-Ordering-based query acceleration.

Background information

Delta Lake is a data lake solution developed by Databricks that provides capabilities for writing, managing, querying, and reading data in data lakes. For more information, see Overview.

Prerequisites

Before you begin, ensure that you have:

  • A DataLake cluster or Custom cluster with the Trino service enabled, or a Hadoop cluster with the Presto service enabled

To create a cluster, see Create a cluster.

Supported versions

The Delta connector is supported on:

  • Hadoop clusters running EMR V3.39.1, EMR V5.5.0, or a later minor version

  • DataLake clusters and Custom clusters (no version restriction)

Configure the Delta connector

The Delta connector settings are located on the delta.properties tab of the Configure tab for the Trino service in the EMR console. To change these settings, follow the instructions in Modify the configurations of a built-in connector.

Configuration item     Description
hive.metastore.uri     The URI used to access the Hive metastore over the Thrift protocol. Default format: thrift://master-1-1.cluster-24****:9083
hive.config.resources  The path to the resource file used by the Hive metastore.
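For reference, a delta.properties file with these settings typically looks like the following sketch. The metastore URI repeats the masked example above, and the hive-site.xml path is an assumed placeholder; substitute the actual values from your cluster:

```properties
# delta.properties -- illustrative values only
hive.metastore.uri=thrift://master-1-1.cluster-24****:9083
# Assumed placeholder path; point this at your cluster's hive-site.xml
hive.config.resources=/etc/ecm/hive-conf/hive-site.xml
```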

Query Delta Lake tables

Trino can read Delta Lake tables but cannot create or modify them. Use Spark SQL to create tables and write data. For details, see Use Delta Lake.

The following example shows how to create a table with Spark SQL and then query it from Trino.

Step 1: Create a table and insert data

  1. Open the Spark SQL CLI:

    spark-sql
  2. Create a Delta Lake table named delta_table:

    CREATE TABLE delta_table (id INT) USING delta;
  3. Insert data into the table:

    INSERT INTO delta_table VALUES 0,1,2,3,4;

Step 2: Query data from Trino

  1. Log on to the Trino console. For instructions, see Log on to the Trino console by running commands.

  2. Run the following query:

    SELECT * FROM delta_table;

    Expected output:

     id
    ----
      0
      1
      2
      3
      4
    (5 rows)

Advanced features

Important

The following features are supported only in EMR V3.39.1, EMR V5.5.0, and later minor versions of these releases.

Time travel

Time travel lets you query a historical snapshot of a Delta Lake table by version number or timestamp. EMR Trino uses the FOR ... AS OF syntax:

-- Query by version number
SELECT * FROM <table> FOR VERSION AS OF <version>;

-- Query by timestamp
SELECT * FROM <table> FOR TIMESTAMP AS OF <timestamp>;
Important

EMR Trino's time travel syntax includes the keyword FOR. This differs from open source Spark SQL, which does not use FOR in its time travel syntax.

Query by version

Version numbers are integers. Table creation produces version 0, the first INSERT produces version 1, and each subsequent modification increments the version by 1.
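To find out which versions of a table exist before time traveling, you can list the table's history from Spark SQL. DESCRIBE HISTORY is an open source Delta Lake command; its availability in a specific EMR release is not guaranteed, so treat this as a sketch:

```sql
-- Run in the spark-sql CLI, not in Trino.
-- Each row shows a version number, its commit timestamp, and the operation.
DESCRIBE HISTORY delta_table;
```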

The following example overwrites delta_table and then queries an earlier version using time travel.

  1. Overwrite the data in Spark SQL:

    INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
  2. Confirm the current state from Trino:

    SELECT * FROM delta_table;

    Output:

     id
    ----
      5
      6
      7
      8
      9
    (5 rows)
  3. Query the data as it was at version 1:

    SELECT * FROM delta_table FOR VERSION AS OF 1;

    Output:

     id
    ----
      2
      1
      3
      4
      0
    (5 rows)

Query by timestamp

Three timestamp types are supported: DATE, TIMESTAMP, and TIMESTAMP WITH TIME ZONE.

  • If you specify a DATE value, Trino queries the snapshot that was current at 00:00:00 UTC on that date.

  • If you specify a TIMESTAMP value, the timestamp is interpreted in UTC. For example, 20:00:00 on February 15, 2022 in UTC+8 corresponds to 12:00:00 UTC, so you query that point in time as follows:

    Note

    In the TIMESTAMP form, the first TIMESTAMP keyword specifies the time travel mode and the second TIMESTAMP is the literal type keyword.

    SELECT * FROM delta_table FOR TIMESTAMP AS OF TIMESTAMP '2022-02-15 12:00:00';

    The following output is returned:

     id
    ----
      2
      0
      3
      4
      1
    (5 rows)
  • If you query data based on a timestamp of the TIMESTAMP WITH TIME ZONE type, the data type is implicitly converted before the snapshot is read:

    SELECT * FROM delta_table FOR TIMESTAMP AS OF CAST('2022-02-15 20:00:00 +0800' AS TIMESTAMP WITH TIME ZONE);
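A DATE literal follows the same FOR TIMESTAMP AS OF pattern. The date below is illustrative; the query reads the snapshot that was current at 00:00:00 UTC on that date:

```sql
-- Reads the table as of 00:00:00 UTC on February 16, 2022
SELECT * FROM delta_table FOR TIMESTAMP AS OF DATE '2022-02-16';
```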

Z-Ordering

Z-Ordering is a data layout optimization technique that co-locates related data in the same set of files. After a table is Z-Ordered, Delta Lake collects minimum and maximum statistics for each column at file granularity. The Delta connector reads these statistics and, together with Parquet-based optimizations, uses data skipping to exclude files whose min/max ranges cannot match the query predicates. This can accelerate queries by up to dozens of times.

Optimize a table

Run the OPTIMIZE command with ZORDER BY in Spark to rewrite and sort the table data. The following example optimizes the conn_zorder table, which has columns src_ip, src_port, dst_ip, and dst_port:

OPTIMIZE conn_zorder ZORDER BY (src_ip, src_port, dst_ip, dst_port);
Important

The column order in the ZORDER BY clause determines the Z-Ordering priority. The time to complete an OPTIMIZE operation scales with the amount of data.

Supported column types

Z-Ordering supports the following column data types: INT, LONG, DOUBLE, FLOAT, BINARY, BOOLEAN, STRING, and ARRAY.

Supported predicates for data skipping

When querying Z-Ordered data, the data skipping feature applies file-level filtering for these predicates: =, <, <=, >, and >=.

LIKE and IN predicates do not trigger full data skipping. However, queries using LIKE or IN can still benefit from partial ordering when the Z-Ordered columns narrow down the relevant file range.

Query examples

After optimization, queries that filter on Z-Ordered columns run significantly faster:

-- Filter on a single Z-Ordered column
SELECT COUNT(*) FROM conn_zorder WHERE src_ip > '64.';

-- Filter on multiple Z-Ordered columns
SELECT COUNT(*) FROM conn_zorder WHERE src_ip >= '64.' AND dst_ip < '192.' AND src_port < 1000 AND dst_port > 50000;
