このトピックでは、Realtime Compute for Apache Flink で TUMBLE 関数を使用する方法について説明します。
説明
TUMBLE 関数は、各要素を特定のサイズのタンブリングウィンドウに割り当てます。ほとんどの場合、タンブリングウィンドウはサイズが固定されており、互いに重複しません。たとえば、5 分のタンブリングウィンドウが定義されている場合、無限のデータストリームは、[0:00, 0:05)
、[0:05, 0:10)
、[0:10, 0:15)
などの期間に基づいてウィンドウに分割されます。
構文
ウィンドウ識別子関数
ウィンドウ識別子関数は、ウィンドウの開始時刻、終了時刻、または時間属性を指定します。時間属性は、下位レベルのウィンドウを集計するために使用されます。
関数 | 戻り値の型 | 説明 |
| TIMESTAMP | 境界値を含む、ウィンドウの開始時刻を返します。たとえば、ウィンドウの時間範囲が |
| TIMESTAMP | 境界値を含む、ウィンドウの終了時刻を返します。たとえば、ウィンドウの時間範囲が |
| TIMESTAMP(rowtime-attr) | 境界値を除く、ウィンドウの終了時刻を返します。たとえば、ウィンドウの時間範囲が |
| TIMESTAMP(rowtime-attr) | 境界値を除く、ウィンドウの終了時刻を返します。たとえば、ウィンドウの時間範囲が |
例 1:イベント時間に基づいて、特定の Web サイトのユーザーごとの 1 分あたりのクリック数をカウントする
テストデータ
username (VARCHAR)
click_url (VARCHAR)
eventtime (VARCHAR)
Jark
http://taobao.com/xxx
2024-08-10 10:00:00.0
Jark
http://taobao.com/xxx
2024-08-10 10:00:10.0
Jark
http://taobao.com/xxx
2024-08-10 10:00:49.0
Jark
http://taobao.com/xxx
2024-08-10 10:01:05.0
Jark
http://taobao.com/xxx
2024-08-10 10:01:58.0
Timo
http://taobao.com/xxx
2024-08-10 10:02:10.0
Timo
http://taobao.com/xxx
2024-08-10 10:03:10.0
テストステートメント
CREATE TEMPORARY TABLE user_clicks( username varchar, click_url varchar, eventtime varchar, ts AS TO_TIMESTAMP(eventtime), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- 行時間のウォーターマークを定義します。 ) WITH ( 'connector'='sls', ... ); CREATE TEMPORARY TABLE tumble_output( window_start TIMESTAMP, window_end TIMESTAMP, username VARCHAR, clicks BIGINT ) WITH ( 'connector'='datahub' ... ); INSERT INTO tumble_output SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE) as window_start, TUMBLE_END(ts, INTERVAL '1' MINUTE) as window_end, username, COUNT(click_url) FROM user_clicks GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),username;
テスト結果
window_start (TIMESTAMP)
window_end (TIMESTAMP)
username (VARCHAR)
clicks (BIGINT)
2024-08-10 10:00:00.0
2024-08-10 10:01:00.0
Jark
3
2024-08-10 10:01:00.0
2024-08-10 10:02:00.0
Jark
2
2024-08-10 10:02:00.0
2024-08-10 10:03:00.0
Timo
1
例 2:処理時間に基づいて、特定の Web サイトのユーザーごとの 1 分あたりのクリック数をカウントする
テストデータ
username (VARCHAR)
click_url (VARCHAR)
Jark
http://taobao.com/xxx
Jark
http://taobao.com/xxx
Jark
http://taobao.com/xxx
Jark
http://taobao.com/xxx
Jark
http://taobao.com/xxx
Timo
http://taobao.com/xxx
テストステートメント
CREATE TEMPORARY TABLE window_test ( username VARCHAR, click_url VARCHAR, ts as PROCTIME() -- 処理時間を定義します。 ) WITH ( 'connector'='sls', ... ); CREATE TEMPORARY TABLE tumble_output( window_start TIMESTAMP, window_end TIMESTAMP, username VARCHAR, clicks BIGINT ) WITH ( 'connector'='datahub' -- Log Service では VARCHAR 型の DDL ステートメントのみエクスポートできるため、DataHub をストレージに使用します。 ... ); INSERT INTO tumble_output SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE), TUMBLE_END(ts, INTERVAL '1' MINUTE), username, COUNT(click_url) FROM window_test GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), username;
テスト結果
window_start (TIMESTAMP)
window_end (TIMESTAMP)
username (VARCHAR)
clicks (BIGINT)
2024-08-10 14:43:00.000
2024-08-10 14:44:00.000
Jark
5
2024-08-10 14:43:00.000
2024-08-10 14:44:00.000
Timo
1