
E-MapReduce:Delta connector

Last Updated:Jun 14, 2023

An E-MapReduce (EMR) cluster that is deployed with the Trino service provides a Delta connector. The Delta connector supports all features of open source Delta Lake and provides enhanced features on top of the existing features.

Background information

Delta Lake is a data lake solution that is developed by Databricks. Delta Lake provides features that can be used to write data to data lakes, manage data, query data, and read data from data lakes. For more information, see Overview.

Prerequisites

A DataLake cluster or Custom cluster is created with the Trino service selected, or a Hadoop cluster is created with the Presto service selected. For more information, see Create a cluster.

Limits

Only the following clusters support the Delta connector: Hadoop clusters of EMR V3.39.1, EMR V5.5.0, or their later minor versions, and DataLake and Custom clusters.

Basic usage

Modify the configurations of a Delta connector

For more information about how to modify the configurations of a Delta connector, see Modify the configurations of a built-in connector.

Default configurations of a Delta connector

Go to the Configure tab of the Trino service in the EMR console. On the Configure tab, click the delta.properties tab. Modify or add the configuration items that are described in the following table based on your business requirements.

  • hive.metastore.uri: The uniform resource identifier (URI) that is used to access the Hive metastore over the Thrift protocol. You can change the value of this configuration item based on your business requirements. By default, this configuration item is specified in the format of thrift://master-1-1.cluster-24****:9083.

  • hive.config.resources: The path of the resource files that are used by the Hive metastore.
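For reference, a delta.properties file that sets both items may look like the following sketch. The metastore host name follows the default format described above, and the hive.config.resources paths are placeholders that you must replace with the configuration files of your own cluster:

```properties
# URI of the Hive metastore Thrift service. The host name below follows
# the default format; replace it with the value for your cluster.
hive.metastore.uri=thrift://master-1-1.cluster-24****:9083

# Placeholder paths: point these at the Hadoop configuration files
# that the Hive metastore client should load.
hive.config.resources=/path/to/core-site.xml,/path/to/hdfs-site.xml
```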

Example

You cannot use Trino to create or modify a Delta Lake table. You can use Spark SQL to create a Delta Lake table. For more information, see Use Delta Lake.

  1. Generate data.

    1. Run the following command to open the Spark SQL CLI:

      spark-sql
    2. Execute the following statement to create a Delta Lake table named delta_table:

      CREATE TABLE delta_table (id INT) USING delta;
    3. Execute the following statement to write data to the delta_table table:

      INSERT INTO delta_table VALUES 0,1,2,3,4;
  2. Query data.

    1. Go to the Trino console. For more information, see Log on to the Trino console by running commands.

    2. Execute the following statement to query data in the delta_table table:

      SELECT * FROM delta_table;

      The following output is returned:

       id
      ----
        0
        1
        2
        3
        4
      (5 rows)

Advanced usage

Important

This feature is supported only in EMR V3.39.1 and EMR V5.5.0.

Time travel

You can use the time travel feature to query historical data in a Delta Lake table.

EMR Trino supports the time travel feature of Delta Lake tables. The syntax is FOR xxx AS OF, where xxx indicates the mode in which the time travel feature works and can be set to VERSION or TIMESTAMP.

Important

The time travel syntax that is supported by EMR Trino contains the keyword FOR, which is not part of the Spark SQL syntax of open source Delta Lake.
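For example, both of the following statements read version 1 of a Delta Lake table; only the FOR keyword differs between the two engines. The Spark form is shown for comparison and follows the open source Delta Lake syntax:

```sql
-- EMR Trino: the FOR keyword is required.
SELECT * FROM delta_table FOR VERSION AS OF 1;

-- Spark SQL with open source Delta Lake: no FOR keyword.
SELECT * FROM delta_table VERSION AS OF 1;
```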

Example:

  1. Run the following command to open the Spark SQL CLI:

    spark-sql
  2. Execute the following statement to overwrite data:

    INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
  3. Query data.

    1. Go to the Trino console. For more information, see Log on to the Trino console by running commands.

    2. Execute the following statement to query data in the delta_table table:

      SELECT * FROM delta_table;

      The following output is returned:

       id
      ----
        5
        6
        7
        8
        9
      (5 rows)
  4. Use the time travel feature to query historical data in the delta_table table.

    Execute the following statement to query data by version number. You can set the VERSION parameter in the following statement based on your business requirements. The version number is an integer that monotonically increases: the value is 1 after the first INSERT operation and increases by 1 each time the table is modified.

    SELECT * FROM delta_table FOR VERSION AS OF 1;

    The following output is returned:

     id
    ----
      2
      1
      3
      4
      0
    (5 rows)

    You can also query historical data in a Delta Lake table by timestamp. The following timestamp types are supported: DATE, TIMESTAMP, and TIMESTAMP WITH TIME ZONE.

    • If you query data based on a timestamp of the DATE type, the data whose timestamp is 00:00:00 in the Coordinated Universal Time (UTC) format on the query date is queried.

    • If you query data based on a timestamp of the TIMESTAMP type, the data whose timestamp in the UTC format corresponds to the specified timestamp is queried.

      For example, you query data at 20:00:00 on February 15, 2022 UTC+8 based on the TIMESTAMP type. Sample code:

      SELECT * FROM delta_table FOR TIMESTAMP AS OF TIMESTAMP '2022-02-15 12:00:00';
      Note

      In the sample code, the first TIMESTAMP indicates that the timestamp mode is used for the time travel feature, and the second TIMESTAMP indicates that the TIMESTAMP type is used to query data.

      The following output is returned:

       id
      ----
        2
        0
        3
        4
        1
      (5 rows)
    • If you query data based on a timestamp of the TIMESTAMP WITH TIME ZONE type, the data type is implicitly converted before data is read.

      For example, you query data at 20:00:00 on February 15, 2022 UTC+8 based on the TIMESTAMP WITH TIME ZONE type. Sample code:

      SELECT * FROM delta_table FOR TIMESTAMP AS OF CAST('2022-02-15 20:00:00 +0800' AS TIMESTAMP WITH TIME ZONE);
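The conversion between UTC+8 and UTC in the examples above can be checked with a short Python sketch. This is only an illustration of the time arithmetic, not part of the Trino syntax:

```python
from datetime import datetime, timezone

# The example timestamp: 20:00:00 on February 15, 2022 in UTC+8.
local = datetime.fromisoformat("2022-02-15 20:00:00+08:00")

# Convert to UTC. This is the instant that a plain TIMESTAMP literal
# (interpreted as UTC) must specify to query the same point in time.
utc = local.astimezone(timezone.utc)

print(utc.strftime("%Y-%m-%d %H:%M:%S"))  # 2022-02-15 12:00:00
```

This is why the TIMESTAMP example earlier uses the literal '2022-02-15 12:00:00' to query data written at 20:00:00 UTC+8.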

Z-Ordering

Trino optimizes queries of Delta Lake tables based on Z-Ordering. Query optimization based on Parquet statistics and data skipping is supported. When these optimizations are enabled, Delta Lake collects the minimum and maximum values of each field at the file granularity, and the statistics are used to filter out data files that cannot match a query. The Delta connector provided by EMR Trino can read these statistics.
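The file-level statistics described above drive a simple pruning decision: a file can be skipped when the predicate's range cannot overlap the file's [min, max] interval. The following Python sketch illustrates the idea; the file list and statistics are made up for illustration:

```python
# Hypothetical per-file min/max statistics for one column, as Delta Lake
# records them at the file granularity.
files = [
    {"path": "part-0.parquet", "min": 0,   "max": 99},
    {"path": "part-1.parquet", "min": 100, "max": 499},
    {"path": "part-2.parquet", "min": 500, "max": 999},
]

def files_to_read(files, lower=None, upper=None):
    """Keep only files whose [min, max] range can satisfy
    lower <= value <= upper; the rest are skipped without being read."""
    kept = []
    for f in files:
        if lower is not None and f["max"] < lower:
            continue  # every value in the file is below the predicate range
        if upper is not None and f["min"] > upper:
            continue  # every value in the file is above the predicate range
        kept.append(f["path"])
    return kept

# Predicate: value >= 500 -> only part-2.parquet needs to be read.
print(files_to_read(files, lower=500))  # ['part-2.parquet']
```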

You can use the OPTIMIZE statement and ZORDER BY clause to optimize Delta Lake tables and specify proper columns for Z-Ordering. This way, when you use Trino to query the optimized Delta Lake tables, the query speed can be increased by up to dozens of times.

Z-Ordering is supported for columns of the following data types: INT, LONG, DOUBLE, FLOAT, BINARY, BOOLEAN, STRING, and ARRAY.

When you configure the data skipping feature to process Z-Ordered data, you can specify the following predicates: =, <, <=, >, and >=.

Note

Data skipping on Z-Ordered data does not support predicates such as LIKE and IN. However, because Z-Ordering provides partial ordering, queries that use such predicates can still benefit after the table is Z-Ordered.

For example, the conn_zorder table contains four columns: src_ip, src_port, dst_ip, and dst_port.

Execute the following statement to optimize columns in Spark:

OPTIMIZE conn_zorder ZORDER BY (src_ip, src_port, dst_ip, dst_port);
Important

The columns in the parentheses are Z-Ordered in the order in which they are listed.

The time that is taken to perform the OPTIMIZE operation varies based on the amount of data. After you optimize the columns, the performance of queries that meet the query execution conditions is improved.

  • You can improve the data query performance by querying specific columns that are Z-Ordered. Sample statement:

    SELECT COUNT(*) FROM conn_zorder WHERE src_ip > '64.';
  • When you query columns that are Z-Ordered, the query speed significantly increases. Sample statement:

    SELECT COUNT(*) FROM conn_zorder WHERE src_ip >= '64.' AND dst_ip < '192.' AND src_port < 1000 AND dst_port > 50000;
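The interleaving idea behind Z-Ordering can be sketched in a few lines of Python. Bits of the Z-Ordered columns are interleaved into one sort key, so rows that are close in the combined key are close in every column, which keeps the per-file minimum and maximum ranges narrow. This is a conceptual sketch, not how Delta Lake implements the OPTIMIZE operation:

```python
def z_order_key(values, bits=8):
    """Interleave the bits of several non-negative integers into a
    single Z-order (Morton) key. Earlier values get the higher bit
    positions, matching the column order in the ZORDER BY clause."""
    key = 0
    for bit in range(bits - 1, -1, -1):
        for v in values:
            key = (key << 1) | ((v >> bit) & 1)
    return key

# Sorting rows by the interleaved key clusters rows that are close in
# BOTH columns, so each data file covers a narrow range of each column.
rows = [(200, 5), (5, 200), (201, 6), (6, 201)]
rows.sort(key=lambda r: z_order_key(r))
print(rows)  # [(5, 200), (6, 201), (200, 5), (201, 6)]
```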