Delta コネクタを使用すると、E-MapReduce (EMR) Trino から Delta Lake テーブルを直接クエリできます。このコネクタは、オープンソースの Delta Lake のすべての特徴をサポートし、タイムトラベルや Z オーダリングに基づくクエリ高速化などの拡張機能も追加されています。
背景情報
Delta Lake は、Databricks 社が開発したデータレイクソリューションです。データレイクへのデータの書き込み、管理、クエリ、読み取りのための特徴を提供します。詳細については、「概要」をご参照ください。
前提条件
開始する前に、以下が準備できていることを確認してください。
Trino サービスが有効になっている DataLake クラスターまたはカスタムクラスター、あるいは Presto サービスが有効になっている Hadoop クラスター
クラスターを作成するには、「クラスターの作成」をご参照ください。
サポートされるバージョン
Delta コネクタは、以下でサポートされています。
EMR V3.39.1、EMR V5.5.0、またはそれ以降のマイナーバージョンを実行している Hadoop クラスター
DataLake クラスターおよびカスタムクラスター (バージョン制限なし)
Delta コネクタの設定
Delta コネクタの設定は、EMR コンソールの Trino サービスの [設定] タブにある [delta.properties] タブにあります。これらの設定を変更するには、「組み込みコネクタの設定変更」の手順に従ってください。
| 設定項目 | 説明 |
|---|---|
hive.metastore.uri | Thrift プロトコルを介して Hive メタストアにアクセスするための URI です。デフォルトのフォーマット: thrift://master-1-1.cluster-24****:9083 |
hive.config.resources | Hive メタストアが使用するリソースファイルへのパスです。 |
Delta Lake テーブルのクエリ
Trino は Delta Lake テーブルの読み取りはできますが、作成や変更はできません。テーブルの作成とデータの書き込みには Spark SQL を使用します。詳細については、「Delta Lake の使用」をご参照ください。
以下の例では、Spark SQL を使用してテーブルを作成し、その後 Trino からクエリを実行する方法を示します。
ステップ 1:テーブルの作成とデータ挿入
Spark SQL CLI を開きます:
spark-sqldelta_tableという名前の Delta Lake テーブルを作成します:CREATE TABLE delta_table (id INT) USING delta;テーブルにデータを挿入します:
INSERT INTO delta_table VALUES 0,1,2,3,4;
ステップ 2:Trino からのデータクエリ
Trino コンソールにログインします。手順については、「コマンド実行による Trino コンソールへのログイン」をご参照ください。
次のクエリを実行します:
SELECT * FROM delta_table;想定される出力:
id ---- 0 1 2 3 4 (5 rows)
高度な機能
以下の機能は、EMR V3.39.1 および EMR V5.5.0 でのみサポートされています。
タイムトラベル
タイムトラベル機能を使用すると、バージョン番号またはタイムスタンプによって Delta Lake テーブルの過去のスナップショットをクエリできます。EMR Trino では、FOR ... AS OF 構文を使用します:
-- バージョン番号によるクエリ
SELECT * FROM <table> FOR VERSION AS OF <version>;
-- タイムスタンプによるクエリ
SELECT * FROM <table> FOR TIMESTAMP AS OF <timestamp>;EMR Trino のタイムトラベル構文にはキーワード FOR が含まれます。これは、タイムトラベル構文で FOR を使用しないオープンソースの Spark SQL とは異なります。
バージョンによるクエリ
バージョン番号は整数で、最初の INSERT 後に 1 から始まり、変更のたびに 1 ずつ増分します。
以下の例では、delta_table を上書きした後、タイムトラベルを使用して以前のバージョンをクエリします。
Spark SQL でデータを上書きします:
INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;Trino から現在の状態を確認します:
SELECT * FROM delta_table;出力:
id ---- 5 6 7 8 9 (5 rows)バージョン 1 時点のデータをクエリします:
SELECT * FROM delta_table FOR VERSION AS OF 1;出力:
id ---- 2 1 3 4 0 (5 rows)
タイムスタンプによるクエリ
DATE、TIMESTAMP、TIMESTAMP WITH TIME ZONE の 3 種類のタイムスタンプ型がサポートされています。
DATE 型のタイムスタンプに基づいてデータをクエリする場合、クエリ日の 00:00:00 (UTC) のタイムスタンプのデータがクエリされます。
TIMESTAMP 型のタイムスタンプに基づいてデータをクエリする場合、指定されたタイムスタンプに対応する UTC のタイムスタンプのデータがクエリされます。たとえば、TIMESTAMP 型を使用して 2022 年 2 月 15 日 20:00:00 (UTC+08:00) のデータをクエリするには、次のようにします:
説明TIMESTAMP 形式では、最初の
TIMESTAMPキーワードはタイムトラベルモードを指定し、2 番目のTIMESTAMPはリテラル型のキーワードです。SELECT * FROM delta_table FOR TIMESTAMP AS OF TIMESTAMP '2022-02-15 12:00:00';次の出力が返されます:
id ---- 2 0 3 4 1 (5 rows)TIMESTAMP WITH TIME ZONE 型のタイムスタンプに基づいてデータをクエリする場合、スナップショットが読み取られる前にデータ型が暗黙的に変換されます:
SELECT * FROM delta_table FOR TIMESTAMP AS OF CAST('2022-02-15 20:00:00 +0800' AS TIMESTAMP WITH TIME ZONE);
Z オーダリング
Z オーダリングは、関連データを同じファイルセット内にまとめて配置するデータレイアウトの最適化手法です。Parquet に基づくデータクエリの最適化とデータスキップがサポートされています。Z オーダリング後、Delta Lake はファイル粒度で各列の min/max 統計を収集し、クエリ中に無関係なファイルをスキップするために使用します。Delta コネクタはこれらの統計を読み取るため、Trino はデータファイルをスキップしてクエリを最大で数十倍高速化できます。
テーブルの最適化
Spark で ZORDER BY を指定して OPTIMIZE コマンドを実行し、テーブルデータを書き換えてソートします。 次の例では、conn_zorder テーブルを最適化します。このテーブルには、src_ip、src_port、dst_ip、および dst_port の列があります。
OPTIMIZE conn_zorder ZORDER BY (src_ip, src_port, dst_ip, dst_port);ZORDER BY 句の列の順序が Z オーダリングの優先度を決定します。OPTIMIZE 操作の完了にかかる時間は、データ量に比例します。
サポートされる列の型
Z オーダリングは、INT、LONG、DOUBLE、FLOAT、BINARY、BOOLEAN、STRING、および ARRAY の列データ型をサポートしています。
データスキップでサポートされる述語
Z オーダリングされたデータをクエリする際、データスキップ機能は、=、<、<=、>、および >= の述語に対してファイルレベルのフィルタリングを適用します。
LIKE および IN 述語は、完全なデータスキップをトリガーしません。ただし、LIKE または IN を使用するクエリでも、Z オーダリングされた列が関連するファイルの範囲を絞り込むことで、部分的な順序付けの恩恵を受けることができます。
クエリ例
最適化後、Z オーダリングされた列でフィルタリングするクエリは、大幅に高速に実行されます:
-- Z オーダリングされた単一の列でフィルタリング
SELECT COUNT(*) FROM conn_zorder WHERE src_ip > '64.';
-- Z オーダリングされた複数の列でフィルタリング
SELECT COUNT(*) FROM conn_zorder WHERE src_ip >= '64.' AND dst_ip < '192.' AND src_port < 1000 AND dst_port > 50000;次のステップ
Delta Lake の使用 — Spark SQL を使用した Delta Lake テーブルの作成と管理
コマンド実行による Trino コンソールへのログイン — Trino に接続してクエリを実行
組み込みコネクタの設定変更 — コネクタ設定のカスタマイズ