すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Deltaコネクタ

最終更新日:Jan 11, 2025

Trinoサービスを使用してデプロイされたE-MapReduce(EMR)クラスターは、Deltaコネクタを提供します。 Deltaコネクタは、オープンソースのDelta Lakeのすべての機能をサポートし、既存の機能に加えて強化された機能を提供します。

背景情報

Delta Lakeは、Databricksによって開発されたデータレイクソリューションです。 Delta Lakeは、データレイクへのデータの書き込み、データの管理、データのクエリ、およびデータレイクからのデータの読み取りに使用できる機能を提供します。 詳細については、「概要」をご参照ください。

前提条件

DataLakeクラスターまたはカスタムクラスターが作成され、Trinoサービスが選択されています。 Hadoopクラスターが作成され、Prestoサービスが選択されています。 詳細については、「クラスターの作成」をご参照ください。

制限

EMR V3.39.1、EMR V5.5.0、およびそれ以降のマイナーバージョン、DataLakeクラスターとカスタムクラスターのHadoopクラスターのみがDeltaコネクタをサポートしています。

基本的な使用方法

Deltaコネクタの構成の変更

Deltaコネクタの構成を変更する方法の詳細については、「組み込みコネクタの構成の変更」をご参照ください。

Deltaコネクタのデフォルト構成

EMRコンソールのTrinoサービスの[構成] タブに移動します。 [構成] タブで、[delta.properties] タブをクリックします。 ビジネス要件に基づいて、次の表に記載されている構成項目を変更または追加します。

構成項目

説明

hive.metastore.uri

Thriftプロトコルに基づいてHiveメタストアにアクセスするために使用されるUniform Resource Identifier(URI)。 ビジネス要件に基づいて、この構成項目の値を変更できます。 デフォルトでは、この構成項目は thrift://master-1-1.cluster-24****:9083 の形式で指定されます。

hive.config.resources

Hiveメタストアで使用されるリソースファイルが格納されているパス。

Trinoを使用してDelta Lakeテーブルを作成または変更することはできません。 Spark SQLを使用してDelta Lakeテーブルを作成できます。 詳細については、「Delta Lakeの使用」をご参照ください。

  1. データの生成。

    1. 次のコマンドを実行して、Spark SQL CLIを開きます。

      spark-sql
    2. 次のステートメントを実行して、delta_tableという名前のDelta Lakeテーブルを作成します。

      CREATE TABLE delta_table (id INT) USING delta;
    3. 次のステートメントを実行して、delta_tableテーブルにデータを書き込みます。

      INSERT INTO delta_table VALUES 0,1,2,3,4;
  2. データのクエリ。

    1. Trinoコンソールに移動します。 詳細については、「コマンドを実行してTrinoコンソールにログオンする」をご参照ください。

    2. 次のステートメントを実行して、delta_tableテーブルのデータをクエリします。

      SELECT * FROM delta_table;

      次の出力が返されます。

       id
      ----
        0
        1
        2
        3
        4
      (5 rows)

高度な使用方法

重要

この機能は、EMR V3.39.1、EMR V5.5.0バージョンでのみサポートされています。

タイムトラベル

タイムトラベル機能を使用して、Delta Lakeテーブルの履歴データをクエリできます。

EMR Trinoは、Delta Lakeテーブルのタイムトラベル機能をサポートしています。 タイムトラベル機能の構文は、for xxx as of です。 xxx は、タイムトラベル機能が動作するモードを示し、VERSIONまたはTIMESTAMPに設定できます。

重要

EMR Trinoでサポートされているタイムトラベル機能の構文には、オープンソースのDelta LakeのSpark SQLの構文には含まれていないキーワード FOR が含まれています。

例:

  1. 次のコマンドを実行して、Spark SQL CLIを開きます。

    spark-sql
  2. 次のステートメントを実行して、データを上書きします。

    INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
  3. データのクエリ。

    1. Trinoコンソールに移動します。 詳細については、「コマンドを実行してTrinoコンソールにログオンする」をご参照ください。

    2. 次のステートメントを実行して、delta_tableテーブルのデータをクエリします。

      SELECT * FROM delta_table;

      次の出力が返されます。

       id
      ----
        5
        6
        7
        8
        9
      (5 rows)
  4. タイムトラベル機能を使用して、delta_tableテーブルの履歴データをクエリします。

    次のステートメントを実行して、バージョン番号でデータをクエリします。 ビジネス要件に基づいて、次のステートメントのVERSIONパラメーターを構成できます。 バージョン番号は、単調に増加する整数です。 デフォルトでは、最初のINSERT操作後にVERSIONパラメーターの値は 1 になり、変更が実行されるたびにこのパラメーターの値は 1 ずつ増加します。

    SELECT * FROM delta_table FOR VERSION AS OF 1;

    次の出力が返されます。

     id
    ----
      2
      1
      3
      4
      0
    (5 rows)

    タイムスタンプでDelta Lakeテーブルの履歴データをクエリすることもできます。 サポートされているタイムスタンプタイプは、DATE、TIMESTAMP、およびTIMESTAMP WITH TIME ZONEです。

    • DATEタイプのタイムスタンプに基づいてデータをクエリする場合、クエリ日の協定世界時(UTC)形式でタイムスタンプが00:00:00のデータがクエリされます。

    • TIMESTAMPタイプのタイムスタンプに基づいてデータをクエリする場合、UTC形式のタイムスタンプが指定されたタイムスタンプに対応するデータがクエリされます。

      たとえば、TIMESTAMPタイプに基づいて、2022年 2 月 15 日 20:00:00 UTC+8のデータをクエリします。 サンプルコード:

      SELECT * FROM delta_table FOR TIMESTAMP AS OF TIMESTAMP '2022-02-15 12:00:00';
      説明

      サンプルコードでは、最初のTIMESTAMPはタイムトラベル機能にタイムスタンプモードが使用されていることを示し、2番目のTIMESTAMPはTIMESTAMPタイプを使用してデータがクエリされていることを示します。

      次の出力が返されます。

       id
      ----
        2
        0
        3
        4
        1
      (5 rows)
    • TIMESTAMP WITH TIME ZONEタイプのタイムスタンプに基づいてデータをクエリする場合、データ型はデータが読み取られる前に暗黙的に変換されます。

      たとえば、TIMESTAMP WITH TIME ZONEタイプに基づいて、2022年 2 月 15 日 20:00:00 UTC+8のデータをクエリします。 サンプルコード:

      SELECT * FROM delta_table FOR TIMESTAMP AS OF CAST('2022-02-15 20:00:00 +0800' AS TIMESTAMP WITH TIME ZONE);

Zオーダー

Trinoは、Zオーダーに基づいてDelta Lakeテーブルのクエリを最適化します。 Parquetに基づくデータクエリの最適化とデータスキップがサポートされています。 Parquetまたはデータスキップに基づいてデータクエリを最適化すると、Delta Lakeはファイルの粒度で各フィールドの最大値と最小値に関する統計を収集します。 統計は、データファイルをフィルタリングするために使用されます。 EMR Trinoによって提供されるDeltaコネクタを使用して、統計を読み取ることができます。

OPTIMIZEステートメントとZORDER BY句を使用して、Delta Lakeテーブルを最適化し、Zオーダーに適切な列を指定できます。 このようにして、Trinoを使用して最適化されたDelta Lakeテーブルをクエリすると、クエリ速度を最大数十倍向上させることができます。

Trinoでは、INT、LONG、DOUBLE、FLOAT、BINARY、BOOLEAN、STRING、およびARRAYのデータ型の列をZオーダーできます。

Zオーダーデータを処理するためにデータスキップ機能を構成する場合、=<<=>>= などの述語を指定できます。

説明

Trinoは、LIKEやINなどの述語をサポートしていません。 LIKEやINなどの述語をZオーダーを使用して最適化した後、Zオーダーの部分順序付け機能に基づく述語を使用してクエリを高速化できます。

たとえば、conn_zorderテーブルには、src_ip、src_port、dst_ip、dst_portの 4 つの列が含まれています。

次のステートメントを実行して、Sparkの列を最適化します。

OPTIMIZE conn_zorder ZORDER BY (src_ip, src_port, dst_ip, dst_port);
重要

括弧内の列は、列がZオーダーされるのと同じ順序で並べられています。

OPTIMIZE操作の実行にかかる時間は、データ量によって異なります。 列を最適化した後、クエリ実行条件を満たすクエリの パフォーマンスが向上します。

  • Zオーダーされた特定の列をクエリすることで、データクエリのパフォーマンスを向上させることができます。 サンプルステートメント:

    SELECT COUNT(*) FROM conn_zorder WHERE src_ip > '64.';
  • Zオーダーされた列をクエリする場合、クエリ速度が大幅に向上します。 サンプルステートメント:

    SELECT COUNT(*) FROM conn_zorder WHERE src_ip >= '64.' AND dst_ip < '192.' AND src_port < 1000 AND dst_port > 50000;