A stream is a MaxCompute object that automatically manages data versions for incremental queries on Delta Tables. It records data manipulation language (DML) changes, such as inserts, updates, and deletes, along with the metadata for each change. This lets you use the changed data to perform operations. This topic describes the commands used for stream operations.
Create a stream
Syntax
CREATE STREAM [IF NOT EXISTS] <stream_name>
ON TABLE <delta_table_name> <timestamp as of t | VERSION as of v>
strmproperties ("read_mode"="append" | "cdc")
[comment <stream_comment>];
Name | Description |
IF NOT EXISTS | Optional. If you do not specify |
stream_name | Required. The name of the stream to create. |
ON TABLE <delta_table_name> | Specifies the source Delta Table to associate with the stream. A stream can be associated with only one source table. |
timestamp as of t | Initializes the VersionOffset of the stream to the data timestamp t. The query range is (t, latest incremental data version]. |
version as of v | Initializes the VersionOffset of the stream to the data version v. The query range is (v, latest incremental data version]. |
strmproperties | Specifies the stream properties as key-value pairs of the string type, similar to table properties. Currently, only the read_mode property is supported. Valid values are append and cdc. |
stream_comment | Optional. The comment for the stream. The comment must be a valid string of no more than 1024 bytes. Otherwise, an error is returned. |
System columns | For |
Example
Create a source Delta Table, and then create a stream associated with the Delta Table.
CREATE TABLE delta_table_src (
pk bigint NOT NULL PRIMARY KEY,
val bigint
) tblproperties ("transactional"="true");
CREATE STREAM delta_table_stream
ON TABLE delta_table_src version as of 1
strmproperties('read_mode'='append')
comment 'Stream demo';
View stream information
Syntax
DESC STREAM <stream_name>;
Example
Create a source Delta Table and a stream associated with it. Then, view the information about the stream object delta_table_stream.
CREATE TABLE delta_table_src (pk BIGINT NOT NULL PRIMARY key,
val BIGINT) TBLPROPERTIES ("transactional"="true");
CREATE STREAM delta_table_stream ON TABLE delta_table_src
version AS OF 1 strmproperties('read_mode'='append')
comment 'Stream demo';
DESC STREAM delta_table_stream;
Output
Name delta_table_stream
Project sql_optimizer
Create Time 2024-09-06 17:03:32 Last Modified Time 2024-09-06 17:03:32
Offset Version 1
Reference Table Project sql_optimizer
Reference Table Name delta_table_src
Reference Table Id 5e19a67eb97b4477b7fbce0c7bbcebca
Reference Table Version 1
Parameters {
"comment": "stream demo",
"read_mode": "append"}
Name | Description |
Name | The name of the stream. |
Project | The name of the project where the stream resides. |
Create Time | The time when the stream was created. |
Offset Version | The initial data version of the stream. |
Reference Table Project | The name of the project where the associated source table resides. |
Reference Table Name | The name of the associated source table. |
Reference Table Id | The unique ID of the associated source table. |
Reference Table Version | The data version of the associated source table. |
Parameters | The properties of the stream object. |
Pay particular attention to Offset Version and Reference Table Version. Offset Version is the data version of the associated Delta Table up to which the stream has been consumed. Reference Table Version is the latest data version of the associated Delta Table. Because the associated Delta Table is still empty in this example, both values are 1. After a stream is created, DML operations on its associated Delta Table update Reference Table Version accordingly.
Reading a stream is converted into an incremental query on the associated table. The data version range of the query is the left-open, right-closed interval (Offset Version, Reference Table Version], so the query reads exactly the changes made after Offset Version, up to and including Reference Table Version. When a DML operation consumes the incremental data in that interval, Offset Version becomes equal to Reference Table Version. Both values then reflect the latest data version of the associated Delta Table, which indicates that no new incremental data is available.
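The interval arithmetic described above can be illustrated with a small Python model. This is only a sketch of the semantics stated in this topic, not MaxCompute code: the function name pending_versions is hypothetical, and data versions are assumed to be consecutive integers.

```python
def pending_versions(offset_version: int, reference_table_version: int) -> list:
    """Return the data versions in (offset_version, reference_table_version].

    Hypothetical helper that models the left-open, right-closed interval
    covered by a stream read. Versions are assumed to be consecutive integers.
    """
    return list(range(offset_version + 1, reference_table_version + 1))


# New stream on a table that has not changed since version 1: nothing to read.
print(pending_versions(1, 1))  # []
# One DML operation moved the table to version 2: version 2 is pending.
print(pending_versions(1, 2))  # [2]
# After the data is consumed, the offset catches up and the range is empty.
print(pending_versions(2, 2))  # []
```

The left-open bound is what prevents a version that has already been consumed from being read a second time.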
Modify a stream
Modify stream properties
ALTER STREAM <stream_name> SET strmproperties ("key"="value");
stream_name: Required. The name of the stream to modify.
strmproperties: The properties of the stream, specified as key-value pairs of the string type, similar to table properties. Currently, only the read_mode property is supported, and it cannot be modified.
Modify the initial data version of a stream
ALTER STREAM <stream_name> ON TABLE <delta_table_name>
<timestamp as of t | version as of v>;
stream_name: Required. The name of the stream to modify.
ON TABLE <delta_table_name>: Specifies the source Delta Table associated with the stream. This must be the same as the original source table. Modifying the source table is not currently supported.
timestamp as of t: Modifies the initial data version (VersionOffset) of the stream to timestamp t. The query range is (t, latest incremental data version].
version as of v: Modifies the initial data version (VersionOffset) of the stream to version v. The query range is (v, latest incremental data version].
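To make the two query ranges concrete, the following Python sketch models a table's version history as (data version, commit time) pairs and selects what each kind of offset would cover. It assumes that a timestamp offset selects the versions committed strictly after t, which is one reading of the left-open range stated above. It is not MaxCompute code, and the function names and sample history are hypothetical.

```python
from datetime import datetime

# Hypothetical version history: (data version, commit time).
HISTORY = [
    (1, datetime(2024, 9, 7, 10, 25, 59)),
    (2, datetime(2024, 9, 7, 10, 28, 19)),
    (3, datetime(2024, 9, 7, 10, 40, 0)),
]


def range_for_version(history, v):
    """Versions in (v, latest]: everything committed after version v."""
    return [version for version, _ in history if version > v]


def range_for_timestamp(history, t):
    """Versions in (t, latest]: assumed to mean commits strictly after time t."""
    return [version for version, committed in history if committed > t]


print(range_for_version(HISTORY, 1))                                   # [2, 3]
print(range_for_timestamp(HISTORY, datetime(2024, 9, 7, 10, 28, 19)))  # [3]
```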
Example
-- 1. Create a source Delta Table.
CREATE TABLE delta_table_src (pk bigint not null primary key,
val bigint) tblproperties ("transactional"="true");
-- 2. Create a stream associated with the Delta Table.
CREATE STREAM delta_table_stream on table delta_table_src
version as of 1 strmproperties('read_mode'='append')
comment 'Stream demo';
-- 3. View the information of the new stream. The current Offset Version and Reference Table Version are both 1.
DESC STREAM delta_table_stream;
-- Output:
Name delta_table_stream
Project sql_optimizer
Create Time 2024-09-07 10:26:56 Last Modified Time 2024-09-07 10:26:56
Offset Version 1
Reference Table Project sql_optimizer
Reference Table Name delta_table_src
Reference Table Id 8605276ce0034b20af761bf4761ba62e
Reference Table Version 1
Parameters {
"comment": "stream demo",
"read_mode": "append"}
-- 4. Insert a record into the associated Delta Table. This advances the table's data version (LSN).
INSERT INTO delta_table_src VALUES (1, 1);
-- 5. View the current data version information of the Delta Table.
SHOW HISTORY FOR TABLE delta_table_src;
-- Output:
ObjectType ObjectId ObjectName VERSION(LSN) Time Operation
TABLE 8605276ce0034b20af761bf4761ba62e delta_table_src 0000000000000001 2024-09-07 10:25:59 CREATE
TABLE 8605276ce0034b20af761bf4761ba62e delta_table_src 0000000000000002 2024-09-07 10:28:19 APPEND
-- 6. Change the initial data version (Offset Version) of the stream to 2.
ALTER STREAM delta_table_stream ON TABLE delta_table_src version as of 2;
-- 7. View the information of the modified stream. The versions of both the stream and the associated Delta Table are now 2.
DESC STREAM delta_table_stream;
-- Output:
Name delta_table_stream
Project sql_optimizer
Create Time 2024-09-07 10:26:56 Last Modified Time 2024-09-07 10:29:12
Offset Version 2
Reference Table Project sql_optimizer
Reference Table Name delta_table_src
Reference Table Id 8605276ce0034b20af761bf4761ba62e
Reference Table Version 2
Parameters {
"comment": "stream demo",
"read_mode": "append"}
List all streams in a project
Syntax
SHOW STREAMS;
Example
-- List all stream objects in the current project.
SHOW STREAMS;
-- Output:
delta_table_stream
Delete a stream
Syntax
DROP STREAM [IF EXISTS] <stream_name>;
Example
-- 1. View all existing stream objects in the current project.
SHOW STREAMS;
-- Output:
delta_table_stream
-- 2. Delete the delta_table_stream object.
DROP STREAM IF EXISTS delta_table_stream;
-- 3. View the stream objects in the project again. The result is empty.
SHOW STREAMS;
Query a stream
Syntax
SELECT * FROM <stream_name>;
When you query a stream with only a Data Query Language (DQL) statement, the stream's status does not change: the incremental data is inspected but not consumed, and the stream's Offset Version stays the same. The Reference Table Version, however, follows the status of the associated Delta Table and always reflects its latest data version.
When you read a stream as part of a DML operation, the incremental data that the stream represents is consumed and the stream's status changes. The Offset Version advances to the latest incremental data version read by the DML operation, so it becomes equal to the Reference Table Version. At that point there is no new incremental data, and reading the stream again in this state returns an empty result.
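The difference between inspecting and consuming a stream can be sketched as a small state machine in Python. This is a toy model of the behavior described above, not a MaxCompute API: the class StreamModel and its methods select and consume are hypothetical names, and versions are assumed to be consecutive integers.

```python
class StreamModel:
    """Toy model of a stream's version bookkeeping (not a MaxCompute API)."""

    def __init__(self, offset_version: int):
        self.offset_version = offset_version

    def select(self, table_version: int) -> range:
        """DQL only: inspect (offset, table_version] without changing state."""
        return range(self.offset_version + 1, table_version + 1)

    def consume(self, table_version: int) -> range:
        """DML that reads the stream: same range, then the offset advances."""
        pending = self.select(table_version)
        self.offset_version = table_version
        return pending


stream = StreamModel(offset_version=1)
table_version = 2  # one DML operation has been applied to the source table

print(list(stream.select(table_version)))   # [2]
print(list(stream.select(table_version)))   # [2] again: nothing was consumed
print(list(stream.consume(table_version)))  # [2], and the offset advances to 2
print(list(stream.select(table_version)))   # [] until the table changes again
```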
Example output for a query in CDC mode
For more information about how to use the Change Data Capture (CDC) mode for Delta Tables, see CDC (invitational preview).
1. Create a source Delta Table.
CREATE TABLE delta_table_src (
pk bigint NOT NULL PRIMARY KEY,
val bigint
) tblproperties (
"transactional"="true",
'acid.cdc.mode.enable'='true',
'cdc.insert.into.passthrough.enable'='true'
);
2. Create a destination table.
CREATE TABLE delta_table_dest (
pk bigint NOT NULL PRIMARY KEY,
val bigint
) tblproperties ("transactional"="true");
3. Create a stream in CDC mode.
CREATE STREAM delta_table_stream
ON TABLE delta_table_src version as of 1
strmproperties('read_mode'='cdc')
comment 'Stream cdc mode';
4. Insert two records into the source Delta Table.
INSERT INTO delta_table_src VALUES (1, 1), (2, 2);
5. Query delta_table_stream to output data in CDC format. Running only a DQL statement does not change the status of delta_table_stream, so running the following statement multiple times returns the same result.
SELECT * FROM delta_table_stream;
-- Output
+------------+------------+---------------------+----------------+------------------+
| pk         | val        | __meta_timestamp    | __meta_op_type | __meta_is_update |
+------------+------------+---------------------+----------------+------------------+
| 2          | 2          | 2024-09-07 11:03:53 | 1              | 0                |
| 1          | 1          | 2024-09-07 11:03:53 | 1              | 0                |
+------------+------------+---------------------+----------------+------------------+
6. Read the incremental data from delta_table_stream and insert it into the destination table delta_table_dest. This operation consumes the incremental data from delta_table_stream and updates the Offset Version of delta_table_stream to the latest data version of the associated delta_table_src table.
INSERT INTO delta_table_dest SELECT pk, val FROM delta_table_stream;
7. Query the destination table. The table contains the incremental data from the stream that was consumed in Step 6.
SELECT * FROM delta_table_dest;
-- Output
+------------+------------+
| pk         | val        |
+------------+------------+
| 1          | 1          |
| 2          | 2          |
+------------+------------+
8. Query delta_table_stream again. The query returns an empty result because the incremental data represented by delta_table_stream has been consumed and no new incremental data exists.
SELECT * FROM delta_table_stream;
-- Output
+------------+------------+
| pk         | val        |
+------------+------------+
+------------+------------+
9. Run an UPDATE operation to set the value of val to 10 for the record where pk is 1 in the source table.
UPDATE delta_table_src SET val = 10 WHERE pk = 1;
10. The UPDATE operation generated new incremental data in the source table. Query delta_table_stream to output the CDC data for the update.
SELECT * FROM delta_table_stream;
-- Output
+------------+------------+---------------------+----------------+------------------+
| pk         | val        | __meta_timestamp    | __meta_op_type | __meta_is_update |
+------------+------------+---------------------+----------------+------------------+
| 1          | 1          | 2024-09-07 11:10:21 | 0              | 1                |
| 1          | 10         | 2024-09-07 11:10:21 | 1              | 1                |
+------------+------------+---------------------+----------------+------------------+
This example shows that the CDC output mode tracks record changes in the source Delta Table and outputs a record for every change state, including the values before and after an update. This makes it well suited to incremental computation logic.
Example output for a query in append mode
1. Create a source Delta Table.
CREATE TABLE delta_table_src (
pk bigint NOT NULL PRIMARY KEY,
val bigint
) tblproperties ("transactional"="true");
2. Create a destination table.
CREATE TABLE delta_table_dest (
pk bigint NOT NULL PRIMARY KEY,
val bigint
) tblproperties ("transactional"="true");
3. Create a stream in append mode and associate it with the Delta Table.
CREATE STREAM delta_table_stream
ON TABLE delta_table_src version as of 1
strmproperties ('read_mode'='append')
comment 'Stream append mode';
4. Insert two records into the source Delta Table.
INSERT INTO delta_table_src VALUES (1, 1), (2, 2);
5. Query delta_table_stream.
SELECT * FROM delta_table_stream;
-- The output does not include system columns.
+------------+------------+
| pk         | val        |
+------------+------------+
| 1          | 1          |
| 2          | 2          |
+------------+------------+
6. Read the incremental data from delta_table_stream and insert it into the destination table. This operation consumes the incremental data from delta_table_stream and updates the Offset Version of delta_table_stream to the latest data version of the associated delta_table_src table.
INSERT INTO delta_table_dest SELECT pk, val FROM delta_table_stream;
7. Query the destination table. The table contains the incremental data from delta_table_stream that was consumed in Step 6.
SELECT * FROM delta_table_dest;
-- Output
+------------+------------+
| pk         | val        |
+------------+------------+
| 1          | 1          |
| 2          | 2          |
+------------+------------+
8. Query delta_table_stream again. The query returns an empty result because the incremental data represented by delta_table_stream has been consumed and no new incremental data exists.
SELECT * FROM delta_table_stream;
-- Output
+------------+------------+
| pk         | val        |
+------------+------------+
+------------+------------+
9. Run an UPDATE operation to set the value of val to 10 for the record where pk is 1 in the source table.
UPDATE delta_table_src SET val = 10 WHERE pk = 1;
10. Run a DELETE operation to delete the record where pk is 2 from the source table.
DELETE FROM delta_table_src WHERE pk = 2;
11. Query delta_table_stream. Only the result of the UPDATE operation, record (1, 10), is returned. The record affected by the DELETE operation is not returned.
SELECT * FROM delta_table_stream;
-- Output
+------------+------------+
| pk         | val        |
+------------+------------+
| 1          | 10         |
+------------+------------+
This example shows that the append output mode does not expose the operation type of each change. It outputs only the final state of each record, and deleted records are not returned. Its use cases are therefore limited: it is typically used in common extract, transform, and load (ETL) scenarios that continuously process incrementally inserted data.
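The contrast between the two read modes can be summarized with a short Python sketch that renders the same change list both ways. It is a model built from the sample outputs in this topic, not MaxCompute code. The function names and the change-tuple layout are hypothetical, and the flag values for __meta_op_type and __meta_is_update are inferred from the sample output rather than from a specification. The samples show no CDC output for a DELETE, so the sketch does not guess at one.

```python
def cdc_rows(changes):
    """Render changes as (pk, val, op_type, is_update) rows, CDC style.

    Inferred from the sample output: an insert yields one row with op_type 1,
    and an update yields the old value (op_type 0) followed by the new value
    (op_type 1), both with is_update 1.
    """
    rows = []
    for op, pk, old, new in changes:
        if op == "insert":
            rows.append((pk, new, 1, 0))
        elif op == "update":
            rows.append((pk, old, 0, 1))
            rows.append((pk, new, 1, 1))
        else:
            # The sample outputs do not show a delete in CDC mode.
            raise NotImplementedError(f"no sample output for {op!r}")
    return rows


def append_rows(changes):
    """Render changes append style: final value per key, deleted keys
    dropped, and no system columns."""
    latest = {}
    for op, pk, old, new in changes:
        if op == "delete":
            latest.pop(pk, None)
        else:
            latest[pk] = new
    return sorted(latest.items())


update_only = [("update", 1, 1, 10)]
update_then_delete = [("update", 1, 1, 10), ("delete", 2, 2, None)]

print(cdc_rows(update_only))            # [(1, 1, 0, 1), (1, 10, 1, 1)]
print(append_rows(update_then_delete))  # [(1, 10)]
```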