Stream是MaxCompute自動管理Delta Table增量查詢資料版本的流對象,記錄對增量表所做的資料操作語言(DML)更改,包括插入、更新和刪除,以及有關每次更改的中繼資料,以便您可以使用更改的資料採取操作。本文為您詳細介紹Stream操作相關命令。
建立Stream
文法
CREATE STREAM [IF NOT EXISTS] <stream_name>
ON TABLE <delta_table_name> <timestamp as of t | VERSION as of v>
strmproperties ("read_mode"="append" | "cdc")
[comment <stream_comment>];名稱 | 功能說明 |
IF NOT EXISTS | 可選。如果不指定 |
stream_name | 必填。待建立的Stream名。 |
ON TABLE <delta_table_name> | 代表Stream對象關聯的Delta Table源表,一個Stream對象只能關聯一張源表。 |
timestamp as of t | 代表Stream對象建立時VersionOffset初始化資料時間戳記為t ,查詢範圍為 |
version as of v | 代表Stream對象建立時VersionOffset初始化資料版本為v,查詢範圍為 |
strmproperties | 同表屬性一樣以key/value字元類型值對的形式出現,目前stream只支援了一個屬性read_mode,可選值有兩個: |
stream_comment | 可選。Stream注釋內容且為長度不超過1024位元組的有效字串,否則報錯。 |
系統列 | 對於 |
樣本說明
建立Delta Table源表,再建立一個Stream關聯Delta Table表。
CREATE TABLE delta_table_src (
pk bigint NOT NULL PRIMARY KEY,
val bigint
) tblproperties ("transactional"="true");
CREATE STREAM delta_table_stream
ON TABLE delta_table_src version as of 1
strmproperties('read_mode'='append')
comment 'stream demo';查看Stream資訊
文法
DESC STREAM <stream_name>;樣本
建立Delta Table源表,再建立一個Stream關聯Delta Table表,查詢Stream對象delta_table_stream。
CREATE TABLE delta_table_src (pk BIGINT NOT NULL PRIMARY key,
val BIGINT) TBLPROPERTIES ("transactional"="true");
CREATE STREAM delta_table_stream ON TABLE delta_table_src
version AS OF 1 strmproperties('read_mode'='append')
comment 'stream demo';
DESC STREAM delta_table_stream;輸出結果
Name delta_table_stream
Project sql_optimizer
Create Time 2024-09-06 17:03:32 Last Modified Time 2024-09-06 17:03:32
Offset Version 1
Reference Table Project sql_optimizer
Reference Table Name delta_table_src
Reference Table Id 5e19a67eb97b4477b7fbce0c7bbcebca
Reference Table Version 1
Parameters {
"comment": "stream demo",
"read_mode": "append"}名稱 | 說明 |
Name | 當前Stream的名稱。 |
Project | 當前Stream所在的專案名稱。 |
Create Time | 當前Stream的建立時間。 |
Offset Version | 當前Stream的初始化資料版本。 |
Reference Table Project | 關聯的源表的專案名稱。 |
Reference Table Name | 關聯的源表名稱。 |
Reference Table Id | 關聯的源表的唯一標識ID。 |
Reference Table Version | 關聯的源表的資料版本。 |
Parameters | 當前Stream對象的屬性資訊。 |
特別要注意Offset Version和Reference Table Version的資訊,Offset Version的值為1表示當前Stream已經消費的關聯Delta Table資料的版本,Reference Table Version的值為1表示當前關聯的Delta Table的最新的資料版本。由於關聯的Delta Table是空表,所以兩者的值都是1。建立Stream對象後如果其關聯的Delta Table執行了DML操作,Reference Table Version的值會隨之更新改變。讀取Stream時會轉換成對關聯表的增量查詢,讀取的資料版本範圍區間為左開右閉區間,為(Offset Version, Reference Table Version],從而確保Offset Version和Reference Table Version之間的增量資料被讀到。如果(Offset Version, Reference Table Version]版本的增量資料被DML操作消費,消費後Offset Version會等於 Reference Table Version,即都為關聯Delta Table的最新資料版本,表示沒有新的增量資料。
修改Stream
修改Stream屬性
ALTER STREAM <stream_name> SET strmproperties ("key"="value");tream_name:必填。待描述的Stream名。
strmproperties:Stream的屬性,同表屬性一樣以key/value字元類型值對的形式出現,目前Stream只支援屬性
read_mode,並且當前不支援修改。
修改Stream的初始化資料版本
ALTER STREAM <stream_name> ON TABLE <delta_table_name>
<timestamp as of t | version as of v > ;stream_name:必填。待修改的Stream名。
ON TABLE <delta_table_name>: 代表Stream對象關聯的Delta Table源表,源表為修改前的源表,目前還不支援修改源表。
timestamp as of t: 代表修改Stream對象VersionOffset初始化資料版本為t,查詢範圍
(t, 最新增量資料版本]。version as of v: 代表修改Stream對象VersionOffset初始化資料版本為v,查詢範圍
(v, 最新增量資料版本]。
樣本
-- 1. 建立Delta Table源表。
CREATE TABLE delta_table_src (pk bigint not null primary key,
val bigint) tblproperties ("transactional"="true");
-- 2. 建立一個Stream關聯Delta Table表。
CREATE STREAM delta_table_stream on table delta_table_src
version as of 1 strmproperties('read_mode'='append')
comment 'stream demo';
-- 3. 查看建立的stream資訊,當前Offset Version和Reference Table Version都為1。
DESC STREAM delta_table_stream;
-- 輸出結果。
Name delta_table_stream
Project sql_optimizer
Create Time 2024-09-07 10:26:56 Last Modified Time 2024-09-07 10:26:56
Offset Version 1
Reference Table Project sql_optimizer
Reference Table Name delta_table_src
Reference Table Id 8605276ce0034b20af761bf4761ba62e
Reference Table Version 1
Parameters {
"comment": "stream demo",
"read_mode": "append"}
-- 4. stream關聯的Delet Table插入一條資料,目的是為了使得Delta Table的lsn遞增,
-- 之後我們將引用Delta Table的version修改為遞增後的version.
INSERT INTO delta_table_src VALUES ('1', '1');
-- 5. 查看當前Delta Table的資料版本資訊
SHOW history FOR TABLE delta_table_src;
ObjectType ObjectId ObjectName VERSION(LSN) Time Operation
TABLE 8605276ce0034b20af761bf4761ba62e delta_table_src 0000000000000001 2024-09-07 10:25:59 CREATE
TABLE 8605276ce0034b20af761bf4761ba62e delta_table_src 0000000000000002 2024-09-07 10:28:19 APPEND
-- 6. 修改stream關聯的Delta Table的version為2
ALTER STREAM delta_table_stream ON TABLE delta_table_src version as of 2;
-- 7. 查看修改後的stream資訊,stream以及關聯的Delta Table的version都變成了2。
DESC STREAM delta_table_stream;
-- 輸出結果。
Name delta_table_stream
Project sql_optimizer
Create Time 2024-09-07 10:26:56 Last Modified Time 2024-09-07 10:29:12
Offset Version 2
Reference Table Project sql_optimizer
Reference Table Name delta_table_src
Reference Table Id 8605276ce0034b20af761bf4761ba62e
Reference Table Version 2
Parameters {
"comment": "stream demo",
"read_mode": "append"}列出專案下的所有Stream
文法
SHOW STREAMS;樣本
-- 列出當前專案下的所有stream對象
SHOW STREAMS;
-- 輸出結果。
delta_table_stream刪除Stream
文法
DROP STREAM [IF EXISTS] <stream_name>;樣本
-- 1. 查看當前專案中存在的所有stream對象
SHOW STREAMS;
-- 輸出顯示。
delta_table_stream
-- 2. 刪除delta_table_stream stream對象
DROP STREAM IF EXISTS delta_table_stream;
-- 3. 再次查看當前專案中存在的所有stream對象;結果為空白
SHOW STREAMS;查詢Stream
文法
SELECT * FROM <stream_name>;查詢Stream時,單純執行DQL,並不會改變Stream的狀態,即Stream的增量起始的Offset Version不會改變,但其關聯的Delta Table的Reference Table Version會隨著Delta Table狀態的改變而改變,保持為關聯Delta Table最新的Version資料版本。單純執行DQL,表示這份增量資料沒有被真正消費,只是進行了資料的探查。
查詢Stream並且執行了DML操作,表示真正消費了Stream所表示的增量資料,會修改Stream的狀態,將關聯的資料版本遷移到這次DML操作查詢的最新增量資料版本,即Stream的Offset Version會等於關聯Delta Table的Reference Table Version,此時表示沒有新的增量資料,在目前狀態下如果Stream再被讀取,讀取的資料為空白。
CDC模式查詢輸出樣本
Delta Table CDC模式的使用,詳情請參見CDC(邀測)。
建立Delta Table源表。
CREATE TABLE delta_table_src ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ( "transactional"="true", 'acid.cdc.mode.enable'='true', 'cdc.insert.into.passthrough.enable'='true' );建立目標表。
CREATE TABLE delta_table_dest ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ("transactional"="true");建立CDC模式的Stream。
CREATE STREAM delta_table_stream ON TABLE delta_table_src version as of 1 strmproperties('read_mode'='cdc') comment 'stream cdc mode';插入兩條資料至Delta Table源表。
INSERT INTO delta_table_src VALUES (1, 1), (2, 2);查詢
delta_table_stream輸出CDC格式資料。但執行單純的DQL,並不會改變delta_table_stream狀態,如下語句執行多次,返回結果一樣。SELECT * FROM delta_table_stream; -- 輸出結果 +------------+------------+------------------+----------------+------------------+ | pk | val | __meta_timestamp | __meta_op_type | __meta_is_update | +------------+------------+------------------+----------------+------------------+ | 2 | 2 | 2024-09-07 11:03:53 | 1 | 0 | | 1 | 1 | 2024-09-07 11:03:53 | 1 | 0 | +------------+------------+------------------+----------------+------------------+讀取
delta_table_stream表的增量資料,並插入到目標表delta_table_dest。同時將delta_table_stream的Offset Version修改為關聯表delta_table_src最新的資料版本。此操作真正消費了delta_table_stream表的增量資料。INSERT INTO delta_table_dest SELECT pk, val FROM delta_table_stream;查詢目標表。表中儲存的是步驟6操作消費的Stream表的增量資料。
SELECT * FROM delta_table_dest; -- 輸出結果 +------------+------------+ | pk | val | +------------+------------+ | 1 | 1 | | 2 | 2 | +------------+------------+重新查詢
delta_table_stream表,輸出為空白。由於delta_table_stream表示的增量資料已經被消費,沒有新的增量資料。SELECT * FROM delta_table_stream; -- 輸出結果 +------------+------------+ | pk | val | +------------+------------+ +------------+------------+執行Update操作將源表pk為1記錄的val設為10。
UPDATE delta_table_src SET val = 10 WHERE pk = 1;由於源表由Update操作產生的新的增量資料,查詢
delta_table_stream,可輸出Update操作的CDC資料。SELECT * FROM delta_table_stream; -- 輸出結果 +------------+------------+------------------+----------------+------------------+ | pk | val | __meta_timestamp | __meta_op_type | __meta_is_update | +------------+------------+------------------+----------------+------------------+ | 1 | 1 | 2024-09-07 11:10:21 | 0 | 1 | | 1 | 10 | 2024-09-07 11:10:21 | 1 | 1 | +------------+------------+------------------+----------------+------------------+
由上述樣本可見,CDC輸出模式會跟蹤Delta Table源表的記錄變化,輸出所有變化狀態的記錄,可有效用於增量計算的邏輯。
Append模式查詢輸出樣本
建立Delta Table源表。
CREATE TABLE delta_table_src ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ("transactional"="true");建立目標表。
CREATE TABLE delta_table_dest ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ("transactional"="true");建立Append模式的Stream,並關聯Delta Table。
CREATE STREAM delta_table_stream ON TABLE delta_table_src version as of 1 strmproperties ('read_mode'='append') comment 'stream append mode';插入兩條資料到Delta Table源表。
INSERT INTO delta_table_src VALUES (1, 1), (2, 2);查詢
delta_table_stream。SELECT * FROM delta_table_stream; -- 輸出結果,不包含系統欄位。 +------------+------------+ | pk | val | +------------+------------+ | 1 | 1 | | 2 | 2 | +------------+------------+讀取
delta_table_stream表的增量資料,並插入到目標表,同時將delta_table_stream的Offset Version修改為關聯表delta_table_src最新的資料版本,此操作真正消費delta_table_stream表的增量資料。INSERT INTO delta_table_dest SELECT pk, val FROM delta_table_stream;查詢目標表。表中儲存的是步驟6 操作消費的
delta_table_stream表的增量資料。SELECT * FROM delta_table_dest; -- 輸出結果 +------------+------------+ | pk | val | +------------+------------+ | 1 | 1 | | 2 | 2 | +------------+------------+重新查詢
delta_table_stream表,輸出為空白。由於delta_table_stream表示的增量資料已經被消費,沒有新的增量資料。SELECT * FROM delta_table_stream; -- 輸出結果 +------------+------------+ | pk | val | +------------+------------+ +------------+------------+執行Update操作將源表pk為1的記錄val設為10。
UPDATE delta_table_src SET val = 10 WHERE pk = 1;執行刪除操作,將源表中pk為2的記錄刪除。
DELETE FROM delta_table_src WHERE pk = 2;查詢
delta_table_stream,只輸出了Update操作的結果記錄 (1, 10),Delete操作的記錄不可見,也不會輸出。SELECT * FROM delta_table_stream; -- 輸出結果 +------------+------------+ | pk | val | +------------+------------+ | 1 | 10 | +------------+------------+
由上述樣本可見,Append輸出模式並不會顯示資料的操作狀態,只會輸出一條記錄的最終狀態,Delete記錄也不會輸出。因此使用的情境有限,通常可用於一些典型的ETL情境,不斷對增量插入的資料進行清洗。