すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:ストリームオブジェクト

最終更新日:Nov 09, 2025

ストリームは、Delta テーブルに対する増分クエリのデータバージョンを自動的に管理する MaxCompute オブジェクトです。挿入、更新、削除などのデータ操作言語 (DML) の変更を、各変更のメタデータとともに記録します。これにより、変更されたデータを使用して操作を実行できます。このトピックでは、ストリーム操作に使用されるコマンドについて説明します。

ストリームの作成

構文

CREATE STREAM [IF NOT EXISTS] <stream_name> 
ON TABLE <delta_table_name> <timestamp as of t | VERSION as of v>
strmproperties ("read_mode"="append" | "cdc") 
[comment <stream_comment>];

名前

説明

IF NOT EXISTS

オプションです。IF NOT EXISTS を指定せず、同じ名前のストリームが存在する場合、エラーが返されます。IF NOT EXISTS を指定した場合、同じ名前のストリームが存在し、構造が異なっていても、文は成功メッセージを返します。既存のストリームのメタデータは変更されません。

stream_name

必須。作成するストリームの名前。

ON TABLE <delta_table_name>

ストリームに関連付けるソース Delta テーブルを指定します。ストリームは 1 つのソーステーブルにのみ関連付けることができます。

timestamp as of t

ストリームの VersionOffset をデータタイムスタンプ t に初期化します。クエリ範囲は (t, 最新の増分データタイムスタンプ] です。

version as of v

ストリームの VersionOffset をデータバージョン v に初期化します。クエリ範囲は (v, 最新の増分データバージョン] です。

strmproperties

テーブルプロパティと同様に、ストリームプロパティを文字列型のキーと値のペアとして指定します。現在、read_mode プロパティのみがサポートされています。有効な値は appendcdc です。append モードは Delta テーブルからデータを消費するために使用されます。cdc モードは Delta テーブルから変更データキャプチャ (CDC) データを消費するために使用されます。

stream_comment

オプションです。ストリームのコメント。コメントは 1024 バイト以下の有効な文字列でなければなりません。そうでない場合、エラーが返されます。

システム列

"read_mode" = "cdc" の場合、3 つの追加のシステム列が出力されます。__meta_timestamp はデータが書き込まれた時刻を表し、__meta_op_type (INSERT | DELETE を含む)、および __meta_is_update (TRUE | FALSE を含む) です。これらを組み合わせることで、4 つのシナリオを表すことができます。INSERT + FALSE は新しいレコードを表し、INSERT + TRUE は更新後の値を表し、DELETE + TRUE は更新前の値を表し、DELETE + FALSE は削除を表します。ただし、__meta_op_type および __meta_is_update システム列の値は tinyint 型です。対応する組み合わせの値とその意味は次のとおりです。

+--------------------+--------------+-----------------+
|     Operation      |__meta_op_type|__meta_is_update |
+--------------------+--------------+-----------------+
|       INSERT       |    INSERT(1) |      FALSE(0)   |
|       DELETE       |    DELETE(0) |      FALSE(0)   |
|   UPDATE_BEFORE    |    DELETE(0) |      TRUE(1)    |
|   UPDATE_AFTER     |    INSERT(1) |      TRUE(1)    |
+--------------------+--------------+-----------------+

ソース Delta テーブルを作成し、次に Delta テーブルに関連付けられたストリームを作成します。

CREATE TABLE delta_table_src (
  pk bigint NOT NULL PRIMARY KEY, 
  val bigint
) tblproperties ("transactional"="true");

CREATE STREAM delta_table_stream 
ON TABLE delta_table_src version as of 1 
strmproperties('read_mode'='append') 
comment 'ストリームデモ';

ストリーム情報の表示

構文

DESC STREAM <stream_name>;

ソース Delta テーブルとそれに関連付けられたストリームを作成します。次に、ストリームオブジェクト delta_table_stream の情報を表示します。

CREATE TABLE delta_table_src (pk BIGINT NOT NULL PRIMARY key, 
val BIGINT) TBLPROPERTIES ("transactional"="true");

CREATE STREAM delta_table_stream ON TABLE delta_table_src 
version AS OF 1 strmproperties('read_mode'='append') 
comment 'ストリームデモ';

DESC STREAM delta_table_stream;

出力

Name                                    delta_table_stream                      
Project                                 sql_optimizer                           
Create Time                             2024-09-06 17:03:32                     Last Modified Time                      2024-09-06 17:03:32                     
Offset Version                          1                                       
Reference Table Project                 sql_optimizer                           
Reference Table Name                    delta_table_src                         
Reference Table Id                      5e19a67eb97b4477b7fbce0c7bbcebca        
Reference Table Version                 1                                       
Parameters                              {
    "comment": "stream demo",
    "read_mode": "append"}

名前

説明

Name

ストリームの名前。

Project

ストリームが存在するプロジェクトの名前。

Create Time

ストリームが作成された時刻。

Offset Version

ストリームの初期データバージョン。

Reference Table Project

関連付けられたソーステーブルが存在するプロジェクトの名前。

Reference Table Name

関連付けられたソーステーブルの名前。

Reference Table Id

関連付けられたソーステーブルの一意の ID。

Reference Table Version

関連付けられたソーステーブルのデータバージョン。

Parameters

ストリームオブジェクトのプロパティ。

説明

Offset VersionReference Table Version の情報には特に注意してください。Offset Version は、現在のストリームによって消費された関連 Delta テーブルのデータバージョンを示します。Reference Table Version は、関連 Delta テーブルの最新のデータバージョンを示します。関連 Delta テーブルは空であるため、両方の値は 1 です。Stream オブジェクトが作成された後、その関連 Delta テーブルで DML 操作が実行されると、Reference Table Version の値はそれに応じて更新されます。ストリームの読み取りは、関連テーブルに対する増分クエリに変換されます。クエリのデータバージョン範囲は、左開右閉区間 (Offset Version, Reference Table Version] です。これにより、Offset Version と Reference Table Version の間の増分データが読み取られることが保証されます。(Offset Version, Reference Table Version] バージョン範囲の増分データが DML 操作によって消費されると、Offset Version は Reference Table Version と等しくなります。この時点で、両方のバージョンは関連 Delta テーブルの最新のデータバージョンを反映しており、新しい増分データがないことを示します。

ストリームの変更

ストリームプロパティの変更

ALTER STREAM <stream_name> SET strmproperties ("key"="value");
説明
  • stream_name: 必須。変更するストリームの名前。

  • strmproperties: ストリームのプロパティは、テーブルプロパティと同様に、文字列型のキーと値のペアとして指定されます。現在、read_mode プロパティのみがサポートされており、変更することはできません。

ストリームの初期データバージョンの変更

ALTER STREAM <stream_name> ON TABLE <delta_table_name> 
<timestamp as of t  |  version as of v > ;
説明
  • stream_name: 必須。変更するストリームの名前。

  • ON TABLE <delta_table_name>: ストリームに関連付けられたソース Delta テーブルを指定します。これは元のソーステーブルと同じでなければなりません。ソーステーブルの変更は現在サポートされていません。

  • timestamp as of t: ストリームの初期データバージョン (VersionOffset) をタイムスタンプ t に変更します。クエリ範囲は (t, 最新の増分データバージョン] です。

  • version as of v: ストリームの初期データバージョン (VersionOffset) をバージョン v に変更します。クエリ範囲は (v, 最新の増分データバージョン] です。

-- 1. ソース Delta テーブルを作成します。
CREATE TABLE delta_table_src (pk bigint not null primary key, 
val bigint) tblproperties ("transactional"="true");

-- 2. Delta テーブルに関連付けられたストリームを作成します。
CREATE STREAM delta_table_stream on table delta_table_src 
version as of 1 strmproperties('read_mode'='append') 
comment 'ストリームデモ';

-- 3. 新しいストリームの情報を表示します。現在の Offset Version と Reference Table Version は両方とも 1 です。
DESC STREAM delta_table_stream;
-- 出力:
Name                                    delta_table_stream                      
Project                                 sql_optimizer                           
Create Time                             2024-09-07 10:26:56                     Last Modified Time                      2024-09-07 10:26:56                     
Offset Version                          1                                       
Reference Table Project                 sql_optimizer                           
Reference Table Name                    delta_table_src                         
Reference Table Id                      8605276ce0034b20af761bf4761ba62e        
Reference Table Version                 1                                       
Parameters                              {
    "comment": "stream demo",
    "read_mode": "append"}

-- 4. 関連付けられた Delta テーブルにレコードを挿入して LSN をインクリメントします。
-- その後、Delta テーブルの参照バージョンを新しい、インクリメントされたバージョンに変更します。
INSERT INTO delta_table_src VALUES ('1', '1');

-- 5. Delta テーブルの現在のデータバージョン情報を表示します。
SHOW history FOR TABLE delta_table_src;

ObjectType	ObjectId                        	ObjectName          	VERSION(LSN)    	Time               	Operation       
TABLE     	8605276ce0034b20af761bf4761ba62e	delta_table_src     	0000000000000001	2024-09-07 10:25:59	CREATE          
TABLE     	8605276ce0034b20af761bf4761ba62e	delta_table_src     	0000000000000002	2024-09-07 10:28:19	APPEND      

-- 6. ストリームに関連付けられた Delta テーブルのバージョンを 2 に変更します。
ALTER STREAM delta_table_stream ON TABLE delta_table_src version as of 2;

-- 7. 変更されたストリームの情報を表示します。ストリームと関連付けられた Delta テーブルの両方のバージョンが 2 になります。
DESC STREAM delta_table_stream;

-- 出力:
Name                                    delta_table_stream                      
Project                                 sql_optimizer                           
Create Time                             2024-09-07 10:26:56                     Last Modified Time                      2024-09-07 10:29:12                     
Offset Version                          2                                       
Reference Table Project                 sql_optimizer                           
Reference Table Name                    delta_table_src                         
Reference Table Id                      8605276ce0034b20af761bf4761ba62e        
Reference Table Version                 2                                       
Parameters                              {
    "comment": "stream demo",
    "read_mode": "append"}

プロジェクト内のすべてのストリームを一覧表示

構文

SHOW STREAMS;

-- 現在のプロジェクト内のすべてのストリームオブジェクトを一覧表示します。
SHOW STREAMS;
-- 出力:
delta_table_stream

ストリームの削除

構文

DROP STREAM [IF EXISTS] <stream_name>;

-- 1. 現在のプロジェクトに存在するすべてのストリームオブジェクトを表示します。
SHOW STREAMS;
-- 出力:
delta_table_stream

-- 2. delta_table_stream オブジェクトを削除します。
DROP STREAM IF EXISTS delta_table_stream;

-- 3. プロジェクト内のストリームオブジェクトを再度表示します。結果は空です。
SHOW STREAMS;

ストリームのクエリ

構文

SELECT * FROM <stream_name>;
説明
  • ストリームをクエリするとき、データクエリ言語 (DQL) 文のみを実行しても、ストリームのステータスは変更されません。ストリームの開始 Offset Version は変更されません。ただし、関連する Delta テーブルの Reference Table Version は、Delta テーブルのステータスとともに変化し、常に最新のデータバージョンを反映します。DQL 文のみを実行するということは、増分データが検査されるだけで消費されないことを意味します。

  • ストリームをクエリして DML 操作を実行すると、ストリームによって表される増分データが消費されます。これにより、ストリームのステータスが変更されます。関連するデータバージョンは、DML 操作でクエリされた最新の増分データバージョンに進められます。これは、ストリームの Offset Version が関連する Delta テーブルの Reference Table Version と等しくなることを意味します。この時点で、新しい増分データはありません。現在の状態でストリームを再度読み取ると、結果は空になります。

CDC モードでのクエリの出力例

Delta テーブルで変更データキャプチャ (CDC) モードを使用する方法の詳細については、「CDC (招待プレビュー)」をご参照ください。

  1. ソース Delta テーブルを作成します。

    CREATE TABLE delta_table_src (
      pk bigint NOT NULL PRIMARY KEY, 
      val bigint
    ) tblproperties (
      "transactional"="true", 
      'acid.cdc.mode.enable'='true', 
      'cdc.insert.into.passthrough.enable'='true'
    );
  2. 宛先テーブルを作成します。

    CREATE TABLE delta_table_dest (
      pk bigint NOT NULL PRIMARY KEY, 
      val bigint
    ) tblproperties ("transactional"="true");
  3. CDC モードでストリームを作成します。

    CREATE STREAM delta_table_stream 
    ON TABLE delta_table_src version as of 1 
    strmproperties('read_mode'='cdc') 
    comment 'Stream cdc mode';
  4. ソース Delta テーブルに 2 つのレコードを挿入します。

    INSERT INTO delta_table_src VALUES (1, 1), (2, 2);
  5. delta_table_stream をクエリして CDC フォーマットでデータを出力します。DQL 文のみを実行しても delta_table_stream のステータスは変更されません。以下の文を複数回実行すると、同じ結果が得られます。

    SELECT * FROM  delta_table_stream;
    
    -- 出力
    +------------+------------+------------------+----------------+------------------+
    | pk         | val        | __meta_timestamp | __meta_op_type | __meta_is_update |
    +------------+------------+------------------+----------------+------------------+
    | 2          | 2          | 2024-09-07 11:03:53 | 1             | 0                |
    | 1          | 1          | 2024-09-07 11:03:53 | 1              | 0                |
    +------------+------------+------------------+----------------+------------------+
  6. delta_table_stream テーブルから増分データを読み取り、宛先テーブル delta_table_dest に挿入します。この操作は、delta_table_stream の Offset Version を、関連する delta_table_src テーブルの最新のデータバージョンに更新し、delta_table_stream テーブルから増分データを消費します。

    INSERT INTO  delta_table_dest SELECT pk, val FROM delta_table_stream;
  7. 宛先テーブルをクエリします。テーブルには、ステップ 6 で消費されたストリームからの増分データが含まれています。

    SELECT * FROM delta_table_dest; 
    
    -- 出力
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    | 1          | 1          |
    | 2          | 2          |
    +------------+------------+
  8. delta_table_stream テーブルを再度クエリします。クエリは空の結果を返します。これは、delta_table_stream によって表される増分データが消費され、新しい増分データが存在しないためです。

    SELECT * FROM delta_table_stream;
    
    -- 出力
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    +------------+------------+
  9. UPDATE 操作を実行して、ソーステーブルの pk が 1 のレコードの val の値を 10 に設定します。

    UPDATE delta_table_src SET val = 10 WHERE pk = 1;
  10. UPDATE 操作により、ソーステーブルに新しい増分データが生成されました。これで delta_table_stream をクエリして、更新の CDC データを出力できます。

    SELECT * FROM delta_table_stream;  
    
    --  出力
    +------------+------------+------------------+----------------+------------------+
    | pk         | val        | __meta_timestamp | __meta_op_type | __meta_is_update |
    +------------+------------+------------------+----------------+------------------+
    | 1          | 1          | 2024-09-07 11:10:21 | 0              | 1                |
    | 1          | 10         | 2024-09-07 11:10:21 | 1              | 1                |
    +------------+------------+------------------+----------------+------------------+

この例は、CDC 出力モードがソース Delta テーブルのレコード変更を追跡し、すべての変更状態のレコードを出力することを示しており、これは増分計算ロジックに効果的です。

append モードでのクエリの出力例

  1. ソース Delta テーブルを作成します。

    CREATE TABLE delta_table_src (
      pk bigint NOT NULL PRIMARY KEY, 
      val bigint
    ) tblproperties ("transactional"="true");
  2. 宛先テーブルを作成します。

    CREATE TABLE delta_table_dest (
      pk bigint NOT NULL PRIMARY KEY, 
      val bigint
    ) tblproperties ("transactional"="true");
  3. append モードでストリームを作成し、Delta テーブルに関連付けます。

    CREATE STREAM delta_table_stream 
    ON TABLE delta_table_src version as of 1 
    strmproperties ('read_mode'='append') 
    comment 'Stream append mode';
  4. ソース Delta テーブルに 2 つのレコードを挿入します。

    INSERT INTO delta_table_src VALUES (1, 1), (2, 2);
  5. delta_table_stream をクエリします。

    SELECT * FROM delta_table_stream; 
    
    -- 出力にはシステム列は含まれません。
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    | 1          | 1          |
    | 2          | 2          |
    +------------+------------+
  6. 増分データは delta_table_stream テーブルから読み取られ、宛先テーブルに挿入されます。これにより、delta_table_stream の Offset Version も、関連する delta_table_src テーブルの最新のデータバージョンに更新されます。この操作は、delta_table_stream テーブルから増分データを消費します。

    INSERT INTO delta_table_dest SELECT pk, val FROM delta_table_stream;
  7. 宛先テーブルをクエリします。テーブルには、ステップ 6 で消費された delta_table_stream テーブルからの増分データが含まれています。

    SELECT * FROM delta_table_dest; 
    
    -- 出力
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    | 1          | 1          |
    | 2          | 2          |
    +------------+------------+
  8. delta_table_stream テーブルを再度クエリします。クエリは空の結果を返します。これは、delta_table_stream によって表される増分データが消費され、新しい増分データが存在しないためです。

    SELECT * FROM delta_table_stream; 
    
    -- 出力
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    +------------+------------+
  9. UPDATE 操作を実行して、ソーステーブルの pk が 1 のレコードの val の値を 10 に設定します。

    UPDATE delta_table_src SET val = 10 WHERE pk = 1;
  10. DELETE 操作を実行して、ソーステーブルから pk が 2 のレコードを削除します。

    DELETE FROM delta_table_src WHERE pk = 2;
  11. delta_table_stream をクエリします。UPDATE 操作の結果であるレコード (1, 10) のみが返されます。DELETE 操作の影響を受けたレコードは返されません。

    SELECT * FROM delta_table_stream; 
    
    -- 出力
    +------------+------------+
    | pk         | val        |
    +------------+------------+
    | 1          | 10         |
    +------------+------------+

この例は、append 出力モードがデータの操作ステータスを表示しないことを示しています。レコードの最終状態のみを出力し、削除されたレコードは返されません。したがって、そのユースケースは限られています。通常、一般的な抽出・変換・書き出し (ETL) シナリオで、増分的に挿入されたデータを継続的に処理するために使用されます。