すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:基本的な使い方

最終更新日:Jan 11, 2025

このトピックでは、E-MapReduce(EMR)における Delta Lake の構成情報について説明し、一般的なコマンドの例をいくつか示します。

構成情報

EMR における Delta Lake のデフォルト構成:

  • Spark 2.X

    spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
  • Spark 3.X

    spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
    spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog

一般的なサンプルコマンド

  • テーブルを作成する

    CREATE TABLE delta_table (id INT) USING delta;
  • テーブルにデータを挿入する

    INSERT INTO delta_table VALUES 0,1,2,3,4;
  • テーブルの既存のデータを上書きする

    INSERT OVERWRITE TABLE delta_table VALUES 5,6,7,8,9;
  • データをクエリする

    SELECT * FROM delta_table;
  • データを更新する

    偶数であるすべての ID に 100 を加算します。

    UPDATE delta_table SET id = id + 100 WHERE mod(id, 2) = 0;
  • データを削除する

    ID が偶数であるレコードを削除します。

    DELETE FROM delta_table WHERE mod(id, 2) = 0;

データのマージ

  1. マージ操作のソーステーブルを作成します。

    CREATE TABLE newData(id INT) USING delta;
  2. テーブルにデータを挿入します。

    INSERT INTO newData VALUES 0,1,2,3,4,5,6,7,8,9;
  3. newData テーブルのデータを delta_table テーブルにマージします。 newData テーブルのレコードの ID が delta_table テーブルのレコードの ID と同じである場合、ソーステーブルのレコードの ID に 100 が加算されます。そうでない場合は、レコードが直接挿入されます。

    MERGE INTO delta_table AS target
      USING newData AS source
      ON target.id = source.id
      WHEN MATCHED THEN UPDATE SET target.id = source.id + 100
      WHEN NOT MATCHED THEN INSERT *;
  4. ストリーミングモードで Delta テーブルからデータを読み取ります。

    1. ストリーミングモードでデータを書き込む先のテーブルを作成します。

      CREATE TABLE stream_debug_table (id INT) USING delta;
    2. ストリームを作成します。

      CREATE SCAN stream_delta_table on delta_table USING STREAM;
      説明

      この例では、delta_table は既存の Delta テーブルです。

    3. ストリーミングモードでデータをデスティネーションテーブルに書き込みます。

      CREATE STREAM job
      options (
        triggerType='ProcessingTime',
        checkpointLocation = '/tmp/streaming_read_cp'
      )
      INSERT INTO stream_debug_table
      SELECT * FROM stream_delta_table;

  1. SSH モードでクラスターにログオンします。詳細については、「クラスターへのログオン」をご参照ください。

  2. 次のコマンドを実行して、streaming-sql を起動します。

    streaming-sql
    説明

    Delta Lake サービスがクラスターに追加されている場合は、streaming-sql コマンドを直接実行できます。サービスがクラスターに追加されていない場合は、次のコマンドを実行して Delta Lake を使用できます。

    streaming-sql --jars /path/to/delta-core_2.11-0.6.1.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension
  3. 次のコマンドを実行して、デスティネーションテーブルを作成します。

    CREATE TABLE stream_debug_table (id INT) USING DELTA;
  4. 次のコマンドを実行して、ストリームを作成します。

    CREATE SCAN stream_delta_table on delta_table USING STREAM;
  5. 次のコマンドを実行して、delta_table という名前のソーステーブルのデータをストリーミングモードでデスティネーションテーブルに書き込みます。

    CREATE STREAM job
    options (
      triggerType='ProcessingTime',
      checkpointLocation = '/tmp/streaming_read_cp'
    )
    INSERT INTO stream_debug_table
    SELECT * FROM stream_delta_table;
  6. 別の streaming-sql クライアントを起動し、次の操作を実行して、ソーステーブルにデータを挿入し、デスティネーションテーブルのデータをクエリします。

    1. 次のコマンドを実行して、デスティネーションテーブルの既存のデータをクエリします。

      SELECT * FROM stream_debug_table;
    2. 次のコマンドを実行して、ソーステーブルにデータを挿入します。

      INSERT INTO delta_table VALUES 801, 802;
    3. 次のコマンドを実行して、ソーステーブルに挿入されたデータがストリーミングモードでデスティネーションテーブルに書き込まれたかどうかを確認します。

      SELECT * FROM stream_debug_table;
    4. 次のコマンドを実行して、ソーステーブルにデータを挿入します。

      INSERT INTO delta_table VALUES 901, 902;
    5. 次のコマンドを実行して、ソーステーブルに挿入されたデータがストリーミングモードでデスティネーションテーブルに書き込まれたかどうかを確認します。

      SELECT * FROM stream_debug_table;