Stream adalah objek MaxCompute yang secara otomatis mengelola versi data untuk kueri inkremental pada Delta Table. Stream ini melacak perubahan bahasa manipulasi data (DML)—berupa INSERT, UPDATE, dan DELETE—beserta metadata untuk setiap perubahan tersebut, serta menyediakannya untuk konsumsi inkremental. Setiap stream mempertahankan penunjuk versi sehingga konsumen selalu mengetahui perubahan mana yang telah diproses dan mana yang baru.
Topik ini mencakup perintah SQL untuk membuat, memeriksa, memodifikasi, mendaftar, menghapus, dan melakukan kueri terhadap stream.
Cara kerja
Sebuah stream selalu dikaitkan dengan tepat satu Delta Table. Secara internal, stream tersebut mempertahankan dua penanda versi:
-
Offset Version: versi data hingga perubahan yang telah dikonsumsi. Penanda ini hanya maju ketika Anda membaca stream dalam operasi DML.
-
Reference Table Version: versi data terbaru dari Delta Table terkait. Versi ini diperbarui secara otomatis seiring perubahan pada tabel.
Setiap kali Anda melakukan kueri terhadap stream, MaxCompute mengembalikan perubahan inkremental dalam interval setengah terbuka (Offset Version, Reference Table Version].
Membaca tanpa mengonsumsi: Menjalankan pernyataan SELECT saja tidak akan memajukan Offset Version. Perubahan tersebut terlihat tetapi tidak ditandai sebagai telah dikonsumsi—Anda dapat membacanya kembali sebanyak yang diperlukan.
Mengonsumsi: Ketika Anda menggunakan stream dalam pernyataan DML (misalnya, INSERT INTO ... SELECT ... FROM <stream_name>), Offset Version dimajukan agar sesuai dengan Reference Table Version. Setelah dikonsumsi, stream akan mengembalikan hasil kosong hingga perubahan baru tiba.
Pilih mode baca
Tetapkan read_mode saat membuat stream untuk mengontrol data apa yang dikembalikan oleh stream tersebut.
| Mode | Data yang dikembalikan | Paling cocok untuk |
|---|---|---|
append |
Status akhir setiap baris yang berubah; baris yang dihapus tidak disertakan | Pipeline ETL standar yang hanya memproses data yang disisipkan atau diperbarui |
cdc |
Semua status perubahan (sebelum dan sesudah update, insert, delete), ditambah tiga kolom sistem | Sistem downstream yang membutuhkan riwayat perubahan lengkap, seperti sinkronisasi real-time atau pipeline audit |
Buat stream
CREATE STREAM [IF NOT EXISTS] <stream_name>
ON TABLE <delta_table_name> <TIMESTAMP AS OF t | VERSION AS OF v>
strmproperties ("read_mode"="append" | "cdc")
[comment <stream_comment>];
| Parameter | Wajib | Deskripsi |
|---|---|---|
IF NOT EXISTS |
Tidak | Jika dihilangkan dan stream dengan nama yang sama sudah ada, kesalahan akan dikembalikan. Jika ditentukan, pernyataan berhasil meskipun stream dengan nama yang sama sudah ada; metadata stream yang ada tidak berubah. |
stream_name |
Ya | Nama stream yang akan dibuat. |
ON TABLE <delta_table_name> |
Ya | Delta Table sumber yang akan dikaitkan dengan stream. Satu stream hanya mendukung satu tabel sumber, dan tabel sumber tidak dapat diubah setelah pembuatan. |
TIMESTAMP AS OF t |
Tidak | Menetapkan Offset Version awal ke timestamp t. Rentang kueri dimulai dari (t, timestamp data inkremental terbaru]. |
VERSION AS OF v |
Tidak | Menetapkan Offset Version awal ke versi data v. Rentang kueri dimulai dari (v, versi data inkremental terbaru]. |
strmproperties |
Ya | Properti stream dalam bentuk pasangan kunci-nilai string. Saat ini, hanya read_mode yang didukung. Nilai yang valid: append dan cdc. |
stream_comment |
Tidak | Komentar untuk stream. Maksimal 1024 byte; kesalahan akan dikembalikan jika melebihi batas tersebut. |
Kolom sistem CDC
Saat read_mode diatur ke cdc, tiga kolom sistem ditambahkan ke setiap baris output:
| Kolom | Tipe | Deskripsi |
|---|---|---|
__meta_timestamp |
timestamp | Waktu saat perubahan ditulis ke Delta Table. |
__meta_op_type |
tinyint | Jenis operasi: INSERT (1) atau DELETE (0). |
__meta_is_update |
tinyint | Apakah baris tersebut merupakan bagian dari UPDATE: TRUE (1) atau FALSE (0). |
Karena update direpresentasikan sebagai pasangan DELETE/INSERT, kombinasi __meta_op_type dan __meta_is_update mengidentifikasi jenis perubahan secara tepat:
| Operasi | __meta_op_type |
__meta_is_update |
|---|---|---|
| Insert baru | INSERT (1) | FALSE (0) |
| Nilai setelah update | INSERT (1) | TRUE (1) |
| Nilai sebelum update | DELETE (0) | TRUE (1) |
| Delete | DELETE (0) | FALSE (0) |
Contoh
Buat Delta Table, lalu buat stream dalam mode append mulai dari versi 1.
CREATE TABLE delta_table_src (
pk bigint NOT NULL PRIMARY KEY,
val bigint
) tblproperties ("transactional"="true");
CREATE STREAM delta_table_stream
ON TABLE delta_table_src VERSION AS OF 1
strmproperties('read_mode'='append')
comment 'Stream demo';
Lihat informasi stream
DESC STREAM <stream_name>;
Contoh
CREATE TABLE delta_table_src (pk BIGINT NOT NULL PRIMARY KEY,
val BIGINT) TBLPROPERTIES ("transactional"="true");
CREATE STREAM delta_table_stream ON TABLE delta_table_src
VERSION AS OF 1 strmproperties('read_mode'='append')
comment 'Stream demo';
DESC STREAM delta_table_stream;
Output:
Name delta_table_stream
Project sql_optimizer
Create Time 2024-09-06 17:03:32
Last Modified Time 2024-09-06 17:03:32
Offset Version 1
Reference Table Project sql_optimizer
Reference Table Name delta_table_src
Reference Table Id 5e19a67eb97b4477b7fbce0c7bbcebca
Reference Table Version 1
Parameters {
"comment": "stream demo",
"read_mode": "append"}
| Field | Deskripsi |
|---|---|
Name |
Nama stream. |
Project |
Proyek tempat stream berada. |
Create Time |
Waktu saat stream dibuat. |
Last Modified Time |
Waktu saat stream terakhir dimodifikasi. |
Offset Version |
Versi data hingga perubahan yang telah dikonsumsi oleh stream ini. |
Reference Table Project |
Proyek tempat tabel sumber terkait berada. |
Reference Table Name |
Nama tabel sumber terkait. |
Reference Table Id |
ID unik dari tabel sumber terkait. |
Reference Table Version |
Versi data terbaru dari tabel sumber terkait. |
Parameters |
Properti stream, termasuk comment dan read_mode. |
Saat stream pertama kali dibuat terhadap tabel kosong,Offset VersiondanReference Table Versionbernilai sama. Seiring operasi DML dijalankan pada Delta Table,Reference Table Versionmaju. Stream mengembalikan semua perubahan dalam(Offset Version, Reference Table Version]. Setelah pembacaan berbasis DML mengonsumsi perubahan tersebut,Offset VersionmenyusulReference Table Version, dan stream mengembalikan hasil kosong hingga perubahan baru tiba.
Modifikasi stream
Modifikasi properti stream
ALTER STREAM <stream_name> SET strmproperties ("key"="value");
Saat ini, read_mode tidak dapat dimodifikasi setelah stream dibuat.
Modifikasi versi data awal
Gunakan perintah ini untuk mengatur ulang Offset Version—misalnya, untuk melewati rentang perubahan historis dan memajukan titik awal.
ALTER STREAM <stream_name> ON TABLE <delta_table_name>
<TIMESTAMP AS OF t | VERSION AS OF v>;
| Parameter | Deskripsi |
|---|---|
stream_name |
Nama stream yang akan dimodifikasi. |
ON TABLE <delta_table_name> |
Harus merupakan tabel sumber yang sama seperti semula. Mengganti tabel sumber tidak didukung. |
TIMESTAMP AS OF t |
Mengatur ulang Offset Version ke timestamp t. Rentang kueri menjadi (t, versi data inkremental terbaru]. |
VERSION AS OF v |
Mengatur ulang Offset Version ke versi data v. Rentang kueri menjadi (v, versi data inkremental terbaru]. |
Contoh
Contoh ini menunjukkan siklus hidup lengkap: buat stream, sisipkan data untuk memajukan versi Delta Table, lalu atur ulang Offset Version stream.
-- 1. Buat Delta Table sumber.
CREATE TABLE delta_table_src (pk bigint NOT NULL PRIMARY KEY,
val bigint) tblproperties ("transactional"="true");
-- 2. Buat stream mulai dari versi 1.
CREATE STREAM delta_table_stream ON TABLE delta_table_src
VERSION AS OF 1 strmproperties('read_mode'='append')
comment 'Stream demo';
-- 3. Konfirmasi bahwa Offset Version dan Reference Table Version keduanya bernilai 1.
DESC STREAM delta_table_stream;
-- Output:
-- Offset Version 1
-- Reference Table Version 1
-- 4. Sisipkan catatan untuk memajukan Delta Table ke versi baru.
INSERT INTO delta_table_src VALUES ('1', '1');
-- 5. Lihat riwayat versi Delta Table.
SHOW HISTORY FOR TABLE delta_table_src;
-- ObjectType ObjectId ObjectName VERSION(LSN) Time Operation
-- TABLE 8605276ce0034b20af761bf4761ba62e delta_table_src 0000000000000001 2024-09-07 10:25:59 CREATE
-- TABLE 8605276ce0034b20af761bf4761ba62e delta_table_src 0000000000000002 2024-09-07 10:28:19 APPEND
-- 6. Atur ulang Offset Version stream ke versi 2,
-- melewati data yang disisipkan pada langkah 4.
ALTER STREAM delta_table_stream ON TABLE delta_table_src VERSION AS OF 2;
-- 7. Konfirmasi bahwa kedua versi kini bernilai 2.
DESC STREAM delta_table_stream;
-- Output:
-- Offset Version 2
-- Reference Table Version 2
Daftar semua stream dalam proyek
-- 1. Konfirmasi aliran ada.
SHOW STREAMS;
-- Keluaran:
-- delta_table_stream
-- 2. Hapus aliran.
DROP STREAM IF EXISTS delta_table_stream;
-- 3. Konfirmasi aliran telah dihapus.
SHOW STREAMS;
-- Keluaran: (kosong)
Contoh
-- Daftar semua stream dalam proyek saat ini.
SHOW STREAMS;
-- Output:
-- delta_table_stream
Hapus stream
DROP STREAM [IF EXISTS] <stream_name>;
Contoh
-- 1. Konfirmasi bahwa stream ada.
SHOW STREAMS;
-- Output:
-- delta_table_stream
-- 2. Hapus stream.
DROP STREAM IF EXISTS delta_table_stream;
-- 3. Konfirmasi bahwa stream telah dihapus.
SHOW STREAMS;
-- Output: (kosong)
Lakukan kueri terhadap stream
INSERT INTO <destination_table> SELECT * FROM <stream_name>;
Gunakan ini dalam pernyataan DML untuk mengonsumsi perubahan dan memajukan Offset Version:
INSERT INTO <destination_table> SELECT * FROM <stream_name>;
Contoh: Mode CDC
Contoh ini melacak insert dan update pada tabel sumber serta menyalin perubahan ke tabel tujuan menggunakan mode CDC.
Mode CDC pada Delta Table memerlukan pratinjau undangan. Untuk detail penyiapan, lihat CDC (pratinjau undangan).
-
Buat Delta Table sumber dengan CDC diaktifkan.
CREATE TABLE delta_table_src ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ( "transactional"="true", 'acid.cdc.mode.enable'='true', 'cdc.insert.into.passthrough.enable'='true' ); -
Buat tabel tujuan.
CREATE TABLE delta_table_dest ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ("transactional"="true"); -
Buat stream dalam mode CDC.
CREATE STREAM delta_table_stream ON TABLE delta_table_src VERSION AS OF 1 strmproperties('read_mode'='cdc') comment 'Stream cdc mode'; -
Sisipkan dua catatan ke tabel sumber.
INSERT INTO delta_table_src VALUES (1, 1), (2, 2); -
Lakukan kueri terhadap stream. Menjalankan
SELECTsaja tidak memajukan Offset Version—hasil yang sama dikembalikan pada setiap eksekusi berikutnya.SELECT * FROM delta_table_stream; -- Output +------------+------------+------------------+----------------+------------------+ | pk | val | __meta_timestamp | __meta_op_type | __meta_is_update | +------------+------------+------------------+----------------+------------------+ | 2 | 2 | 2024-09-07 11:03:53 | 1 | 0 | | 1 | 1 | 2024-09-07 11:03:53 | 1 | 0 | +------------+------------+------------------+----------------+------------------+Kedua baris menunjukkan
__meta_op_type=1(INSERT) dan__meta_is_update=0(FALSE), yang menandakan insert baru. -
Mengonsumsi perubahan dengan menyisipkannya ke tabel tujuan. Ini memajukan Offset Version.
INSERT INTO delta_table_dest SELECT pk, val FROM delta_table_stream; -
Konfirmasi bahwa tabel tujuan menerima data.
SELECT * FROM delta_table_dest; -- Output +------------+------------+ | pk | val | +------------+------------+ | 1 | 1 | | 2 | 2 | +------------+------------+ -
Lakukan kueri terhadap stream lagi. Hasilnya kosong karena perubahan telah dikonsumsi pada langkah 6.
SELECT * FROM delta_table_stream; -- Output +------------+------------+ | pk | val | +------------+------------+ +------------+------------+ -
Update
pk=1di tabel sumber.UPDATE delta_table_src SET val = 10 WHERE pk = 1; -
Lakukan kueri terhadap stream lagi. UPDATE muncul sebagai dua baris: status sebelum update dan status setelah update.
SELECT * FROM delta_table_stream; -- Output +------------+------------+------------------+----------------+------------------+ | pk | val | __meta_timestamp | __meta_op_type | __meta_is_update | +------------+------------+------------------+----------------+------------------+ | 1 | 1 | 2024-09-07 11:10:21 | 0 | 1 | | 1 | 10 | 2024-09-07 11:10:21 | 1 | 1 | +------------+------------+------------------+----------------+------------------+Baris pertama (
__meta_op_type=0,__meta_is_update=1) adalah nilai sebelum update (DELETE + TRUE = UPDATE_BEFORE). Baris kedua (__meta_op_type=1,__meta_is_update=1) adalah nilai setelah update (INSERT + TRUE = UPDATE_AFTER).
Contoh: Mode append
Contoh ini menunjukkan perbedaan perilaku antara mode append dan mode CDC untuk operasi UPDATE dan DELETE.
-
Buat Delta Table sumber.
CREATE TABLE delta_table_src ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ("transactional"="true"); -
Buat tabel tujuan.
CREATE TABLE delta_table_dest ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ("transactional"="true"); -
Buat stream dalam mode append.
CREATE STREAM delta_table_stream ON TABLE delta_table_src VERSION AS OF 1 strmproperties ('read_mode'='append') comment 'Stream append mode'; -
Sisipkan dua catatan ke tabel sumber.
INSERT INTO delta_table_src VALUES (1, 1), (2, 2); -
Lakukan kueri terhadap stream. Mode append tidak mengembalikan kolom sistem.
SELECT * FROM delta_table_stream; -- Keluaran +------------+------------+ | pk | val | +------------+------------+ +------------+------------+ -
Mengonsumsi perubahan.
INSERT INTO delta_table_dest SELECT pk, val FROM delta_table_stream; -
Konfirmasi bahwa tabel tujuan menerima data.
SELECT * FROM delta_table_dest; -- Output +------------+------------+ | pk | val | +------------+------------+ | 1 | 1 | | 2 | 2 | +------------+------------+ -
Lakukan kueri terhadap stream. Hasilnya kosong—perubahan dari langkah 6 telah dikonsumsi.
SELECT * FROM delta_table_stream; -- Output +------------+------------+ | pk | val | +------------+------------+ +------------+------------+ -
Update
pk=1dan hapuspk=2di tabel sumber.UPDATE delta_table_src SET val = 10 WHERE pk = 1; DELETE FROM delta_table_src WHERE pk = 2; -
Lakukan kueri terhadap stream.
SELECT * FROM delta_table_stream; -- Output +------------+------------+ | pk | val | +------------+------------+ | 1 | 10 | +------------+------------+Hanya baris yang diperbarui
(1, 10)yang dikembalikan. Baris yang dihapus tidak disertakan. Mode append hanya mengembalikan status akhir dari baris yang dimodifikasi—tidak mengekspos pra-gambar atau penghapusan. Gunakan mode append untuk pipeline ETL yang memproses data yang terus-menerus disisipkan atau diperbarui; gunakan mode CDC ketika sistem downstream Anda membutuhkan riwayat perubahan lengkap, termasuk penghapusan dan nilai sebelum update.
Catatan penggunaan
-
Setiap stream melacak tepat satu Delta Table sumber. Mengganti tabel sumber setelah pembuatan tidak didukung.
-
read_modetidak dapat diubah setelah stream dibuat. -
Pernyataan
SELECTsaja tidak memajukan Offset Version; hanya operasi DML (sepertiINSERT INTO ... SELECT ... FROM <stream_name>) yang mengonsumsi perubahan dan memajukan penunjuk. -
Untuk pipeline multi-konsumen di mana sistem downstream berbeda membutuhkan data perubahan yang sama secara independen, buat stream terpisah untuk setiap konsumen. Stream tidak menyimpan data—hanya menyimpan penunjuk versi—sehingga beberapa stream pada Delta Table yang sama didukung.
-
Komentar stream tidak boleh melebihi 1024 byte.