All Products
Search
Document Center

MaxCompute:Objek stream

Last Updated:Mar 27, 2026

Stream adalah objek MaxCompute yang secara otomatis mengelola versi data untuk kueri inkremental pada Delta Table. Stream ini melacak perubahan bahasa manipulasi data (DML)—berupa INSERT, UPDATE, dan DELETE—beserta metadata untuk setiap perubahan tersebut, serta menyediakannya untuk konsumsi inkremental. Setiap stream mempertahankan penunjuk versi sehingga konsumen selalu mengetahui perubahan mana yang telah diproses dan mana yang baru.

Topik ini mencakup perintah SQL untuk membuat, memeriksa, memodifikasi, mendaftar, menghapus, dan melakukan kueri terhadap stream.

Cara kerja

Sebuah stream selalu dikaitkan dengan tepat satu Delta Table. Secara internal, stream tersebut mempertahankan dua penanda versi:

  • Offset Version: versi data hingga perubahan yang telah dikonsumsi. Penanda ini hanya maju ketika Anda membaca stream dalam operasi DML.

  • Reference Table Version: versi data terbaru dari Delta Table terkait. Versi ini diperbarui secara otomatis seiring perubahan pada tabel.

Setiap kali Anda melakukan kueri terhadap stream, MaxCompute mengembalikan perubahan inkremental dalam interval setengah terbuka (Offset Version, Reference Table Version].

Membaca tanpa mengonsumsi: Menjalankan pernyataan SELECT saja tidak akan memajukan Offset Version. Perubahan tersebut terlihat tetapi tidak ditandai sebagai telah dikonsumsi—Anda dapat membacanya kembali sebanyak yang diperlukan.

Mengonsumsi: Ketika Anda menggunakan stream dalam pernyataan DML (misalnya, INSERT INTO ... SELECT ... FROM <stream_name>), Offset Version dimajukan agar sesuai dengan Reference Table Version. Setelah dikonsumsi, stream akan mengembalikan hasil kosong hingga perubahan baru tiba.

Pilih mode baca

Tetapkan read_mode saat membuat stream untuk mengontrol data apa yang dikembalikan oleh stream tersebut.

Mode Data yang dikembalikan Paling cocok untuk
append Status akhir setiap baris yang berubah; baris yang dihapus tidak disertakan Pipeline ETL standar yang hanya memproses data yang disisipkan atau diperbarui
cdc Semua status perubahan (sebelum dan sesudah update, insert, delete), ditambah tiga kolom sistem Sistem downstream yang membutuhkan riwayat perubahan lengkap, seperti sinkronisasi real-time atau pipeline audit

Buat stream

CREATE STREAM [IF NOT EXISTS] <stream_name>
ON TABLE <delta_table_name> <TIMESTAMP AS OF t | VERSION AS OF v>
strmproperties ("read_mode"="append" | "cdc")
[comment <stream_comment>];
Parameter Wajib Deskripsi
IF NOT EXISTS Tidak Jika dihilangkan dan stream dengan nama yang sama sudah ada, kesalahan akan dikembalikan. Jika ditentukan, pernyataan berhasil meskipun stream dengan nama yang sama sudah ada; metadata stream yang ada tidak berubah.
stream_name Ya Nama stream yang akan dibuat.
ON TABLE <delta_table_name> Ya Delta Table sumber yang akan dikaitkan dengan stream. Satu stream hanya mendukung satu tabel sumber, dan tabel sumber tidak dapat diubah setelah pembuatan.
TIMESTAMP AS OF t Tidak Menetapkan Offset Version awal ke timestamp t. Rentang kueri dimulai dari (t, timestamp data inkremental terbaru].
VERSION AS OF v Tidak Menetapkan Offset Version awal ke versi data v. Rentang kueri dimulai dari (v, versi data inkremental terbaru].
strmproperties Ya Properti stream dalam bentuk pasangan kunci-nilai string. Saat ini, hanya read_mode yang didukung. Nilai yang valid: append dan cdc.
stream_comment Tidak Komentar untuk stream. Maksimal 1024 byte; kesalahan akan dikembalikan jika melebihi batas tersebut.

Kolom sistem CDC

Saat read_mode diatur ke cdc, tiga kolom sistem ditambahkan ke setiap baris output:

Kolom Tipe Deskripsi
__meta_timestamp timestamp Waktu saat perubahan ditulis ke Delta Table.
__meta_op_type tinyint Jenis operasi: INSERT (1) atau DELETE (0).
__meta_is_update tinyint Apakah baris tersebut merupakan bagian dari UPDATE: TRUE (1) atau FALSE (0).

Karena update direpresentasikan sebagai pasangan DELETE/INSERT, kombinasi __meta_op_type dan __meta_is_update mengidentifikasi jenis perubahan secara tepat:

Operasi __meta_op_type __meta_is_update
Insert baru INSERT (1) FALSE (0)
Nilai setelah update INSERT (1) TRUE (1)
Nilai sebelum update DELETE (0) TRUE (1)
Delete DELETE (0) FALSE (0)

Contoh

Buat Delta Table, lalu buat stream dalam mode append mulai dari versi 1.

CREATE TABLE delta_table_src (
  pk bigint NOT NULL PRIMARY KEY,
  val bigint
) tblproperties ("transactional"="true");

CREATE STREAM delta_table_stream
ON TABLE delta_table_src VERSION AS OF 1
strmproperties('read_mode'='append')
comment 'Stream demo';

Lihat informasi stream

DESC STREAM <stream_name>;

Contoh

CREATE TABLE delta_table_src (pk BIGINT NOT NULL PRIMARY KEY,
val BIGINT) TBLPROPERTIES ("transactional"="true");

CREATE STREAM delta_table_stream ON TABLE delta_table_src
VERSION AS OF 1 strmproperties('read_mode'='append')
comment 'Stream demo';

DESC STREAM delta_table_stream;

Output:

Name                                    delta_table_stream
Project                                 sql_optimizer
Create Time                             2024-09-06 17:03:32
Last Modified Time                      2024-09-06 17:03:32
Offset Version                          1
Reference Table Project                 sql_optimizer
Reference Table Name                    delta_table_src
Reference Table Id                      5e19a67eb97b4477b7fbce0c7bbcebca
Reference Table Version                 1
Parameters                              {
    "comment": "stream demo",
    "read_mode": "append"}
Field Deskripsi
Name Nama stream.
Project Proyek tempat stream berada.
Create Time Waktu saat stream dibuat.
Last Modified Time Waktu saat stream terakhir dimodifikasi.
Offset Version Versi data hingga perubahan yang telah dikonsumsi oleh stream ini.
Reference Table Project Proyek tempat tabel sumber terkait berada.
Reference Table Name Nama tabel sumber terkait.
Reference Table Id ID unik dari tabel sumber terkait.
Reference Table Version Versi data terbaru dari tabel sumber terkait.
Parameters Properti stream, termasuk comment dan read_mode.
Saat stream pertama kali dibuat terhadap tabel kosong, Offset Version dan Reference Table Version bernilai sama. Seiring operasi DML dijalankan pada Delta Table, Reference Table Version maju. Stream mengembalikan semua perubahan dalam (Offset Version, Reference Table Version]. Setelah pembacaan berbasis DML mengonsumsi perubahan tersebut, Offset Version menyusul Reference Table Version, dan stream mengembalikan hasil kosong hingga perubahan baru tiba.

Modifikasi stream

Modifikasi properti stream

ALTER STREAM <stream_name> SET strmproperties ("key"="value");
Saat ini, read_mode tidak dapat dimodifikasi setelah stream dibuat.

Modifikasi versi data awal

Gunakan perintah ini untuk mengatur ulang Offset Version—misalnya, untuk melewati rentang perubahan historis dan memajukan titik awal.

ALTER STREAM <stream_name> ON TABLE <delta_table_name>
<TIMESTAMP AS OF t | VERSION AS OF v>;
Parameter Deskripsi
stream_name Nama stream yang akan dimodifikasi.
ON TABLE <delta_table_name> Harus merupakan tabel sumber yang sama seperti semula. Mengganti tabel sumber tidak didukung.
TIMESTAMP AS OF t Mengatur ulang Offset Version ke timestamp t. Rentang kueri menjadi (t, versi data inkremental terbaru].
VERSION AS OF v Mengatur ulang Offset Version ke versi data v. Rentang kueri menjadi (v, versi data inkremental terbaru].

Contoh

Contoh ini menunjukkan siklus hidup lengkap: buat stream, sisipkan data untuk memajukan versi Delta Table, lalu atur ulang Offset Version stream.

-- 1. Buat Delta Table sumber.
CREATE TABLE delta_table_src (pk bigint NOT NULL PRIMARY KEY,
val bigint) tblproperties ("transactional"="true");

-- 2. Buat stream mulai dari versi 1.
CREATE STREAM delta_table_stream ON TABLE delta_table_src
VERSION AS OF 1 strmproperties('read_mode'='append')
comment 'Stream demo';

-- 3. Konfirmasi bahwa Offset Version dan Reference Table Version keduanya bernilai 1.
DESC STREAM delta_table_stream;
-- Output:
-- Offset Version          1
-- Reference Table Version 1

-- 4. Sisipkan catatan untuk memajukan Delta Table ke versi baru.
INSERT INTO delta_table_src VALUES ('1', '1');

-- 5. Lihat riwayat versi Delta Table.
SHOW HISTORY FOR TABLE delta_table_src;
-- ObjectType  ObjectId                          ObjectName       VERSION(LSN)      Time                  Operation
-- TABLE       8605276ce0034b20af761bf4761ba62e  delta_table_src  0000000000000001  2024-09-07 10:25:59   CREATE
-- TABLE       8605276ce0034b20af761bf4761ba62e  delta_table_src  0000000000000002  2024-09-07 10:28:19   APPEND

-- 6. Atur ulang Offset Version stream ke versi 2,
--    melewati data yang disisipkan pada langkah 4.
ALTER STREAM delta_table_stream ON TABLE delta_table_src VERSION AS OF 2;

-- 7. Konfirmasi bahwa kedua versi kini bernilai 2.
DESC STREAM delta_table_stream;
-- Output:
-- Offset Version          2
-- Reference Table Version 2

Daftar semua stream dalam proyek

-- 1. Konfirmasi aliran ada.
SHOW STREAMS;
-- Keluaran:
-- delta_table_stream

-- 2. Hapus aliran.
DROP STREAM IF EXISTS delta_table_stream;

-- 3. Konfirmasi aliran telah dihapus.
SHOW STREAMS;
-- Keluaran: (kosong)

Contoh

-- Daftar semua stream dalam proyek saat ini.
SHOW STREAMS;
-- Output:
-- delta_table_stream

Hapus stream

DROP STREAM [IF EXISTS] <stream_name>;

Contoh

-- 1. Konfirmasi bahwa stream ada.
SHOW STREAMS;
-- Output:
-- delta_table_stream

-- 2. Hapus stream.
DROP STREAM IF EXISTS delta_table_stream;

-- 3. Konfirmasi bahwa stream telah dihapus.
SHOW STREAMS;
-- Output: (kosong)

Lakukan kueri terhadap stream

INSERT INTO <destination_table> SELECT * FROM <stream_name>;

Gunakan ini dalam pernyataan DML untuk mengonsumsi perubahan dan memajukan Offset Version:

INSERT INTO <destination_table> SELECT * FROM <stream_name>;

Contoh: Mode CDC

Contoh ini melacak insert dan update pada tabel sumber serta menyalin perubahan ke tabel tujuan menggunakan mode CDC.

Mode CDC pada Delta Table memerlukan pratinjau undangan. Untuk detail penyiapan, lihat CDC (pratinjau undangan).
  1. Buat Delta Table sumber dengan CDC diaktifkan.

    CREATE TABLE delta_table_src (
      pk bigint NOT NULL PRIMARY KEY,
      val bigint
    ) tblproperties (
      "transactional"="true",
      'acid.cdc.mode.enable'='true',
      'cdc.insert.into.passthrough.enable'='true'
    );
  2. Buat tabel tujuan.

    CREATE TABLE delta_table_dest (
      pk bigint NOT NULL PRIMARY KEY,
      val bigint
    ) tblproperties ("transactional"="true");
  3. Buat stream dalam mode CDC.

    CREATE STREAM delta_table_stream
    ON TABLE delta_table_src VERSION AS OF 1
    strmproperties('read_mode'='cdc')
    comment 'Stream cdc mode';
  4. Sisipkan dua catatan ke tabel sumber.

    INSERT INTO delta_table_src VALUES (1, 1), (2, 2);
  5. Lakukan kueri terhadap stream. Menjalankan SELECT saja tidak memajukan Offset Version—hasil yang sama dikembalikan pada setiap eksekusi berikutnya.

    SELECT * FROM delta_table_stream;
    
    -- Output
    +------------+------------+------------------+----------------+------------------+
    | pk         | val        | __meta_timestamp | __meta_op_type | __meta_is_update |
    +------------+------------+------------------+----------------+------------------+
    | 2          | 2          | 2024-09-07 11:03:53 | 1             | 0                |
    | 1          | 1          | 2024-09-07 11:03:53 | 1              | 0                |
    +------------+------------+------------------+----------------+------------------+

    Kedua baris menunjukkan __meta_op_type=1 (INSERT) dan __meta_is_update=0 (FALSE), yang menandakan insert baru.

  6. Mengonsumsi perubahan dengan menyisipkannya ke tabel tujuan. Ini memajukan Offset Version.

    INSERT INTO delta_table_dest SELECT pk, val FROM delta_table_stream;
  7. Konfirmasi bahwa tabel tujuan menerima data.

    SELECT * FROM delta_table_dest;
    
    -- Output
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    | 1          | 1          |
    | 2          | 2          |
    +------------+------------+
  8. Lakukan kueri terhadap stream lagi. Hasilnya kosong karena perubahan telah dikonsumsi pada langkah 6.

    SELECT * FROM delta_table_stream;
    
    -- Output
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    +------------+------------+
  9. Update pk=1 di tabel sumber.

    UPDATE delta_table_src SET val = 10 WHERE pk = 1;
  10. Lakukan kueri terhadap stream lagi. UPDATE muncul sebagai dua baris: status sebelum update dan status setelah update.

     SELECT * FROM delta_table_stream;
    
     -- Output
     +------------+------------+------------------+----------------+------------------+
     | pk         | val        | __meta_timestamp | __meta_op_type | __meta_is_update |
     +------------+------------+------------------+----------------+------------------+
     | 1          | 1          | 2024-09-07 11:10:21 | 0              | 1                |
     | 1          | 10         | 2024-09-07 11:10:21 | 1              | 1                |
     +------------+------------+------------------+----------------+------------------+

    Baris pertama (__meta_op_type=0, __meta_is_update=1) adalah nilai sebelum update (DELETE + TRUE = UPDATE_BEFORE). Baris kedua (__meta_op_type=1, __meta_is_update=1) adalah nilai setelah update (INSERT + TRUE = UPDATE_AFTER).

Contoh: Mode append

Contoh ini menunjukkan perbedaan perilaku antara mode append dan mode CDC untuk operasi UPDATE dan DELETE.

  1. Buat Delta Table sumber.

    CREATE TABLE delta_table_src (
      pk bigint NOT NULL PRIMARY KEY,
      val bigint
    ) tblproperties ("transactional"="true");
  2. Buat tabel tujuan.

    CREATE TABLE delta_table_dest (
      pk bigint NOT NULL PRIMARY KEY,
      val bigint
    ) tblproperties ("transactional"="true");
  3. Buat stream dalam mode append.

    CREATE STREAM delta_table_stream
    ON TABLE delta_table_src VERSION AS OF 1
    strmproperties ('read_mode'='append')
    comment 'Stream append mode';
  4. Sisipkan dua catatan ke tabel sumber.

    INSERT INTO delta_table_src VALUES (1, 1), (2, 2);
  5. Lakukan kueri terhadap stream. Mode append tidak mengembalikan kolom sistem.

    SELECT * FROM delta_table_stream;
    
    -- Keluaran
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    +------------+------------+
  6. Mengonsumsi perubahan.

    INSERT INTO delta_table_dest SELECT pk, val FROM delta_table_stream;
  7. Konfirmasi bahwa tabel tujuan menerima data.

    SELECT * FROM delta_table_dest;
    
    -- Output
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    | 1          | 1          |
    | 2          | 2          |
    +------------+------------+
  8. Lakukan kueri terhadap stream. Hasilnya kosong—perubahan dari langkah 6 telah dikonsumsi.

    SELECT * FROM delta_table_stream;
    
    -- Output
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    +------------+------------+
  9. Update pk=1 dan hapus pk=2 di tabel sumber.

    UPDATE delta_table_src SET val = 10 WHERE pk = 1;
    DELETE FROM delta_table_src WHERE pk = 2;
  10. Lakukan kueri terhadap stream.

     SELECT * FROM delta_table_stream;
    
     -- Output
     +------------+------------+
     | pk         | val        |
     +------------+------------+
     | 1          | 10         |
     +------------+------------+

    Hanya baris yang diperbarui (1, 10) yang dikembalikan. Baris yang dihapus tidak disertakan. Mode append hanya mengembalikan status akhir dari baris yang dimodifikasi—tidak mengekspos pra-gambar atau penghapusan. Gunakan mode append untuk pipeline ETL yang memproses data yang terus-menerus disisipkan atau diperbarui; gunakan mode CDC ketika sistem downstream Anda membutuhkan riwayat perubahan lengkap, termasuk penghapusan dan nilai sebelum update.

Catatan penggunaan

  • Setiap stream melacak tepat satu Delta Table sumber. Mengganti tabel sumber setelah pembuatan tidak didukung.

  • read_mode tidak dapat diubah setelah stream dibuat.

  • Pernyataan SELECT saja tidak memajukan Offset Version; hanya operasi DML (seperti INSERT INTO ... SELECT ... FROM <stream_name>) yang mengonsumsi perubahan dan memajukan penunjuk.

  • Untuk pipeline multi-konsumen di mana sistem downstream berbeda membutuhkan data perubahan yang sama secara independen, buat stream terpisah untuk setiap konsumen. Stream tidak menyimpan data—hanya menyimpan penunjuk versi—sehingga beberapa stream pada Delta Table yang sama didukung.

  • Komentar stream tidak boleh melebihi 1024 byte.