すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Delta Lake CDC を使用した増分データウェアハウスの構築

最終更新日:Jan 11, 2025

Delta Lake Change Data Capture(CDC)を使用して、増分データウェアハウスを構築できます。このトピックでは、Delta Lake CDC のパラメーター、スキーマ、および使用例について説明します。

背景情報

CDC は、データベースのテーブルにおけるデータ変更を識別およびキャプチャし、変更をダウンストリームプロセスまたはシステムに配信するために使用できるソフトウェア設計パターンのセットです。Delta Lake CDC は、Delta Lake テーブルをソースとして使用して、データ変更を取得できます。

Delta Lake CDC は、Change Data Feed(CDF)を使用して実装されます。 CDF を使用すると、Delta Lake テーブルのデータ変更をロウレベルで追跡できます。テーブルに対して CDF を有効にすると、Delta Lake は変更データを永続化し、テーブルを格納するディレクトリ内の特定のファイルにデータを書き込むことができます。 Delta Lake CDC を使用すると、増分データウェアハウスを効率的に構築できます。

制限事項

Delta Lake CDC は、EMR V3.41.0 以降のマイナーバージョン(Delta Lake 0.6.1)および EMR V5.9.0 以降のマイナーバージョン(Delta Lake 2.0)のクラスターでのみ使用できます。

パラメーター

SparkConf パラメーター

パラメーター

説明

spark.sql.externalTableValuedFunctions

Spark 2.4.X のテーブル値関数を拡張するカスタム EMR Spark パラメーター。 Spark SQL を使用して CDF クエリを実行する場合は、このパラメーターを table_changes に設定します。

spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled

有効な値:

  • false: CDF クエリの開始タイムスタンプまたは終了タイムスタンプは有効である必要があります。そうでない場合は、エラーが返されます。これはデフォルト値です。

  • true: 開始タイムスタンプが無効な場合は、空のデータが返されます。終了タイムスタンプが無効な場合は、現在のスナップショットのデータが返されます。

説明

このパラメーターは、Delta Lake 2.X でのみ有効です。

CDC 書き込みパラメーター

パラメーター

説明

delta.enableChangeDataFeed

CDF を有効にするかどうかを指定します。有効な値:

  • false: CDF を無効にします。これはデフォルト値です。

  • true: CDF を有効にします。

CDC 読み取りパラメーター

次の表に示すパラメーターは、DataFrame または Spark Streaming を使用して Delta Lake CDC を構築する場合にのみ必要です。

パラメーター

説明

readChangeFeed

テーブルの変更データを読み取るかどうかを指定します。値 true は、テーブルの変更データが返されることを示します。このパラメーターを true に設定する場合は、startingVersion パラメーターまたは startingTimestamp パラメーターを設定する必要があります。

startingVersion

変更データの読み取りを開始するテーブルのバージョンを指定します。このパラメーターは、readChangeFeed パラメーターを true に設定した場合にのみ有効になります。

endingVersion

変更データの読み取りを停止するテーブルのバージョンを指定します。このパラメーターは、readChangeFeed パラメーターを true に設定した場合にのみ有効になります。

startingTimestamp

テーブルの変更データの読み取りを開始するタイムスタンプを指定します。このパラメーターは、readChangeFeed パラメーターを true に設定した場合にのみ有効になります。

endingTimestamp

テーブルの変更データの読み取りを停止するタイムスタンプを指定します。このパラメーターは、readChangeFeed パラメーターを true に設定した場合にのみ有効になります。

スキーマ

Delta Lake CDF によって返されるスキーマには、元のテーブルのスキーマと次のフィールドが含まれます。

  • _change_type: データ変更の原因となった操作。有効な値:

    • insert: テーブルにデータを挿入します。

    • delete: テーブルからデータを削除します。

    • update_preimage および update_postimage: テーブルのデータを更新します。 update_preimage は変更前のデータを記録し、update_postimage は変更後のデータを記録します。

  • _commit_version: データ変更が発生した Delta Lake テーブルのバージョンを指定します。

  • _commit_timestamp: データ変更が発生した Delta Lake テーブルがコミットされた時刻を指定します。

Spark SQL を使用した Delta Lake CDC の構築

重要

Spark SQL 構文は、EMR Spark 2 および Delta Lake 0.6.1 でのみサポートされています。

EMR Spark 2 で Spark SQL 構文を使用するには、次のパラメーターを設定する必要があります。サンプルコード:

spark-sql --conf spark.sql.externalTableValuedFunctions=table_changes

次のサンプルコードは、SQL 構文の使用方法の例を示しています。

-- Delta CDF 対応テーブルの作成
CREATE TABLE cdf_tbl (id int, name string, age int) USING delta
TBLPROPERTIES ("delta.enableChangeDataFeed" = "true");

-- 挿入
INSERT INTO cdf_tbl VALUES (1, 'XUN', 32), (2, 'JING', 30);

-- 上書き挿入
INSERT OVERWRITE TABLE cdf_tbl VALUES (1, 'a1', 30), (2, 'a2', 32), (3, "a3", 32);

-- 更新
UPDATE cdf_tbl set age = age + 1;

-- マージ
CREATE TABLE merge_source (id int, name string, age int) USING delta;
INSERT INTO merge_source VALUES (1, "a1", 31), (2, "a2_new", 33), (4, "a4", 30);

MERGE INTO cdf_tbl target USING merge_source source
ON target.id = source.id
WHEN MATCHED AND target.id % 2 == 0 THEN UPDATE SET name = source.name
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT *;

-- 削除
DELETE FROM cdf_tbl WHERE age >= 32;

-- CDF クエリ
-- すべてのバージョンのテーブルから変更データをクエリします。データのクエリを開始するテーブルのバージョンは 0 です。
select * from table_changes("cdf_tbl", 0);
select * from table_changes("cdf_tbl", '2023-02-03 15:33:34'); -- 2023-02-03 15:33:34 は、バージョン 0 のテーブルがコミットされた時刻です。


-- バージョン 4 のテーブルから変更データをクエリします。
select * from table_changes("cdf_tbl", 4, 4);
select * from table_changes("cdf_tbl", '2023-02-03 15:34:06', '2023-02-03 15:34:06'); -- 2023-02-03 15:34:06 は、バージョン 4 のテーブルがコミットされた時刻です。

次の図は、クエリ結果を示しています。

図 1. 最初のクエリの結果fig1

図 2. 2 番目のクエリの結果fig2

DataFrame を使用した Delta Lake CDC の構築

// Delta CDF 対応テーブルの作成と書き込み
// Delta Lake テーブルに初めてデータを書き込むときは、CDF を有効にする必要があります。後続のデータ書き込み操作では、CDF を有効にする必要はありません。
val df = Seq((1, "XUN", 32), (2, "JING", 30)).toDF("id", "name", "age")
df.write.format("delta").mode("append")
  .option("delta.enableChangeDataFeed", "true")
  .saveAsTable("cdf_table")

// DataFrame を使用した CDF クエリ
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 4) // endingVersion パラメーターはオプションです。
  .table("cdf_table")

Spark Streaming を使用した Delta Lake CDC の構築

// データを使用したストリーミング CDF クエリ
spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .option("endingVersion", 4) // endingVersion パラメーターはオプションです。
  .table("cdf_table")