ストリームは、Delta テーブルに対する増分クエリのデータバージョンを自動的に管理する MaxCompute オブジェクトです。挿入、更新、削除などのデータ操作言語 (DML) の変更を、各変更のメタデータとともに記録します。これにより、変更されたデータを使用して操作を実行できます。このトピックでは、ストリーム操作に使用されるコマンドについて説明します。
ストリームの作成
構文
CREATE STREAM [IF NOT EXISTS] <stream_name>
ON TABLE <delta_table_name> <timestamp as of t | VERSION as of v>
strmproperties ("read_mode"="append" | "cdc")
[comment <stream_comment>];名前 | 説明 |
IF NOT EXISTS | オプションです。 |
stream_name | 必須。作成するストリームの名前。 |
ON TABLE <delta_table_name> | ストリームに関連付けるソース Delta テーブルを指定します。ストリームは 1 つのソーステーブルにのみ関連付けることができます。 |
timestamp as of t | ストリームの VersionOffset をデータタイムスタンプ t に初期化します。クエリ範囲は |
version as of v | ストリームの VersionOffset をデータバージョン v に初期化します。クエリ範囲は |
strmproperties | テーブルプロパティと同様に、ストリームプロパティを文字列型のキーと値のペアとして指定します。現在、read_mode プロパティのみがサポートされています。有効な値は |
stream_comment | オプションです。ストリームのコメント。コメントは 1024 バイト以下の有効な文字列でなければなりません。そうでない場合、エラーが返されます。 |
システム列 |
|
例
ソース Delta テーブルを作成し、次に Delta テーブルに関連付けられたストリームを作成します。
CREATE TABLE delta_table_src (
pk bigint NOT NULL PRIMARY KEY,
val bigint
) tblproperties ("transactional"="true");
CREATE STREAM delta_table_stream
ON TABLE delta_table_src version as of 1
strmproperties('read_mode'='append')
comment 'ストリームデモ';ストリーム情報の表示
構文
DESC STREAM <stream_name>;例
ソース Delta テーブルとそれに関連付けられたストリームを作成します。次に、ストリームオブジェクト delta_table_stream の情報を表示します。
CREATE TABLE delta_table_src (pk BIGINT NOT NULL PRIMARY key,
val BIGINT) TBLPROPERTIES ("transactional"="true");
CREATE STREAM delta_table_stream ON TABLE delta_table_src
version AS OF 1 strmproperties('read_mode'='append')
comment 'ストリームデモ';
DESC STREAM delta_table_stream;出力
Name delta_table_stream
Project sql_optimizer
Create Time 2024-09-06 17:03:32 Last Modified Time 2024-09-06 17:03:32
Offset Version 1
Reference Table Project sql_optimizer
Reference Table Name delta_table_src
Reference Table Id 5e19a67eb97b4477b7fbce0c7bbcebca
Reference Table Version 1
Parameters {
"comment": "stream demo",
"read_mode": "append"}名前 | 説明 |
Name | ストリームの名前。 |
Project | ストリームが存在するプロジェクトの名前。 |
Create Time | ストリームが作成された時刻。 |
Offset Version | ストリームの初期データバージョン。 |
Reference Table Project | 関連付けられたソーステーブルが存在するプロジェクトの名前。 |
Reference Table Name | 関連付けられたソーステーブルの名前。 |
Reference Table Id | 関連付けられたソーステーブルの一意の ID。 |
Reference Table Version | 関連付けられたソーステーブルのデータバージョン。 |
Parameters | ストリームオブジェクトのプロパティ。 |
Offset Version と Reference Table Version の情報には特に注意してください。Offset Version は、現在のストリームによって消費された関連 Delta テーブルのデータバージョンを示します。Reference Table Version は、関連 Delta テーブルの最新のデータバージョンを示します。関連 Delta テーブルは空であるため、両方の値は 1 です。Stream オブジェクトが作成された後、その関連 Delta テーブルで DML 操作が実行されると、Reference Table Version の値はそれに応じて更新されます。ストリームの読み取りは、関連テーブルに対する増分クエリに変換されます。クエリのデータバージョン範囲は、左開右閉区間 (Offset Version, Reference Table Version] です。これにより、Offset Version と Reference Table Version の間の増分データが読み取られることが保証されます。(Offset Version, Reference Table Version] バージョン範囲の増分データが DML 操作によって消費されると、Offset Version は Reference Table Version と等しくなります。この時点で、両方のバージョンは関連 Delta テーブルの最新のデータバージョンを反映しており、新しい増分データがないことを示します。
ストリームの変更
ストリームプロパティの変更
ALTER STREAM <stream_name> SET strmproperties ("key"="value");stream_name: 必須。変更するストリームの名前。
strmproperties: ストリームのプロパティは、テーブルプロパティと同様に、文字列型のキーと値のペアとして指定されます。現在、
read_modeプロパティのみがサポートされており、変更することはできません。
ストリームの初期データバージョンの変更
ALTER STREAM <stream_name> ON TABLE <delta_table_name>
<timestamp as of t | version as of v > ;stream_name: 必須。変更するストリームの名前。
ON TABLE <delta_table_name>: ストリームに関連付けられたソース Delta テーブルを指定します。これは元のソーステーブルと同じでなければなりません。ソーステーブルの変更は現在サポートされていません。
timestamp as of t: ストリームの初期データバージョン (VersionOffset) をタイムスタンプ t に変更します。クエリ範囲は
(t, 最新の増分データバージョン]です。version as of v: ストリームの初期データバージョン (VersionOffset) をバージョン v に変更します。クエリ範囲は
(v, 最新の増分データバージョン]です。
例
-- 1. ソース Delta テーブルを作成します。
CREATE TABLE delta_table_src (pk bigint not null primary key,
val bigint) tblproperties ("transactional"="true");
-- 2. Delta テーブルに関連付けられたストリームを作成します。
CREATE STREAM delta_table_stream on table delta_table_src
version as of 1 strmproperties('read_mode'='append')
comment 'ストリームデモ';
-- 3. 新しいストリームの情報を表示します。現在の Offset Version と Reference Table Version は両方とも 1 です。
DESC STREAM delta_table_stream;
-- 出力:
Name delta_table_stream
Project sql_optimizer
Create Time 2024-09-07 10:26:56 Last Modified Time 2024-09-07 10:26:56
Offset Version 1
Reference Table Project sql_optimizer
Reference Table Name delta_table_src
Reference Table Id 8605276ce0034b20af761bf4761ba62e
Reference Table Version 1
Parameters {
"comment": "stream demo",
"read_mode": "append"}
-- 4. 関連付けられた Delta テーブルにレコードを挿入して LSN をインクリメントします。
-- その後、Delta テーブルの参照バージョンを新しい、インクリメントされたバージョンに変更します。
INSERT INTO delta_table_src VALUES ('1', '1');
-- 5. Delta テーブルの現在のデータバージョン情報を表示します。
SHOW history FOR TABLE delta_table_src;
ObjectType ObjectId ObjectName VERSION(LSN) Time Operation
TABLE 8605276ce0034b20af761bf4761ba62e delta_table_src 0000000000000001 2024-09-07 10:25:59 CREATE
TABLE 8605276ce0034b20af761bf4761ba62e delta_table_src 0000000000000002 2024-09-07 10:28:19 APPEND
-- 6. ストリームに関連付けられた Delta テーブルのバージョンを 2 に変更します。
ALTER STREAM delta_table_stream ON TABLE delta_table_src version as of 2;
-- 7. 変更されたストリームの情報を表示します。ストリームと関連付けられた Delta テーブルの両方のバージョンが 2 になります。
DESC STREAM delta_table_stream;
-- 出力:
Name delta_table_stream
Project sql_optimizer
Create Time 2024-09-07 10:26:56 Last Modified Time 2024-09-07 10:29:12
Offset Version 2
Reference Table Project sql_optimizer
Reference Table Name delta_table_src
Reference Table Id 8605276ce0034b20af761bf4761ba62e
Reference Table Version 2
Parameters {
"comment": "stream demo",
"read_mode": "append"}プロジェクト内のすべてのストリームを一覧表示
構文
SHOW STREAMS;例
-- 現在のプロジェクト内のすべてのストリームオブジェクトを一覧表示します。
SHOW STREAMS;
-- 出力:
delta_table_streamストリームの削除
構文
DROP STREAM [IF EXISTS] <stream_name>;例
-- 1. 現在のプロジェクトに存在するすべてのストリームオブジェクトを表示します。
SHOW STREAMS;
-- 出力:
delta_table_stream
-- 2. delta_table_stream オブジェクトを削除します。
DROP STREAM IF EXISTS delta_table_stream;
-- 3. プロジェクト内のストリームオブジェクトを再度表示します。結果は空です。
SHOW STREAMS;ストリームのクエリ
構文
SELECT * FROM <stream_name>;ストリームをクエリするとき、データクエリ言語 (DQL) 文のみを実行しても、ストリームのステータスは変更されません。ストリームの開始 Offset Version は変更されません。ただし、関連する Delta テーブルの Reference Table Version は、Delta テーブルのステータスとともに変化し、常に最新のデータバージョンを反映します。DQL 文のみを実行するということは、増分データが検査されるだけで消費されないことを意味します。
ストリームをクエリして DML 操作を実行すると、ストリームによって表される増分データが消費されます。これにより、ストリームのステータスが変更されます。関連するデータバージョンは、DML 操作でクエリされた最新の増分データバージョンに進められます。これは、ストリームの Offset Version が関連する Delta テーブルの Reference Table Version と等しくなることを意味します。この時点で、新しい増分データはありません。現在の状態でストリームを再度読み取ると、結果は空になります。
CDC モードでのクエリの出力例
Delta テーブルで変更データキャプチャ (CDC) モードを使用する方法の詳細については、「CDC (招待プレビュー)」をご参照ください。
ソース Delta テーブルを作成します。
CREATE TABLE delta_table_src ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ( "transactional"="true", 'acid.cdc.mode.enable'='true', 'cdc.insert.into.passthrough.enable'='true' );宛先テーブルを作成します。
CREATE TABLE delta_table_dest ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ("transactional"="true");CDC モードでストリームを作成します。
CREATE STREAM delta_table_stream ON TABLE delta_table_src version as of 1 strmproperties('read_mode'='cdc') comment 'Stream cdc mode';ソース Delta テーブルに 2 つのレコードを挿入します。
INSERT INTO delta_table_src VALUES (1, 1), (2, 2);delta_table_streamをクエリして CDC フォーマットでデータを出力します。DQL 文のみを実行してもdelta_table_streamのステータスは変更されません。以下の文を複数回実行すると、同じ結果が得られます。SELECT * FROM delta_table_stream; -- 出力 +------------+------------+------------------+----------------+------------------+ | pk | val | __meta_timestamp | __meta_op_type | __meta_is_update | +------------+------------+------------------+----------------+------------------+ | 2 | 2 | 2024-09-07 11:03:53 | 1 | 0 | | 1 | 1 | 2024-09-07 11:03:53 | 1 | 0 | +------------+------------+------------------+----------------+------------------+delta_table_streamテーブルから増分データを読み取り、宛先テーブルdelta_table_destに挿入します。この操作は、delta_table_streamの Offset Version を、関連するdelta_table_srcテーブルの最新のデータバージョンに更新し、delta_table_streamテーブルから増分データを消費します。INSERT INTO delta_table_dest SELECT pk, val FROM delta_table_stream;宛先テーブルをクエリします。テーブルには、ステップ 6 で消費されたストリームからの増分データが含まれています。
SELECT * FROM delta_table_dest; -- 出力 +------------+------------+ | pk | val | +------------+------------+ | 1 | 1 | | 2 | 2 | +------------+------------+delta_table_streamテーブルを再度クエリします。クエリは空の結果を返します。これは、delta_table_streamによって表される増分データが消費され、新しい増分データが存在しないためです。SELECT * FROM delta_table_stream; -- 出力 +------------+------------+ | pk | val | +------------+------------+ +------------+------------+UPDATE 操作を実行して、ソーステーブルの pk が 1 のレコードの val の値を 10 に設定します。
UPDATE delta_table_src SET val = 10 WHERE pk = 1;UPDATE 操作により、ソーステーブルに新しい増分データが生成されました。これで
delta_table_streamをクエリして、更新の CDC データを出力できます。SELECT * FROM delta_table_stream; -- 出力 +------------+------------+------------------+----------------+------------------+ | pk | val | __meta_timestamp | __meta_op_type | __meta_is_update | +------------+------------+------------------+----------------+------------------+ | 1 | 1 | 2024-09-07 11:10:21 | 0 | 1 | | 1 | 10 | 2024-09-07 11:10:21 | 1 | 1 | +------------+------------+------------------+----------------+------------------+
この例は、CDC 出力モードがソース Delta テーブルのレコード変更を追跡し、すべての変更状態のレコードを出力することを示しており、これは増分計算ロジックに効果的です。
append モードでのクエリの出力例
ソース Delta テーブルを作成します。
CREATE TABLE delta_table_src ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ("transactional"="true");宛先テーブルを作成します。
CREATE TABLE delta_table_dest ( pk bigint NOT NULL PRIMARY KEY, val bigint ) tblproperties ("transactional"="true");append モードでストリームを作成し、Delta テーブルに関連付けます。
CREATE STREAM delta_table_stream ON TABLE delta_table_src version as of 1 strmproperties ('read_mode'='append') comment 'Stream append mode';ソース Delta テーブルに 2 つのレコードを挿入します。
INSERT INTO delta_table_src VALUES (1, 1), (2, 2);delta_table_streamをクエリします。SELECT * FROM delta_table_stream; -- 出力にはシステム列は含まれません。 +------------+------------+ | pk | val | +------------+------------+ | 1 | 1 | | 2 | 2 | +------------+------------+増分データは
delta_table_streamテーブルから読み取られ、宛先テーブルに挿入されます。これにより、delta_table_streamの Offset Version も、関連するdelta_table_srcテーブルの最新のデータバージョンに更新されます。この操作は、delta_table_streamテーブルから増分データを消費します。INSERT INTO delta_table_dest SELECT pk, val FROM delta_table_stream;宛先テーブルをクエリします。テーブルには、ステップ 6 で消費された
delta_table_streamテーブルからの増分データが含まれています。SELECT * FROM delta_table_dest; -- 出力 +------------+------------+ | pk | val | +------------+------------+ | 1 | 1 | | 2 | 2 | +------------+------------+delta_table_streamテーブルを再度クエリします。クエリは空の結果を返します。これは、delta_table_streamによって表される増分データが消費され、新しい増分データが存在しないためです。SELECT * FROM delta_table_stream; -- 出力 +------------+------------+ | pk | val | +------------+------------+ +------------+------------+UPDATE 操作を実行して、ソーステーブルの pk が 1 のレコードの val の値を 10 に設定します。
UPDATE delta_table_src SET val = 10 WHERE pk = 1;DELETE 操作を実行して、ソーステーブルから pk が 2 のレコードを削除します。
DELETE FROM delta_table_src WHERE pk = 2;delta_table_streamをクエリします。UPDATE 操作の結果であるレコード (1, 10) のみが返されます。DELETE 操作の影響を受けたレコードは返されません。SELECT * FROM delta_table_stream; -- 出力 +------------+------------+ | pk | val | +------------+------------+ | 1 | 10 | +------------+------------+
この例は、append 出力モードがデータの操作ステータスを表示しないことを示しています。レコードの最終状態のみを出力し、削除されたレコードは返されません。したがって、そのユースケースは限られています。通常、一般的な抽出・変換・書き出し (ETL) シナリオで、増分的に挿入されたデータを継続的に処理するために使用されます。