すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Delta Lake の使用

最終更新日:Nov 09, 2025

Delta Lake は、データレイク上に Lakehouse アーキテクチャを構築するオープンソースのストレージフレームワークです。 Delta Lake は、原子性、一貫性、分離性、耐久性 (ACID) トランザクション、スケーラブルなメタデータ処理を提供し、OSS、Amazon S3、Hadoop 分散ファイルシステム (HDFS) などの既存のデータレイク上でストリーム処理とバッチ処理を組み合わせます。 Delta Lake は、Spark、PrestoDB、Flink などの複数のエンジンもサポートし、Scala、Java、Rust、Python などのさまざまなプログラミング言語用の API を提供します。

前提条件

ワークスペースが作成されていること。 詳細については、「ワークスペースの作成」をご参照ください。

手順

ステップ 1: SQL セッションの作成

  1. Sessions ページに移動します。

    1. EMR コンソールにログインします。

    2. 左側のナビゲーションウィンドウで、[EMR Serverless] > [Spark] を選択します。

    3. [Spark] ページで、管理するワークスペースの名前をクリックします。

    4. [EMR Serverless Spark] ページの左側のナビゲーションウィンドウで、[Operation Center] > [Sessions] を選択します。

  2. [SQL Sessions] タブで、[Create SQL Session] をクリックします。

  3. [Create SQL Session] ページで、[Spark Configurations] セクションのパラメーターを設定し、[Create] をクリックします。 詳細については、「SQL セッションの管理」をご参照ください。

    現在のワークスペースのデフォルトのカタログがメタデータに使用されます。 デフォルトのカタログを外部 Hive Metastore に変更する場合は、「外部 Hive Metastore サービスへの接続」をご参照ください。

    spark.sql.extensions             io.delta.sql.DeltaSparkSessionExtension
    spark.sql.catalog.spark_catalog  org.apache.spark.sql.delta.catalog.DeltaCatalog

ステップ 2: Delta Lake テーブルからの読み取りと書き込み

  1. SQL 開発ページに移動します。

    [EMR Serverless Spark] ページの左側のナビゲーションウィンドウで、[Developer] をクリックします。

  2. [Development] タブで、image アイコンをクリックします。

  3. [Create] ダイアログボックスで、users_task などの名前を入力し、タイプはデフォルトの [SparkSQL] のままにして、[OK] をクリックします。

  4. 次のコードを新しい SparkSQL タブ (users_task) にコピーします。

    CREATE DATABASE IF NOT EXISTS ss_delta_db;
    
    CREATE TABLE ss_delta_db.delta_tbl (id INT, name STRING) USING delta;
    
    INSERT INTO ss_delta_db.delta_tbl VALUES (1, "a"), (2, "b");
    
    SELECT id, name FROM ss_delta_db.delta_tbl ORDER BY id;
  5. データベースのドロップダウンリストからデータベースを選択します。 セッションのドロップダウンリストから、作成した SQL セッションを選択します。

  6. [Run] をクリックしてタスクを実行します。 次の情報が返されます。

    image

ステップ 3: データの更新

Delta Lake は、Update、Delete、Merge Into など、さまざまなデータ更新操作をサポートしています。 次のコードは例です。

-- 更新操作
UPDATE ss_delta_db.delta_tbl SET name = "a_v2" WHERE id = 1;

-- 削除操作
DELETE FROM ss_delta_db.delta_tbl WHERE id = 2;


-- Merge Into 操作
-- 一時テーブルを作成してデータを挿入します。
CREATE TABLE ss_delta_db.tmp_tbl(id INT, name STRING) USING delta;
INSERT INTO ss_delta_db.tmp_tbl VALUES (1, "a_v3"), (3, "c");

-- Merge Into 操作を実行します。
MERGE INTO ss_delta_db.delta_tbl AS target
USING ss_delta_db.tmp_tbl AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

-- 結果を検証します。
SELECT * FROM ss_delta_db.delta_tbl ORDER BY id;

次の情報が返されます。

image

ステップ 4: リソースのクリーンアップ

テストが完了したら、次のコマンドを実行してテストリソースをクリーンアップし、ストレージの消費を回避します。

DROP TABLE ss_delta_db.delta_tbl;
DROP TABLE ss_delta_db.tmp_tbl;

DROP DATABASE ss_delta_db;
重要

上記のコマンドは、テーブルとデータベースを完全に削除します。 データがバックアップされているか、不要になったことを確認してください。

参考資料