このトピックでは、Realtime Compute for Apache Flink で HOP 関数を使用する方法について説明します。
定義
HOP 関数は、ホッピングウィンドウ(スライディングウィンドウとも呼ばれます)を定義するために使用されます。タンブリングウィンドウとは異なり、スライディングウィンドウは互いに重複することができます。
構文
GROUP BY 句で HOP 関数を使用して、スライディングウィンドウを定義できます。
HOP(<time-attr>, <slide-interval>,<size-interval>) 入力パラメーター
パラメーター | 説明 | 例 |
time-attr | パラメーターは、ストリーム内の有効な時間属性フィールドである必要があります。このパラメーターは、時間が処理時間かイベント時間かを指定します。詳細については、「時間属性」をご参照ください。 | - |
slide-interval | スライディングウィンドウが移動する間隔。連続するウィンドウ間の時間差を定義します。このパラメーターは、 |
|
size-interval | スライディングウィンドウのサイズまたは期間。各ウィンドウでカバーされる時間範囲を定義します。このパラメーターは、 |
|
slide-interval パラメーターと size-interval パラメーターの値に基づいて、次のシナリオでスライディングウィンドウを使用できます。
slide-interval < size-interval という条件が満たされている場合、ウィンドウは互いに重複し、各要素は複数のウィンドウに割り当てられます。
slide-interval = size-interval という条件が満たされている場合、ウィンドウはタンブリングウィンドウになります。
slide-interval > size-interval という条件が満たされている場合、ウィンドウはスライディングウィンドウになります。これらのウィンドウは互いに重複せず、ギャップによって区切られます。
ほとんどの場合、ほとんどの要素は複数のウィンドウに割り当てられ、ウィンドウは互いに重複します。スライディングウィンドウは、移動平均の計算に適しています。たとえば、10 秒ごとに過去 5 分間のデータの平均を計算する場合、slide-interval を 10 秒に設定し、size-interval を 5 分に設定します。
ウィンドウ識別子関数
ウィンドウ識別子関数は、ウィンドウの開始時刻、終了時刻、または時間属性を指定します。時間属性は、下位レベルのウィンドウを集計するために使用されます。
関数 | 戻り値の型 | 説明 |
| TIMESTAMP | ウィンドウの開始時刻を境界値を含めて返します。たとえば、ウィンドウの時間範囲が |
| TIMESTAMP | ウィンドウの終了時刻を境界値を含めて返します。たとえば、ウィンドウの時間範囲が |
| TIMESTAMP (rowtime-attr) | ウィンドウの終了時刻を境界値を除いて返します。たとえば、ウィンドウの時間範囲が |
| TIMESTAMP (rowtime-attr) | ウィンドウの終了時刻を境界値を除いて返します。たとえば、ウィンドウの時間範囲が |
例
次の例では、1 分ウィンドウが 30 秒ごとに 1 回スライドします。ウィンドウを使用して、30 秒ごとに過去 1 分間のユーザーごとのクリック数をカウントできます。
user_clicks テーブルのテストデータ
username (VARCHAR)
click_url (VARCHAR)
eventtime (VARCHAR)
Jark
http://taobao.com/xxx2024-10-10 10:00:00.0Jark
http://taobao.com/xxx2024-10-10 10:00:10.0Jark
http://taobao.com/xxx2024-10-10 10:00:49.0Jark
http://taobao.com/xxx2024-10-10 10:01:05.0Jark
http://taobao.com/xxx2024-10-10 10:01:58.0Timo
http://taobao.com/xxx2024-10-10 10:02:10.0テストステートメント
CREATE TEMPORARY TABLE user_clicks ( username VARCHAR, click_url VARCHAR, eventtime VARCHAR, ts AS TO_TIMESTAMP(eventtime), WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- 行時間のウォーターマークを定義します。 ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<brokers>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); CREATE TEMPORARY TABLE hop_output ( window_start TIMESTAMP, window_end TIMESTAMP, username VARCHAR, clicks BIGINT ) WITH ( 'connector'='print', 'logger'='true' ); INSERT INTO hop_output SELECT HOP_START (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), HOP_END (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE), username, COUNT (click_url) FROM user_clicks GROUP BY HOP (ts, INTERVAL '30' SECOND, INTERVAL '1' MINUTE),username;テスト結果
window_start (TIMESTAMP)
window_end (TIMESTAMP)
username (VARCHAR)
clicks (BIGINT)
2024-10-10 09:59:30.0①2024-10-10 10:00:30.0Jark
2
2024-10-10 10:00:00.02024-10-10 10:01:00.0Jark
3
2024-10-10 10:00:30.02024-10-10 10:01:30.0Jark
2
2024-10-10 10:01:00.02024-10-10 10:02:00.0Jark
2
2024-10-10 10:01:30.02024-10-10 10:02:30.0②Jark
1
2024-10-10 10:01:30.02024-10-10 10:02:30.0②Timo
1
説明①スライディングウィンドウがデータがウィンドウに入る時刻を読み取れない場合、最初のウィンドウの開始時刻が前方に移動されます。次の式を使用して、開始時刻が前方に移動される時間間隔を計算できます。時間間隔 = ウィンドウ期間 - スライディングステップ
ウィンドウ期間(秒)
スライディングステップ(秒)
イベント時間
最初のウィンドウの開始時刻
最初のウィンドウの終了時刻
120
30
2024-07-31 10:00:00.02024-07-31 09:58:30.02024-07-31 10:00:30.060
10
2024-07-31 10:00:00.02024-07-31 09:59:10.02024-07-31 10:00:10.0②タイムスタンプ
2024-10-10 10:02:30.0の行は、実際には結果に表示されません。これは、ウィンドウ計算がトリガーされていないためです。ウィンドウ計算をトリガーするには、任意のユーザーからの追加レコードが必要です。ウィンドウ計算のトリガー時間 ≥
window_end + ウォーターマーク(10:02:32.0 など)