Delta Lake は、データレイク上に Lakehouse アーキテクチャを構築するオープンソースのストレージフレームワークです。 Delta Lake は、原子性、一貫性、分離性、耐久性 (ACID) トランザクション、スケーラブルなメタデータ処理を提供し、OSS、Amazon S3、Hadoop 分散ファイルシステム (HDFS) などの既存のデータレイク上でストリーム処理とバッチ処理を組み合わせます。 Delta Lake は、Spark、PrestoDB、Flink などの複数のエンジンもサポートし、Scala、Java、Rust、Python などのさまざまなプログラミング言語用の API を提供します。
前提条件
ワークスペースが作成されていること。 詳細については、「ワークスペースの作成」をご参照ください。
手順
ステップ 1: SQL セッションの作成
Sessions ページに移動します。
EMR コンソールにログインします。
左側のナビゲーションウィンドウで、 を選択します。
[Spark] ページで、管理するワークスペースの名前をクリックします。
[EMR Serverless Spark] ページの左側のナビゲーションウィンドウで、[Operation Center] > [Sessions] を選択します。
[SQL Sessions] タブで、[Create SQL Session] をクリックします。
[Create SQL Session] ページで、[Spark Configurations] セクションのパラメーターを設定し、[Create] をクリックします。 詳細については、「SQL セッションの管理」をご参照ください。
現在のワークスペースのデフォルトのカタログがメタデータに使用されます。 デフォルトのカタログを外部 Hive Metastore に変更する場合は、「外部 Hive Metastore サービスへの接続」をご参照ください。
spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
ステップ 2: Delta Lake テーブルからの読み取りと書き込み
SQL 開発ページに移動します。
[EMR Serverless Spark] ページの左側のナビゲーションウィンドウで、[Developer] をクリックします。
[Development] タブで、
アイコンをクリックします。[Create] ダイアログボックスで、users_task などの名前を入力し、タイプはデフォルトの [SparkSQL] のままにして、[OK] をクリックします。
次のコードを新しい SparkSQL タブ (users_task) にコピーします。
CREATE DATABASE IF NOT EXISTS ss_delta_db; CREATE TABLE ss_delta_db.delta_tbl (id INT, name STRING) USING delta; INSERT INTO ss_delta_db.delta_tbl VALUES (1, "a"), (2, "b"); SELECT id, name FROM ss_delta_db.delta_tbl ORDER BY id;データベースのドロップダウンリストからデータベースを選択します。 セッションのドロップダウンリストから、作成した SQL セッションを選択します。
[Run] をクリックしてタスクを実行します。 次の情報が返されます。

ステップ 3: データの更新
Delta Lake は、Update、Delete、Merge Into など、さまざまなデータ更新操作をサポートしています。 次のコードは例です。
-- 更新操作
UPDATE ss_delta_db.delta_tbl SET name = "a_v2" WHERE id = 1;
-- 削除操作
DELETE FROM ss_delta_db.delta_tbl WHERE id = 2;
-- Merge Into 操作
-- 一時テーブルを作成してデータを挿入します。
CREATE TABLE ss_delta_db.tmp_tbl(id INT, name STRING) USING delta;
INSERT INTO ss_delta_db.tmp_tbl VALUES (1, "a_v3"), (3, "c");
-- Merge Into 操作を実行します。
MERGE INTO ss_delta_db.delta_tbl AS target
USING ss_delta_db.tmp_tbl AS source
ON target.id = source.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
-- 結果を検証します。
SELECT * FROM ss_delta_db.delta_tbl ORDER BY id;次の情報が返されます。

ステップ 4: リソースのクリーンアップ
テストが完了したら、次のコマンドを実行してテストリソースをクリーンアップし、ストレージの消費を回避します。
DROP TABLE ss_delta_db.delta_tbl;
DROP TABLE ss_delta_db.tmp_tbl;
DROP DATABASE ss_delta_db;上記のコマンドは、テーブルとデータベースを完全に削除します。 データがバックアップされているか、不要になったことを確認してください。
参考資料
SQL ジョブを開発およびオーケストレーションする方法については、「SparkSQL 開発の開始」をご参照ください。
Delta Lake の使用方法と構成の詳細については、「Delta Lake 公式ドキュメント」をご参照ください。