All Products
Search
Document Center

MaxCompute:Stream objects

Last Updated:Mar 26, 2026

A stream is a MaxCompute object that automatically manages data versions for incremental queries on Delta Tables. It tracks data manipulation language (DML) changes — inserts, updates, and deletes — along with the metadata for each change, and makes them available for incremental consumption. Each stream maintains a version pointer so consumers always know which changes have been processed and which are new.

This topic covers the SQL commands for creating, inspecting, modifying, listing, deleting, and querying streams.

How it works

A stream is always associated with exactly one Delta Table. Internally, it maintains two version markers:

  • Offset Version: the data version up to which changes have been consumed. This advances only when you read the stream inside a DML operation.

  • Reference Table Version: the latest data version of the associated Delta Table. This updates automatically as the table changes.

Each time you query a stream, MaxCompute returns the incremental changes in the half-open interval (Offset Version, Reference Table Version].

Reading without consuming: Running a SELECT statement alone does not advance the Offset Version. The changes are visible but not marked as consumed — you can re-read them as many times as needed.

Consuming: When you use a stream inside a DML statement (for example, INSERT INTO ... SELECT ... FROM <stream_name>), the Offset Version advances to match the Reference Table Version. After consumption, the stream returns empty until new changes arrive.

Choose a read mode

Set read_mode when creating a stream to control what the stream returns.

Mode What it returns Best for
append Final state of each changed row; deleted rows are excluded Standard ETL pipelines that process only inserted or updated data
cdc All change states (before and after update, insert, delete), plus three system columns Downstream systems that need full change history, such as real-time sync or audit pipelines

Create a stream

CREATE STREAM [IF NOT EXISTS] <stream_name>
ON TABLE <delta_table_name> <TIMESTAMP AS OF t | VERSION AS OF v>
strmproperties ("read_mode"="append" | "cdc")
[comment <stream_comment>];
Parameter Required Description
IF NOT EXISTS No If omitted and a stream with the same name exists, an error is returned. If specified, the statement succeeds even if a same-name stream exists; the existing stream's metadata is unchanged.
stream_name Yes Name of the stream to create.
ON TABLE <delta_table_name> Yes Source Delta Table to associate with the stream. A stream supports only one source table, and the source table cannot be changed after creation.
TIMESTAMP AS OF t No Sets the initial Offset Version to timestamp t. The query range starts from (t, latest incremental data timestamp].
VERSION AS OF v No Sets the initial Offset Version to data version v. The query range starts from (v, latest incremental data version].
strmproperties Yes Stream properties as string key-value pairs. Currently, only read_mode is supported. Valid values: append and cdc.
stream_comment No A comment for the stream. Maximum 1024 bytes; an error is returned if exceeded.

CDC system columns

When read_mode is set to cdc, three system columns are appended to every output row:

Column Type Description
__meta_timestamp timestamp Time when the change was written to the Delta Table.
__meta_op_type tinyint Operation type: INSERT (1) or DELETE (0).
__meta_is_update tinyint Whether the row is part of an UPDATE: TRUE (1) or FALSE (0).

Because updates are represented as a DELETE/INSERT pair, combining __meta_op_type and __meta_is_update identifies the exact change type:

Operation __meta_op_type __meta_is_update
New insert INSERT (1) FALSE (0)
Value after an update INSERT (1) TRUE (1)
Value before an update DELETE (0) TRUE (1)
Delete DELETE (0) FALSE (0)

Example

Create a Delta Table, then create a stream in append mode starting from version 1.

CREATE TABLE delta_table_src (
  pk bigint NOT NULL PRIMARY KEY,
  val bigint
) tblproperties ("transactional"="true");

CREATE STREAM delta_table_stream
ON TABLE delta_table_src VERSION AS OF 1
strmproperties('read_mode'='append')
comment 'Stream demo';

View stream information

DESC STREAM <stream_name>;

Example

CREATE TABLE delta_table_src (pk BIGINT NOT NULL PRIMARY KEY,
val BIGINT) TBLPROPERTIES ("transactional"="true");

CREATE STREAM delta_table_stream ON TABLE delta_table_src
VERSION AS OF 1 strmproperties('read_mode'='append')
comment 'Stream demo';

DESC STREAM delta_table_stream;

Output:

Name                                    delta_table_stream
Project                                 sql_optimizer
Create Time                             2024-09-06 17:03:32
Last Modified Time                      2024-09-06 17:03:32
Offset Version                          1
Reference Table Project                 sql_optimizer
Reference Table Name                    delta_table_src
Reference Table Id                      5e19a67eb97b4477b7fbce0c7bbcebca
Reference Table Version                 1
Parameters                              {
    "comment": "stream demo",
    "read_mode": "append"}
Field Description
Name Name of the stream.
Project Project where the stream resides.
Create Time Time when the stream was created.
Last Modified Time Time when the stream was last modified.
Offset Version Data version up to which changes have been consumed by this stream.
Reference Table Project Project where the associated source table resides.
Reference Table Name Name of the associated source table.
Reference Table Id Unique ID of the associated source table.
Reference Table Version Latest data version of the associated source table.
Parameters Stream properties, including comment and read_mode.
When the stream is first created against an empty table, Offset Version and Reference Table Version are equal. As DML operations run on the Delta Table, Reference Table Version advances. The stream returns all changes in (Offset Version, Reference Table Version]. After a DML-based read consumes those changes, Offset Version catches up to Reference Table Version, and the stream returns empty until new changes arrive.

Modify a stream

Modify stream properties

ALTER STREAM <stream_name> SET strmproperties ("key"="value");
Currently, read_mode cannot be modified after the stream is created.

Modify the initial data version

Use this command to reset the Offset Version — for example, to skip a range of historical changes and advance the starting point.

ALTER STREAM <stream_name> ON TABLE <delta_table_name>
<TIMESTAMP AS OF t | VERSION AS OF v>;
Parameter Description
stream_name Name of the stream to modify.
ON TABLE <delta_table_name> Must be the same source table as the original. Changing the source table is not supported.
TIMESTAMP AS OF t Resets the Offset Version to timestamp t. The query range becomes (t, latest incremental data version].
VERSION AS OF v Resets the Offset Version to data version v. The query range becomes (v, latest incremental data version].

Example

This example shows the full lifecycle: create a stream, insert data to advance the Delta Table version, then reset the stream's Offset Version.

-- 1. Create a source Delta Table.
CREATE TABLE delta_table_src (pk bigint NOT NULL PRIMARY KEY,
val bigint) tblproperties ("transactional"="true");

-- 2. Create a stream starting from version 1.
CREATE STREAM delta_table_stream ON TABLE delta_table_src
VERSION AS OF 1 strmproperties('read_mode'='append')
comment 'Stream demo';

-- 3. Confirm that Offset Version and Reference Table Version are both 1.
DESC STREAM delta_table_stream;
-- Output:
-- Offset Version          1
-- Reference Table Version 1

-- 4. Insert a record to advance the Delta Table to a new version.
INSERT INTO delta_table_src VALUES ('1', '1');

-- 5. View Delta Table version history.
SHOW HISTORY FOR TABLE delta_table_src;
-- ObjectType  ObjectId                          ObjectName       VERSION(LSN)      Time                  Operation
-- TABLE       8605276ce0034b20af761bf4761ba62e  delta_table_src  0000000000000001  2024-09-07 10:25:59   CREATE
-- TABLE       8605276ce0034b20af761bf4761ba62e  delta_table_src  0000000000000002  2024-09-07 10:28:19   APPEND

-- 6. Reset the stream's Offset Version to version 2,
--    skipping the data inserted in step 4.
ALTER STREAM delta_table_stream ON TABLE delta_table_src VERSION AS OF 2;

-- 7. Confirm that both versions are now 2.
DESC STREAM delta_table_stream;
-- Output:
-- Offset Version          2
-- Reference Table Version 2

List all streams in a project

SHOW STREAMS;

Example

-- List all streams in the current project.
SHOW STREAMS;
-- Output:
-- delta_table_stream

Delete a stream

DROP STREAM [IF EXISTS] <stream_name>;

Example

-- 1. Confirm the stream exists.
SHOW STREAMS;
-- Output:
-- delta_table_stream

-- 2. Delete the stream.
DROP STREAM IF EXISTS delta_table_stream;

-- 3. Confirm the stream is gone.
SHOW STREAMS;
-- Output: (empty)

Query a stream

SELECT * FROM <stream_name>;

Use this inside a DML statement to consume changes and advance the Offset Version:

INSERT INTO <destination_table> SELECT * FROM <stream_name>;

Example: CDC mode

This example tracks inserts and updates on a source table and copies changes to a destination table using CDC mode.

CDC mode on Delta Tables requires an invitational preview. For setup details, see CDC (invitational preview).
  1. Create a source Delta Table with CDC enabled.

    CREATE TABLE delta_table_src (
      pk bigint NOT NULL PRIMARY KEY,
      val bigint
    ) tblproperties (
      "transactional"="true",
      'acid.cdc.mode.enable'='true',
      'cdc.insert.into.passthrough.enable'='true'
    );
  2. Create a destination table.

    CREATE TABLE delta_table_dest (
      pk bigint NOT NULL PRIMARY KEY,
      val bigint
    ) tblproperties ("transactional"="true");
  3. Create a stream in CDC mode.

    CREATE STREAM delta_table_stream
    ON TABLE delta_table_src VERSION AS OF 1
    strmproperties('read_mode'='cdc')
    comment 'Stream cdc mode';
  4. Insert two records into the source table.

    INSERT INTO delta_table_src VALUES (1, 1), (2, 2);
  5. Query the stream. Running SELECT alone does not advance the Offset Version — the same result is returned on every subsequent run.

    SELECT * FROM delta_table_stream;
    
    -- Output
    +------------+------------+------------------+----------------+------------------+
    | pk         | val        | __meta_timestamp | __meta_op_type | __meta_is_update |
    +------------+------------+------------------+----------------+------------------+
    | 2          | 2          | 2024-09-07 11:03:53 | 1             | 0                |
    | 1          | 1          | 2024-09-07 11:03:53 | 1              | 0                |
    +------------+------------+------------------+----------------+------------------+

    Both rows show __meta_op_type=1 (INSERT) and __meta_is_update=0 (FALSE), indicating new inserts.

  6. Consume the changes by inserting them into the destination table. This advances the Offset Version.

    INSERT INTO delta_table_dest SELECT pk, val FROM delta_table_stream;
  7. Confirm the destination table received the data.

    SELECT * FROM delta_table_dest;
    
    -- Output
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    | 1          | 1          |
    | 2          | 2          |
    +------------+------------+
  8. Query the stream again. It returns empty because the changes were consumed in step 6.

    SELECT * FROM delta_table_stream;
    
    -- Output
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    +------------+------------+
  9. Update pk=1 in the source table.

    UPDATE delta_table_src SET val = 10 WHERE pk = 1;
  10. Query the stream again. The UPDATE appears as two rows: the before-update state and the after-update state.

     SELECT * FROM delta_table_stream;
    
     -- Output
     +------------+------------+------------------+----------------+------------------+
     | pk         | val        | __meta_timestamp | __meta_op_type | __meta_is_update |
     +------------+------------+------------------+----------------+------------------+
     | 1          | 1          | 2024-09-07 11:10:21 | 0              | 1                |
     | 1          | 10         | 2024-09-07 11:10:21 | 1              | 1                |
     +------------+------------+------------------+----------------+------------------+

    The first row (__meta_op_type=0, __meta_is_update=1) is the value before the update (DELETE + TRUE = UPDATE_BEFORE). The second row (__meta_op_type=1, __meta_is_update=1) is the value after the update (INSERT + TRUE = UPDATE_AFTER).

Example: Append mode

This example shows the behavior difference between append mode and CDC mode for UPDATE and DELETE operations.

  1. Create a source Delta Table.

    CREATE TABLE delta_table_src (
      pk bigint NOT NULL PRIMARY KEY,
      val bigint
    ) tblproperties ("transactional"="true");
  2. Create a destination table.

    CREATE TABLE delta_table_dest (
      pk bigint NOT NULL PRIMARY KEY,
      val bigint
    ) tblproperties ("transactional"="true");
  3. Create a stream in append mode.

    CREATE STREAM delta_table_stream
    ON TABLE delta_table_src VERSION AS OF 1
    strmproperties ('read_mode'='append')
    comment 'Stream append mode';
  4. Insert two records into the source table.

    INSERT INTO delta_table_src VALUES (1, 1), (2, 2);
  5. Query the stream. Append mode returns no system columns.

    SELECT * FROM delta_table_stream;
    
    -- Output
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    | 1          | 1          |
    | 2          | 2          |
    +------------+------------+
  6. Consume the changes.

    INSERT INTO delta_table_dest SELECT pk, val FROM delta_table_stream;
  7. Confirm the destination table received the data.

    SELECT * FROM delta_table_dest;
    
    -- Output
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    | 1          | 1          |
    | 2          | 2          |
    +------------+------------+
  8. Query the stream. It returns empty — changes from step 6 were consumed.

    SELECT * FROM delta_table_stream;
    
    -- Output
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    +------------+------------+
  9. Update pk=1 and delete pk=2 in the source table.

    UPDATE delta_table_src SET val = 10 WHERE pk = 1;
    DELETE FROM delta_table_src WHERE pk = 2;
  10. Query the stream.

     SELECT * FROM delta_table_stream;
    
     -- Output
     +------------+------------+
     | pk         | val        |
     +------------+------------+
     | 1          | 10         |
     +------------+------------+

    Only the updated row (1, 10) is returned. The deleted row is not included. Append mode returns only the final state of modified rows — it does not expose before-images or deletes. Use append mode for ETL pipelines that process continuously inserted or updated data; use CDC mode when your downstream system needs full change history, including deletes and before-update values.

Usage notes

  • Each stream tracks exactly one source Delta Table. Changing the source table after creation is not supported.

  • read_mode cannot be changed after the stream is created.

  • A SELECT statement alone does not advance the Offset Version; only DML operations (such as INSERT INTO ... SELECT ... FROM <stream_name>) consume changes and advance the pointer.

  • For multi-consumer pipelines where different downstream systems need the same change data independently, create a separate stream for each consumer. Streams do not store data — they store only a version pointer — so multiple streams on the same Delta Table are supported.

  • Stream comments must not exceed 1024 bytes.