Change Data Capture (CDC) mengidentifikasi dan menangkap perubahan data pada tabel database. CDC mencatat operasi tingkat baris—seperti INSERT, UPDATE, dan DELETE—ke dalam tabel Delta inkremental untuk menghasilkan aliran acara perubahan data. Aliran acara ini kemudian dapat mendukung berbagai kebutuhan bisnis, seperti komputasi inkremental, sinkronisasi data, dan pelapisan gudang data.
Fitur
Komputasi inkremental: Membaca catatan perubahan CDC secara inkremental untuk tugas-tugas seperti memperbarui Tampilan yang di-materialisasi.
Pemrosesan aliran: Membaca catatan perubahan CDC sebagai aliran untuk tugas pemrosesan, seperti konsumsi oleh Pekerjaan Flink.
Sinkronisasi data antar-mesin ganda: Menyinkronkan dan menghitung data inkremental di berbagai mesin.
Audit log: Mencatat seluruh operasi data untuk keperluan audit log.
Fitur ini saat ini berada dalam tahap pratinjau undangan. Untuk informasi lebih lanjut tentang cara menggunakan fitur ini, lihat Petunjuk.
DDL untuk pembuatan tabel
DDL pembuatan tabel tersedia dalam dua jenis: CDC sinkron dan CDC asinkron.
CDC sinkron: Menghasilkan tabel inkremental hanya melalui operasi DML SQL. Penulisan data real-time tidak didukung. Data CDC dihasilkan setelah operasi SQL selesai.
CDC asinkron: Menghasilkan tabel inkremental melalui operasi DML SQL dan mendukung penulisan data real-time. Namun, pembuatan data CDC bersifat asinkron dan tidak langsung.
Synchronous CDC
Saat membuat tabel Delta, tambahkan properti "acid.cdc.mode.enable"="true". Anda juga dapat menambahkan properti "cdc.insert.into.passthrough.enable"="true" dan "cdc.data.retain.hours"="24".
Pernyataan SQL berikut merupakan contoh:
CREATE TABLE acid_with_cdc_tbl (pk BIGINT NOT NULL PRIMARY KEY, val BIGINT)
tblproperties ("transactional" = "true", "acid.cdc.mode.enable"="true");acid.cdc.mode.enable: Mengaktifkan fitur CDC untuk tabel Delta. Ini memungkinkan pembuatan data CDC secara sinkron selama operasi DML SQL. Penulisan real-time melalui Tunnel tidak didukung.cdc.insert.into.passthrough.enable: Secara default, tabel yang diaktifkan CDC-nya tidak mendukung pernyataan `INSERT INTO`. Anda dapat menambahkan properti ini untuk menjalankan pernyataan `INSERT INTO`. Data yang ditulis hanya direpresentasikan sebagai tipe `INSERT` dalam CDC. Jika terdapat baris data dengan primary key (PK) yang sama, kueri berikutnya akan gagal karena konflik PK. Anda harus memastikan bahwa data PK bersifat unik.cdc.data.retain.hours: Periode retensi untuk data CDC, dalam satuan jam. Nilainya berkisar antara 1 hingga 168. Nilai default-nya adalah 24.
Asynchronous CDC
Saat membuat tabel Delta, tambahkan properti "acid.cdc.mode.enable"="true", "acid.cdc.build.async"="true", dan "acid.cdc.build.interval"="300".
Pernyataan SQL berikut merupakan contoh:
CREATE TABLE acid_with_cdc_build_tbl (pk BIGINT NOT NULL PRIMARY KEY, val BIGINT)
tblproperties ("transactional" = "true",
"acid.cdc.mode.enable"="true",
"acid.cdc.build.async"="true",
"acid.cdc.build.interval"="300"); acid.cdc.mode.enable: Mengaktifkan fitur CDC untuk tabel Delta. Ini memungkinkan pembuatan data CDC secara sinkron selama operasi DML SQL. Penulisan real-time melalui Tunnel tidak didukung.acid.cdc.build.async: Mengaktifkan pembuatan data CDC secara asinkron. Ini mendukung penulisan real-time ke tabel melalui Tunnel. Data CDC untuk operasi DML SQL juga dihasilkan secara asinkron.acid.cdc.build.interval: Interval pembuatan asinkron, dalam satuan detik. Nilainya harus berada dalam rentang [60, 3540]. Anda dapat mengonfigurasi parameter ini sesuai kebutuhan.Parameter opsional lainnya (tingkat Proyek atau tingkat session):
"odps.storage.orc.enable.memcmp.sort.key="true". Anda dapat mengaktifkan properti ini di tingkat Proyek. Properti ini membantu meningkatkan performa pembuatan dan kueri CDC asinkron.
Query CDC data
table_changes
Sintaksis
SELECT * FROM table_changes('<table_name>', <start> [, <end>]);Parameter
Parameter
Wajib
Deskripsi
table_nameYa
Tabel Delta yang akan dikueri.
startYa
Konstanta bertipe BIGINT atau STRING. Menentukan versi awal untuk kueri data CDC. Untuk mengetahui versi tabel, jalankan perintah
SHOW HISTORY FOR TABLE <table_name>;. Untuk informasi lebih lanjut, lihat SHOW. Jika Anda memasukkan nilai STRING, gunakan formatyyyy-mm-dd hh:mi:ss.endTidak
Konstanta bertipe BIGINT atau STRING. Menentukan versi akhir untuk kueri data CDC. Jika parameter ini tidak ditentukan, kueri akan berjalan hingga versi terbaru. Jika Anda memasukkan nilai STRING, gunakan format
yyyy-mm-dd hh:mi:ss.NILAI YANG DIKEMBALIKAN
Selain kolom data, tiga kolom sistem berikut juga dikembalikan:
__meta_timestamp: Waktu sistem saat data ditulis.__meta_op_type: Jenis operasi. Nilai 1 menunjukkan operasi INSERT, dan 0 menunjukkan operasi DELETE.__meta_is_update: Menentukan apakah operasi tersebut merupakan update. Nilai 1 berarti TRUE, dan 0 berarti FALSE.
Kolom
__meta_op_typedan__meta_is_updatedapat digabungkan untuk merepresentasikan empat kasus berikut:__meta_op_type
__meta_is_update
Deskripsi
1
0
Catatan baru dari operasi INSERT.
1
1
Nilai setelah operasi UPDATE.
0
1
Nilai sebelum operasi UPDATE.
0
0
Menunjukkan item yang dihapus.
Contoh
Buat tabel
acid_cdc_table.CREATE TABLE acid_cdc_table(id1 STRING NOT NULL, id2 STRING NOT NULL, key1 BIGINT, key2 BIGINT, PRIMARY KEY(id1, id2)) tblproperties("transactional" = "true", "acid.cdc.mode.enable"="true");Masukkan data ke dalam tabel.
-- Waktu penyisipan data 2025-04-07 11:56:57 INSERT INTO acid_cdc_table VALUES ('1', '1006', 1006, 1006); -- Waktu penyisipan data 2025-04-07 12:15:00 INSERT INTO acid_cdc_table VALUES ('1', '1008', 1008, 1008); -- Waktu penyisipan data 2025-04-07 13:24:00 INSERT INTO acid_cdc_table VALUES ('1', '1032', 1032, 1032); -- Waktu penyisipan data 2025-04-07 14:00:00 INSERT INTO acid_cdc_table VALUES ('1', '1045', 1045, 1045); -- Waktu penyisipan data 2025-04-07 14:47:00 INSERT INTO acid_cdc_table VALUES ('1', '1045', 1045, 1045);Kueri
versi tabel.SHOW HISTORY FOR TABLE acid_cdc_table;Hasil berikut dikembalikan:
ObjectType ObjectId ObjectName VERSION(LSN) Time Operation TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000001 2025-04-07 11:55:59 CREATE TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000002 2025-04-07 11:56:57 APPEND TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000003 2025-04-07 12:00:13 MINOR_COMPACT TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000004 2025-04-07 12:15:32 APPEND TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000005 2025-04-07 12:30:02 MINOR_COMPACT TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000006 2025-04-07 13:24:47 APPEND TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000007 2025-04-07 13:30:02 MINOR_COMPACT TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000008 2025-04-07 14:00:41 APPEND TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000009 2025-04-07 14:15:15 MINOR_COMPACT TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000010 2025-04-07 14:47:46 APPEND TABLE a4a78d3f6af04d85a57a90deff884021 acid_cdc_table 0000000000000011 2025-04-07 15:00:11 MINOR_COMPACTKueri catatan CDC.
Kueri catatan yang dihasilkan setelah 2025-04-07 12:00:00.
SELECT * FROM table_changes('acid_cdc_table', '2025-04-07 12:00:00'); -- Setara dengan SELECT * FROM table_changes('acid_cdc_table', 3);Hasil berikut dikembalikan:
+------------+------------+------------+------------+------------------+----------------+------------------+ | id1 | id2 | key1 | key2 | __meta_timestamp | __meta_op_type | __meta_is_update | +------------+------------+------------+------------+------------------+----------------+------------------+ | 1 | 1045 | 1045 | 1045 | 2025-04-07 14:00:34 | 1 | 0 | | 1 | 1008 | 1008 | 1008 | 2025-04-07 12:15:28 | 1 | 0 | | 1 | 1032 | 1032 | 1032 | 2025-04-07 13:24:43 | 1 | 0 | | 2 | 1045 | 1045 | 1045 | 2025-04-07 14:47:41 | 1 | 0 | +------------+------------+------------+------------+------------------+----------------+------------------+Kueri catatan dalam rentang waktu tertentu dari
2025-04-07 12:00:00hingga13:30:00.SELECT * FROM table_changes('acid_cdc_table', '2025-04-07 12:00:00', '2025-04-07 13:30:00'); -- Setara dengan SELECT * FROM table_changes('acid_cdc_table', 3, 6);Hasil berikut dikembalikan:
+------------+------------+------------+------------+------------------+----------------+------------------+ | id1 | id2 | key1 | key2 | __meta_timestamp | __meta_op_type | __meta_is_update | +------------+------------+------------+------------+------------------+----------------+------------------+ | 1 | 1008 | 1008 | 1008 | 2025-04-07 12:15:28 | 1 | 0 | | 1 | 1032 | 1032 | 1032 | 2025-04-07 13:24:43 | 1 | 0 | +------------+------------+------------+------------+------------------+----------------+------------------+
Stream
Anda dapat menggunakan Stream dengan data CDC tabel Delta. Untuk informasi lebih lanjut, lihat Objek Stream. Bagian ini hanya menyediakan sintaks dasar dan contoh.
Sintaksis
CREATE STREAM [IF NOT EXISTS] <stream_name> ON TABLE <delta_table_name> <VERSION as of v> strmproperties ("read_mode"="cdc")CatatanProperti
"read_mode"menentukan mode konsumsi untuk Tabel Delta. Saat diatur ke"cdc", Stream melakukan kueri terhadap data Change Data Capture (CDC) berdasarkan rentang kueri.Contoh
-- Buat tabel sumber acid_with_cdc_stream. CREATE TABLE acid_with_cdc_stream (id1 BIGINT NOT NULL PRIMARY KEY, id2 BIGINT) tblproperties ("transactional" = "true", "acid.cdc.mode.enable"="true","cdc.insert.into.passthrough.enable"="true"); -- Masukkan data. INSERT INTO acid_with_cdc_stream VALUES (1, 1006), (2, 1008), (3, 1032); -- Buat Stream yang terkait dengan tabel acid_with_cdc_stream. CREATE STREAM delta_table_stream ON TABLE acid_with_cdc_stream version AS OF 1 strmproperties ("read_mode"="cdc"); -- Kueri objek Stream delta_table_stream. DESC STREAM delta_table_stream;Hasil berikut dikembalikan:
Name delta_table_stream Project yunqi_y**** Schema default Create Time 2024-12-03 11:13:12 Last Modified Time 2024-12-03 11:13:12 Offset Version 1 Reference Table Project yunqi_y**** Reference Table Schema default Reference Table Name acid_with_cdc_stream Reference Table Id b89ec113f50944d5b8e52ce6a00c**** Reference Table Version 2 Parameters {"read_mode": "cdc"}