全部產品
Search
文件中心

MaxCompute:流對象(Stream)

更新時間:Aug 29, 2025

Stream是MaxCompute自動管理Delta Table增量查詢資料版本的流對象,記錄對增量表所做的資料操作語言(DML)更改,包括插入、更新和刪除,以及有關每次更改的中繼資料,以便您可以使用更改的資料採取操作。本文為您詳細介紹Stream操作相關命令。

建立Stream

文法

CREATE STREAM [IF NOT EXISTS] <stream_name> 
ON TABLE <delta_table_name> <timestamp as of t | VERSION as of v>
strmproperties ("read_mode"="append" | "cdc") 
[comment <stream_comment>];

名稱

功能說明

IF NOT EXISTS

可選。如果不指定IF NOT EXISTS選項而存在同名Stream,會報錯。如果指定IF NOT EXISTS,只要存在同名Stream,即使原Stream結構與要建立的目標Stream結構不一致,均返回成功。已存在的同名Stream的中繼資料資訊不會被改動。

stream_name

必填。待建立的Stream名。

ON TABLE <delta_table_name>

代表Stream對象關聯的Delta Table源表,一個Stream對象只能關聯一張源表。

timestamp as of t

代表Stream對象建立時VersionOffset初始化資料時間戳記為t ,查詢範圍為(t, 最新增量資料時間戳記]

version as of v

代表Stream對象建立時VersionOffset初始化資料版本為v,查詢範圍為(v, 最新增量資料版本]

strmproperties

同表屬性一樣以key/value字元類型值對的形式出現,目前stream只支援了一個屬性read_mode,可選值有兩個:append模式用來消費Delta Table的表資料,CDC模式用來消費Delta Table的cdc資料。

stream_comment

可選。Stream注釋內容且為長度不超過1024位元組的有效字串,否則報錯。

系統列

對於"read_mode" = "cdc",會額外輸出三個系統列: __meta_timestamp代表資料寫入時間,__meta_op_type(包含INSERT | DELETE)和__meta_is_update(包含TRUE | FALSE),可組合成四種情況: INSERT + FALSE代表新記錄,INSERT + TRUE代表Update後的值,DELETE + TRUE代表Update前的值,DELETE+FALSE代表刪除。但當前輸出的系統列__meta_op_type__meta_is_update的值為tinyint類型,下面是對應的組合值和對應的含義:

+--------------------+--------------+-----------------+
|     表示的操作       |__meta_op_type|__meta_is_update |
+--------------------+--------------+-----------------+
|       INSERT       |    INSERT(1) |      FALSE(0)   |
|       DELETE       |    DELETE(0) |      FALSE(0)   |
|   UDPATE_BEFORE    |    DELETE(0) |      TRUE(1)    |
|   UPDATE_AFTER     |    INSERT(1) |      TRUE(1)    |
+--------------------+--------------+-----------------+

樣本說明

建立Delta Table源表,再建立一個Stream關聯Delta Table表。

CREATE TABLE delta_table_src (
  pk bigint NOT NULL PRIMARY KEY, 
  val bigint
) tblproperties ("transactional"="true");

CREATE STREAM delta_table_stream 
ON TABLE delta_table_src version as of 1 
strmproperties('read_mode'='append') 
comment 'stream demo';

查看Stream資訊

文法

DESC STREAM <stream_name>;

樣本

建立Delta Table源表,再建立一個Stream關聯Delta Table表,查詢Stream對象delta_table_stream

CREATE TABLE delta_table_src (pk BIGINT NOT NULL PRIMARY key, 
val BIGINT) TBLPROPERTIES ("transactional"="true");

CREATE STREAM delta_table_stream ON TABLE delta_table_src 
version AS OF 1 strmproperties('read_mode'='append') 
comment 'stream demo';

DESC STREAM delta_table_stream;

輸出結果

Name                                    delta_table_stream                      
Project                                 sql_optimizer                           
Create Time                             2024-09-06 17:03:32                     Last Modified Time                      2024-09-06 17:03:32                     
Offset Version                          1                                       
Reference Table Project                 sql_optimizer                           
Reference Table Name                    delta_table_src                         
Reference Table Id                      5e19a67eb97b4477b7fbce0c7bbcebca        
Reference Table Version                 1                                       
Parameters                              {
    "comment": "stream demo",
    "read_mode": "append"}

名稱

說明

Name

當前Stream的名稱。

Project

當前Stream所在的專案名稱。

Create Time

當前Stream的建立時間。

Offset Version

當前Stream的初始化資料版本。

Reference Table Project

關聯的源表的專案名稱。

Reference Table Name

關聯的源表名稱。

Reference Table Id

關聯的源表的唯一標識ID。

Reference Table Version

關聯的源表的資料版本。

Parameters

當前Stream對象的屬性資訊。

說明

特別要注意Offset VersionReference Table Version的資訊,Offset Version的值為1表示當前Stream已經消費的關聯Delta Table資料的版本,Reference Table Version的值為1表示當前關聯的Delta Table的最新的資料版本。由於關聯的Delta Table是空表,所以兩者的值都是1。建立Stream對象後如果其關聯的Delta Table執行了DML操作,Reference Table Version的值會隨之更新改變。讀取Stream時會轉換成對關聯表的增量查詢,讀取的資料版本範圍區間為左開右閉區間,為(Offset Version, Reference Table Version],從而確保Offset Version和Reference Table Version之間的增量資料被讀到。如果(Offset Version, Reference Table Version]版本的增量資料被DML操作消費,消費後Offset Version會等於 Reference Table Version,即都為關聯Delta Table的最新資料版本,表示沒有新的增量資料。

修改Stream

修改Stream屬性

ALTER STREAM <stream_name> SET strmproperties ("key"="value");
說明
  • tream_name:必填。待描述的Stream名。

  • strmproperties:Stream的屬性,同表屬性一樣以key/value字元類型值對的形式出現,目前Stream只支援屬性read_mode,並且當前不支援修改。

修改Stream的初始化資料版本

ALTER STREAM <stream_name> ON TABLE <delta_table_name> 
<timestamp as of t  |  version as of v > ;
說明
  • stream_name:必填。待修改的Stream名。

  • ON TABLE <delta_table_name>: 代表Stream對象關聯的Delta Table源表,源表為修改前的源表,目前還不支援修改源表。

  • timestamp as of t: 代表修改Stream對象VersionOffset初始化資料版本為t,查詢範圍(t, 最新增量資料版本]

  • version as of v: 代表修改Stream對象VersionOffset初始化資料版本為v,查詢範圍(v, 最新增量資料版本]

樣本

-- 1. 建立Delta Table源表。
CREATE TABLE delta_table_src (pk bigint not null primary key, 
val bigint) tblproperties ("transactional"="true");

-- 2. 建立一個Stream關聯Delta Table表。
CREATE STREAM delta_table_stream on table delta_table_src 
version as of 1 strmproperties('read_mode'='append') 
comment 'stream demo';

-- 3. 查看建立的stream資訊,當前Offset Version和Reference Table Version都為1。
DESC STREAM delta_table_stream;
-- 輸出結果。
Name                                    delta_table_stream                      
Project                                 sql_optimizer                           
Create Time                             2024-09-07 10:26:56                     Last Modified Time                      2024-09-07 10:26:56                     
Offset Version                          1                                       
Reference Table Project                 sql_optimizer                           
Reference Table Name                    delta_table_src                         
Reference Table Id                      8605276ce0034b20af761bf4761ba62e        
Reference Table Version                 1                                       
Parameters                              {
    "comment": "stream demo",
    "read_mode": "append"}

-- 4. stream關聯的Delet Table插入一條資料,目的是為了使得Delta Table的lsn遞增,
-- 之後我們將引用Delta Table的version修改為遞增後的version.
INSERT INTO delta_table_src VALUES ('1', '1');

-- 5. 查看當前Delta Table的資料版本資訊
SHOW history FOR TABLE delta_table_src;

ObjectType	ObjectId                        	ObjectName          	VERSION(LSN)    	Time               	Operation       
TABLE     	8605276ce0034b20af761bf4761ba62e	delta_table_src     	0000000000000001	2024-09-07 10:25:59	CREATE          
TABLE     	8605276ce0034b20af761bf4761ba62e	delta_table_src     	0000000000000002	2024-09-07 10:28:19	APPEND      

-- 6. 修改stream關聯的Delta Table的version為2
ALTER STREAM delta_table_stream ON TABLE delta_table_src version as of 2;

-- 7. 查看修改後的stream資訊,stream以及關聯的Delta Table的version都變成了2。
DESC STREAM delta_table_stream;

-- 輸出結果。
Name                                    delta_table_stream                      
Project                                 sql_optimizer                           
Create Time                             2024-09-07 10:26:56                     Last Modified Time                      2024-09-07 10:29:12                     
Offset Version                          2                                       
Reference Table Project                 sql_optimizer                           
Reference Table Name                    delta_table_src                         
Reference Table Id                      8605276ce0034b20af761bf4761ba62e        
Reference Table Version                 2                                       
Parameters                              {
    "comment": "stream demo",
    "read_mode": "append"}

列出專案下的所有Stream

文法

SHOW STREAMS;

樣本

-- 列出當前專案下的所有stream對象
SHOW STREAMS;
-- 輸出結果。
delta_table_stream

刪除Stream

文法

DROP STREAM [IF EXISTS] <stream_name>;

樣本

-- 1. 查看當前專案中存在的所有stream對象
SHOW STREAMS;
-- 輸出顯示。
delta_table_stream

-- 2. 刪除delta_table_stream stream對象
DROP STREAM IF EXISTS delta_table_stream;

-- 3. 再次查看當前專案中存在的所有stream對象;結果為空白
SHOW STREAMS;

查詢Stream

文法

SELECT * FROM <stream_name>;
說明
  • 查詢Stream時,單純執行DQL,並不會改變Stream的狀態,即Stream的增量起始的Offset Version不會改變,但其關聯的Delta Table的Reference Table Version會隨著Delta Table狀態的改變而改變,保持為關聯Delta Table最新的Version資料版本。單純執行DQL,表示這份增量資料沒有被真正消費,只是進行了資料的探查。

  • 查詢Stream並且執行了DML操作,表示真正消費了Stream所表示的增量資料,會修改Stream的狀態,將關聯的資料版本遷移到這次DML操作查詢的最新增量資料版本,即Stream的Offset Version會等於關聯Delta Table的Reference Table Version,此時表示沒有新的增量資料,在目前狀態下如果Stream再被讀取,讀取的資料為空白。

CDC模式查詢輸出樣本

Delta Table CDC模式的使用,詳情請參見CDC(邀測)

  1. 建立Delta Table源表。

    CREATE TABLE delta_table_src (
      pk bigint NOT NULL PRIMARY KEY, 
      val bigint
    ) tblproperties (
      "transactional"="true", 
      'acid.cdc.mode.enable'='true', 
      'cdc.insert.into.passthrough.enable'='true'
    );
  2. 建立目標表。

    CREATE TABLE delta_table_dest (
      pk bigint NOT NULL PRIMARY KEY, 
      val bigint
    ) tblproperties ("transactional"="true");
  3. 建立CDC模式的Stream。

    CREATE STREAM delta_table_stream 
    ON TABLE delta_table_src version as of 1 
    strmproperties('read_mode'='cdc') 
    comment 'stream cdc mode';
  4. 插入兩條資料至Delta Table源表。

    INSERT INTO delta_table_src VALUES (1, 1), (2, 2);
  5. 查詢delta_table_stream輸出CDC格式資料。但執行單純的DQL,並不會改變delta_table_stream狀態,如下語句執行多次,返回結果一樣。

    SELECT * FROM  delta_table_stream;
    
    -- 輸出結果
    +------------+------------+------------------+----------------+------------------+
    | pk         | val        | __meta_timestamp | __meta_op_type | __meta_is_update |
    +------------+------------+------------------+----------------+------------------+
    | 2          | 2          | 2024-09-07 11:03:53 | 1             | 0                |
    | 1          | 1          | 2024-09-07 11:03:53 | 1              | 0                |
    +------------+------------+------------------+----------------+------------------+
  6. 讀取delta_table_stream表的增量資料,並插入到目標表delta_table_dest。同時將delta_table_stream的Offset Version修改為關聯表delta_table_src最新的資料版本。此操作真正消費了delta_table_stream表的增量資料。

    INSERT INTO  delta_table_dest SELECT pk, val FROM delta_table_stream;
  7. 查詢目標表。表中儲存的是步驟6操作消費的Stream表的增量資料。

    SELECT * FROM delta_table_dest; 
    
    -- 輸出結果
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    | 1          | 1          |
    | 2          | 2          |
    +------------+------------+
  8. 重新查詢delta_table_stream表,輸出為空白。由於delta_table_stream表示的增量資料已經被消費,沒有新的增量資料。

    SELECT * FROM delta_table_stream;
    
    -- 輸出結果
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    +------------+------------+
  9. 執行Update操作將源表pk為1記錄的val設為10。

    UPDATE delta_table_src SET val = 10 WHERE pk = 1;
  10. 由於源表由Update操作產生的新的增量資料,查詢delta_table_stream,可輸出Update操作的CDC資料。

    SELECT * FROM delta_table_stream;  
    
    --  輸出結果
    +------------+------------+------------------+----------------+------------------+
    | pk         | val        | __meta_timestamp | __meta_op_type | __meta_is_update |
    +------------+------------+------------------+----------------+------------------+
    | 1          | 1          | 2024-09-07 11:10:21 | 0              | 1                |
    | 1          | 10         | 2024-09-07 11:10:21 | 1              | 1                |
    +------------+------------+------------------+----------------+------------------+

由上述樣本可見,CDC輸出模式會跟蹤Delta Table源表的記錄變化,輸出所有變化狀態的記錄,可有效用於增量計算的邏輯。

Append模式查詢輸出樣本

  1. 建立Delta Table源表。

    CREATE TABLE delta_table_src (
      pk bigint NOT NULL PRIMARY KEY, 
      val bigint
    ) tblproperties ("transactional"="true");
  2. 建立目標表。

    CREATE TABLE delta_table_dest (
      pk bigint NOT NULL PRIMARY KEY, 
      val bigint
    ) tblproperties ("transactional"="true");
  3. 建立Append模式的Stream,並關聯Delta Table。

    CREATE STREAM delta_table_stream 
    ON TABLE delta_table_src version as of 1 
    strmproperties ('read_mode'='append') 
    comment 'stream append mode';
  4. 插入兩條資料到Delta Table源表。

    INSERT INTO delta_table_src VALUES (1, 1), (2, 2);
  5. 查詢delta_table_stream

    SELECT * FROM delta_table_stream; 
    
    -- 輸出結果,不包含系統欄位。
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    | 1          | 1          |
    | 2          | 2          |
    +------------+------------+
  6. 讀取delta_table_stream表的增量資料,並插入到目標表,同時將delta_table_stream的Offset Version修改為關聯表delta_table_src最新的資料版本,此操作真正消費delta_table_stream表的增量資料。

    INSERT INTO delta_table_dest SELECT pk, val FROM delta_table_stream;
  7. 查詢目標表。表中儲存的是步驟6 操作消費的delta_table_stream表的增量資料。

    SELECT * FROM delta_table_dest; 
    
    -- 輸出結果
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    | 1          | 1          |
    | 2          | 2          |
    +------------+------------+
  8. 重新查詢delta_table_stream表,輸出為空白。由於delta_table_stream表示的增量資料已經被消費,沒有新的增量資料。

    SELECT * FROM delta_table_stream; 
    
    -- 輸出結果
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    +------------+------------+
  9. 執行Update操作將源表pk為1的記錄val設為10。

    UPDATE delta_table_src SET val = 10 WHERE pk = 1;
  10. 執行刪除操作,將源表中pk為2的記錄刪除。

    DELETE FROM delta_table_src WHERE pk = 2;
  11. 查詢delta_table_stream,只輸出了Update操作的結果記錄 (1, 10),Delete操作的記錄不可見,也不會輸出。

    SELECT * FROM delta_table_stream; 
    
    -- 輸出結果
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    | 1          | 10         |
    +------------+------------+

由上述樣本可見,Append輸出模式並不會顯示資料的操作狀態,只會輸出一條記錄的最終狀態,Delete記錄也不會輸出。因此使用的情境有限,通常可用於一些典型的ETL情境,不斷對增量插入的資料進行清洗。