変更データキャプチャ (CDC) は、データベーステーブル内のデータの変更を識別してキャプチャします。挿入、更新、削除などの行レベルの操作を増分 Delta テーブルに記録し、データ変更イベントストリームを作成します。このイベントストリームは、増分コンピューティング、データ同期、データウェアハウスの階層化などのビジネス要件をサポートします。
機能
増分コンピューティング:マテリアライズドビューの更新などのタスクのために、CDC 変更レコードを増分的に読み取ります。
ストリーム処理:Flink ジョブによる消費などの処理タスクのために、CDC 変更レコードをストリームとして読み取ります。
複数エンジン間のデータ同期:異なるエンジン間で増分データを同期し、計算します。
ログ監査:すべてのデータ操作をログ監査のために記録します。
この機能は現在、招待プレビュー段階です。この機能の使用方法の詳細については、「手順」をご参照ください。
テーブル作成のための DDL
テーブル作成の DDL には、同期 CDC と非同期 CDC の 2 種類があります。
同期 CDC:SQL DML 操作のみを使用して増分テーブルを生成します。リアルタイムデータ書き込みはサポートされていません。CDC データは SQL 操作の完了後に生成されます。
非同期 CDC:SQL DML 操作を使用して増分テーブルを生成し、リアルタイムデータ書き込みをサポートします。ただし、CDC データの生成は非同期であり、即時ではありません。
同期 CDC
Delta テーブルを作成するときに、"acid.cdc.mode.enable"="true" プロパティを追加します。また、"cdc.insert.into.passthrough.enable"="true" および "cdc.data.retain.hours"="24" プロパティを追加することもできます。
SQL 文の例を次に示します。
CREATE TABLE acid_with_cdc_tbl (pk BIGINT NOT NULL PRIMARY KEY, val BIGINT)
tblproperties ("transactional" = "true", "acid.cdc.mode.enable"="true");acid.cdc.mode.enable:Delta テーブルの CDC 機能を有効にします。これにより、SQL DML 操作中に CDC データを同期的に生成できます。Tunnel を介したリアルタイム書き込みはサポートされていません。cdc.insert.into.passthrough.enable:デフォルトでは、CDC が有効なテーブルは `INSERT INTO` 文をサポートしていません。このプロパティを追加することで `INSERT INTO` 文を実行できます。書き込まれたデータは、CDC 内では `INSERT` 型としてのみ表現されます。同じプライマリキー (PK) を持つデータ行が存在する場合、後続のクエリは PK の競合により失敗します。PK データが一意であることを確認する必要があります。cdc.data.retain.hours: CDC データの保持期間 (時間単位)。値の範囲は 1 から 168 です。デフォルト値は 24 です。
非同期 CDC
Delta テーブルを作成するときに、"acid.cdc.mode.enable"="true"、"acid.cdc.build.async"="true"、および "acid.cdc.build.interval"="300" プロパティを追加します。
SQL 文の例を次に示します。
CREATE TABLE acid_with_cdc_build_tbl (pk BIGINT NOT NULL PRIMARY KEY, val BIGINT)
tblproperties ("transactional" = "true",
"acid.cdc.mode.enable"="true",
"acid.cdc.build.async"="true",
"acid.cdc.build.interval"="300"); acid.cdc.mode.enable:Delta テーブルの CDC 機能を有効にします。これにより、SQL DML 操作中に CDC データを同期的に生成できます。Tunnel を介したリアルタイム書き込みはサポートされていません。acid.cdc.build.async:CDC データの非同期ビルドを有効にします。これにより、Tunnel を介したテーブルへのリアルタイム書き込みがサポートされます。SQL DML 操作の CDC データも非同期で生成されます。acid.cdc.build.interval:非同期ビルドの間隔 (秒単位) です。値は [60, 3540] の範囲内である必要があります。このパラメーターは必要に応じて設定できます。その他のオプションパラメーター (プロジェクトレベルまたはセッションレベル):
"odps.storage.orc.enable.memcmp.sort.key="true"。このプロパティはプロジェクトレベルで有効にできます。非同期 CDC のビルドとクエリのパフォーマンス向上に役立ちます。
CDC データのクエリ
table_changes
構文
SELECT * FROM table_changes('<table_name>', <start> [, <end>]);パラメーター
パラメーター
必須
説明
table_nameはい
クエリ対象の Delta テーブル。
startはい
BIGINT 型または STRING 型の定数。CDC データクエリの開始バージョンを指定します。テーブルのバージョンを確認するには、
SHOW HISTORY FOR TABLE <table_name>;コマンドを実行します。詳細については、「SHOW」をご参照ください。STRING を入力する場合は、yyyy-mm-dd hh:mi:ssフォーマットを使用します。endいいえ
BIGINT 型または STRING 型の定数。CDC データクエリの終了バージョンを指定します。このパラメーターを指定しない場合、クエリは最新バージョンまで実行されます。STRING を入力する場合は、
yyyy-mm-dd hh:mi:ssフォーマットを使用します。戻り値
データ列に加えて、次の 3 つのシステム列が返されます。
__meta_timestamp: データが書き込まれたシステム時刻。__meta_op_type:操作タイプ。値 1 は INSERT 操作を示し、0 は DELETE 操作を示します。__meta_is_update:操作が更新であるかどうかを指定します。値 1 は TRUE を示し、0 は FALSE を示します。
__meta_op_typeと__meta_is_update列を組み合わせることで、次の 4 つのケースを表すことができます。__meta_op_type
__meta_is_update
説明
1
0
INSERT 操作による新しいレコード。
1
1
UPDATE 操作後の値。
0
1
UPDATE 操作前の値。
0
0
削除された項目を示します。
例
acid_cdc_tableテーブルを作成します。CREATE TABLE acid_cdc_table(id1 STRING NOT NULL, id2 STRING NOT NULL, key1 BIGINT, key2 BIGINT, PRIMARY KEY(id1, id2)) tblproperties("transactional" = "true", "acid.cdc.mode.enable"="true");テーブルにデータを挿入します。
-- データ挿入時刻 2025-04-07 11:56:57 INSERT INTO acid_cdc_table VALUES ('1', '1006', 1006, 1006); -- データ挿入時刻 2025-04-07 12:15:00 INSERT INTO acid_cdc_table VALUES ('1', '1008', 1008, 1008); -- データ挿入時刻 2025-04-07 13:24:00 INSERT INTO acid_cdc_table VALUES ('1', '1032', 1032, 1032); -- データ挿入時刻 2025-04-07 14:00:00 INSERT INTO acid_cdc_table VALUES ('1', '1045', 1045, 1045); -- データ挿入時刻 2025-04-07 14:47:00 INSERT INTO acid_cdc_table VALUES ('1', '1045', 1045, 1045);テーブルバージョンをクエリします。SHOW HISTORY FOR TABLE acid_cdc_table;次の結果が返されます。
ObjectType ObjectId ObjectName VERSION(LSN) Time Operation TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000001 2025-04-07 11:55:59 CREATE TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000002 2025-04-07 11:56:57 APPEND TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000003 2025-04-07 12:00:13 MINOR_COMPACT TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000004 2025-04-07 12:15:32 APPEND TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000005 2025-04-07 12:30:02 MINOR_COMPACT TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000006 2025-04-07 13:24:47 APPEND TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000007 2025-04-07 13:30:02 MINOR_COMPACT TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000008 2025-04-07 14:00:41 APPEND TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000009 2025-04-07 14:15:15 MINOR_COMPACT TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000010 2025-04-07 14:47:46 APPEND TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000011 2025-04-07 15:00:11 MINOR_COMPACTCDC レコードをクエリします。
2025-04-07 12:00:00 以降に生成されたレコードをクエリします。
SELECT * FROM table_changes('acid_cdc_table', '2025-04-07 12:00:00'); -- 以下と同等 SELECT * FROM table_changes('acid_cdc_table', 3);次の結果が返されます。
+------------+------------+------------+------------+------------------+----------------+------------------+ | id1 | id2 | key1 | key2 | __meta_timestamp | __meta_op_type | __meta_is_update | +------------+------------+------------+------------+------------------+----------------+------------------+ | 1 | 1045 | 1045 | 1045 | 2025-04-07 14:00:34 | 1 | 0 | | 1 | 1008 | 1008 | 1008 | 2025-04-07 12:15:28 | 1 | 0 | | 1 | 1032 | 1032 | 1032 | 2025-04-07 13:24:43 | 1 | 0 | | 2 | 1045 | 1045 | 1045 | 2025-04-07 14:47:41 | 1 | 0 | +------------+------------+------------+------------+------------------+----------------+------------------+2025-04-07 12:00:00から13:30:00までの指定された間隔内のレコードをクエリします。SELECT * FROM table_changes('acid_cdc_table', '2025-04-07 12:00:00', '2025-04-07 13:30:00'); -- 以下と同等 SELECT * FROM table_changes('acid_cdc_table', 3, 6);次の結果が返されます。
+------------+------------+------------+------------+------------------+----------------+------------------+ | id1 | id2 | key1 | key2 | __meta_timestamp | __meta_op_type | __meta_is_update | +------------+------------+------------+------------+------------------+----------------+------------------+ | 1 | 1008 | 1008 | 1008 | 2025-04-07 12:15:28 | 1 | 0 | | 1 | 1032 | 1032 | 1032 | 2025-04-07 13:24:43 | 1 | 0 | +------------+------------+------------+------------+------------------+----------------+------------------+
ストリーム
Delta テーブルの CDC データでストリームを使用できます。詳細については、「ストリームオブジェクト」をご参照ください。このセクションでは、基本的な構文と例のみを提供します。
構文
CREATE STREAM [IF NOT EXISTS] <stream_name> ON TABLE <delta_table_name> <VERSION as of v> strmproperties ("read_mode"="cdc")説明"read_mode"プロパティは、Delta テーブルの消費モードを指定します。"cdc"に設定すると、ストリームはクエリ範囲に基づいて変更データキャプチャ (CDC) データをクエリします。例
-- ソーステーブル acid_with_cdc_stream を作成します。 CREATE TABLE acid_with_cdc_stream (id1 BIGINT NOT NULL PRIMARY KEY, id2 BIGINT) tblproperties ("transactional" = "true", "acid.cdc.mode.enable"="true","cdc.insert.into.passthrough.enable"="true"); -- データを挿入します。 INSERT INTO acid_with_cdc_stream VALUES (1, 1006), (2, 1008), (3, 1032); -- acid_with_cdc_stream テーブルに関連付けられたストリームを作成します。 CREATE STREAM delta_table_stream ON TABLE acid_with_cdc_stream version AS OF 1 strmproperties ("read_mode"="cdc"); -- ストリームオブジェクト delta_table_stream をクエリします。 DESC STREAM delta_table_stream;次の結果が返されます:
Name delta_table_stream Project yunqi_y**** Schema default Create Time 2024-12-03 11:13:12 Last Modified Time 2024-12-03 11:13:12 Offset Version 1 Reference Table Project yunqi_y**** Reference Table Schema default Reference Table Name acid_with_cdc_stream Reference Table Id b89ec113f50944d5b8e52ce6a00c**** Reference Table Version 2 Parameters {"read_mode": "cdc"}