Delta Lake Change Data Capture(CDC)を使用して、増分データウェアハウスを構築できます。このトピックでは、Delta Lake CDC のパラメーター、スキーマ、および使用例について説明します。
背景情報
CDC は、データベースのテーブルにおけるデータ変更を識別およびキャプチャし、変更をダウンストリームプロセスまたはシステムに配信するために使用できるソフトウェア設計パターンのセットです。Delta Lake CDC は、Delta Lake テーブルをソースとして使用して、データ変更を取得できます。
Delta Lake CDC は、Change Data Feed(CDF)を使用して実装されます。 CDF を使用すると、Delta Lake テーブルのデータ変更をロウレベルで追跡できます。テーブルに対して CDF を有効にすると、Delta Lake は変更データを永続化し、テーブルを格納するディレクトリ内の特定のファイルにデータを書き込むことができます。 Delta Lake CDC を使用すると、増分データウェアハウスを効率的に構築できます。
制限事項
Delta Lake CDC は、EMR V3.41.0 以降のマイナーバージョン(Delta Lake 0.6.1)および EMR V5.9.0 以降のマイナーバージョン(Delta Lake 2.0)のクラスターでのみ使用できます。
パラメーター
SparkConf パラメーター
パラメーター | 説明 |
spark.sql.externalTableValuedFunctions | Spark 2.4.X のテーブル値関数を拡張するカスタム EMR Spark パラメーター。 Spark SQL を使用して CDF クエリを実行する場合は、このパラメーターを table_changes に設定します。 |
spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled | 有効な値:
説明 このパラメーターは、Delta Lake 2.X でのみ有効です。 |
CDC 書き込みパラメーター
パラメーター | 説明 |
delta.enableChangeDataFeed | CDF を有効にするかどうかを指定します。有効な値:
|
CDC 読み取りパラメーター
次の表に示すパラメーターは、DataFrame または Spark Streaming を使用して Delta Lake CDC を構築する場合にのみ必要です。
パラメーター | 説明 |
readChangeFeed | テーブルの変更データを読み取るかどうかを指定します。値 true は、テーブルの変更データが返されることを示します。このパラメーターを true に設定する場合は、startingVersion パラメーターまたは startingTimestamp パラメーターを設定する必要があります。 |
startingVersion | 変更データの読み取りを開始するテーブルのバージョンを指定します。このパラメーターは、readChangeFeed パラメーターを true に設定した場合にのみ有効になります。 |
endingVersion | 変更データの読み取りを停止するテーブルのバージョンを指定します。このパラメーターは、readChangeFeed パラメーターを true に設定した場合にのみ有効になります。 |
startingTimestamp | テーブルの変更データの読み取りを開始するタイムスタンプを指定します。このパラメーターは、readChangeFeed パラメーターを true に設定した場合にのみ有効になります。 |
endingTimestamp | テーブルの変更データの読み取りを停止するタイムスタンプを指定します。このパラメーターは、readChangeFeed パラメーターを true に設定した場合にのみ有効になります。 |
スキーマ
Delta Lake CDF によって返されるスキーマには、元のテーブルのスキーマと次のフィールドが含まれます。
_change_type: データ変更の原因となった操作。有効な値:
insert: テーブルにデータを挿入します。
delete: テーブルからデータを削除します。
update_preimage および update_postimage: テーブルのデータを更新します。 update_preimage は変更前のデータを記録し、update_postimage は変更後のデータを記録します。
_commit_version: データ変更が発生した Delta Lake テーブルのバージョンを指定します。
_commit_timestamp: データ変更が発生した Delta Lake テーブルがコミットされた時刻を指定します。
例
Spark SQL を使用した Delta Lake CDC の構築
Spark SQL 構文は、EMR Spark 2 および Delta Lake 0.6.1 でのみサポートされています。
EMR Spark 2 で Spark SQL 構文を使用するには、次のパラメーターを設定する必要があります。サンプルコード:
spark-sql --conf spark.sql.externalTableValuedFunctions=table_changes
次のサンプルコードは、SQL 構文の使用方法の例を示しています。
-- Delta CDF 対応テーブルの作成
CREATE TABLE cdf_tbl (id int, name string, age int) USING delta
TBLPROPERTIES ("delta.enableChangeDataFeed" = "true");
-- 挿入
INSERT INTO cdf_tbl VALUES (1, 'XUN', 32), (2, 'JING', 30);
-- 上書き挿入
INSERT OVERWRITE TABLE cdf_tbl VALUES (1, 'a1', 30), (2, 'a2', 32), (3, "a3", 32);
-- 更新
UPDATE cdf_tbl set age = age + 1;
-- マージ
CREATE TABLE merge_source (id int, name string, age int) USING delta;
INSERT INTO merge_source VALUES (1, "a1", 31), (2, "a2_new", 33), (4, "a4", 30);
MERGE INTO cdf_tbl target USING merge_source source
ON target.id = source.id
WHEN MATCHED AND target.id % 2 == 0 THEN UPDATE SET name = source.name
WHEN MATCHED THEN DELETE
WHEN NOT MATCHED THEN INSERT *;
-- 削除
DELETE FROM cdf_tbl WHERE age >= 32;
-- CDF クエリ
-- すべてのバージョンのテーブルから変更データをクエリします。データのクエリを開始するテーブルのバージョンは 0 です。
select * from table_changes("cdf_tbl", 0);
select * from table_changes("cdf_tbl", '2023-02-03 15:33:34'); -- 2023-02-03 15:33:34 は、バージョン 0 のテーブルがコミットされた時刻です。
-- バージョン 4 のテーブルから変更データをクエリします。
select * from table_changes("cdf_tbl", 4, 4);
select * from table_changes("cdf_tbl", '2023-02-03 15:34:06', '2023-02-03 15:34:06'); -- 2023-02-03 15:34:06 は、バージョン 4 のテーブルがコミットされた時刻です。
次の図は、クエリ結果を示しています。
図 1. 最初のクエリの結果
図 2. 2 番目のクエリの結果
DataFrame を使用した Delta Lake CDC の構築
// Delta CDF 対応テーブルの作成と書き込み
// Delta Lake テーブルに初めてデータを書き込むときは、CDF を有効にする必要があります。後続のデータ書き込み操作では、CDF を有効にする必要はありません。
val df = Seq((1, "XUN", 32), (2, "JING", 30)).toDF("id", "name", "age")
df.write.format("delta").mode("append")
.option("delta.enableChangeDataFeed", "true")
.saveAsTable("cdf_table")
// DataFrame を使用した CDF クエリ
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 4) // endingVersion パラメーターはオプションです。
.table("cdf_table")
Spark Streaming を使用した Delta Lake CDC の構築
// データを使用したストリーミング CDF クエリ
spark.read.format("delta")
.option("readChangeFeed", "true")
.option("startingVersion", 0)
.option("endingVersion", 4) // endingVersion パラメーターはオプションです。
.table("cdf_table")