このトピックでは、Flink SQL でサポートされているウィンドウ関数、時間属性、およびウィンドウの種類について説明します。
ウィンドウ関数
Flink SQL は、無限ウィンドウでの集計をサポートしています。 SQL ステートメントでウィンドウを明示的に定義する必要はありません。 Flink SQL は、特定のウィンドウでの集計もサポートしています。たとえば、前の 1 分間に URL をクリックしたユーザー数をカウントする場合、前の 1 分間のユーザーのクリックに関するデータを収集するウィンドウを定義できます。そして、ウィンドウ内のデータを計算して結果を得ることができます。
Flink SQL は、ウィンドウ集約と OVER 集約をサポートしています。 このトピックでは、ウィンドウ集約について説明します。 ウィンドウ集約は、イベント時間と処理時間の次の時間属性に基づいて定義されたウィンドウをサポートします。 TUMBLE、HOP、SESSION ウィンドウ関数は、それぞれの時間属性でサポートされています。
ウィンドウトリガーメカニズムまたはレイテンシが原因で、ウィンドウ集約(TUMBLE、HOP、SESSION)と LAST_VALUE、FIRST_VALUE、または TopN 関数を組み合わせると、不正確な結果が生じる可能性があります。
時間属性
Flink SQL は、2 つの 時間属性(イベント時間と処理時間)をサポートしています。 Flink は、時間属性に基づいてウィンドウ内のデータを集約します。 ウィンドウイング方法は、時間属性によって異なります。
イベント時間:提供されたイベント時間。通常はレコードに埋め込まれたタイムスタンプです。
システムは、データのイベント時間に基づいて生成されたウォーターマークに基づいてウィンドウを閉じます。 ウィンドウは、ウォーターマークの値がウィンドウが閉じる時刻より後の場合にのみ終了します。 出力データは、ウィンドウが終了すると生成されます。 ウィンドウは、ウィンドウを終了させるトリガーとなるデータが Flink に流れ込んだ場合にのみ、出力データを生成します。 単一のサブタスクのウォーターマーク値は増加します。 複数のサブタスクが実行されている場合、または複数のソーステーブルが存在する場合は、最小ウォーターマーク値が使用されます。
重要ソーステーブルに時刻が現在時刻より遅い順序が正しくないデータレコードが存在する場合、またはソーステーブルのサブタスクまたはパーティションにデータが存在しない場合、ウィンドウは終了せず、出力データが異常になる可能性があります。 この問題を回避するには、順序が正しくないデータに基づいてオフセットを指定し、すべてのサブタスクとソーステーブルのすべてのパーティションにデータが存在することを確認する必要があります。 サブタスクまたはソーステーブルのパーティションにデータが存在しない場合、ウォーターマークを進めることができず、ウィンドウをタイムリーに終了できません。 この場合、
table.exec.source.idle-timeout: 10sを デプロイメント ページの 構成 タブの パラメーター セクションにある その他の構成 フィールドに追加して、ウィンドウの終了をトリガーできます。 このパラメーターの詳細については、「構成」をご参照ください。2 つのデータストリームに対する GROUP BY、JOIN 操作、または OVER ウィンドウノードを使用してデータが処理された後、ウォーターマークプロパティは失われ、イベント時間はウィンドウイングに使用できなくなります。
処理時間:システムがイベントを処理するローカルシステム時間。
処理時間は Flink によって生成され、生データには存在しません。したがって、処理時間列を明示的に定義する必要があります。
説明処理時間は、イベントが Flink に到達する速度と、Flink でデータが処理される順序の影響を受けます。したがって、各バックトラックの結果が異なる場合があります。
カスケードウィンドウ
rowtime 列のイベント時間属性は、ウィンドウ操作の完了後に有効ではなくなります。 TUMBLE_ROWTIME、HOP_ROWTIME、SESSION_ROWTIME などのヘルパー関数を使用して、ウィンドウ内の rowtime 列の max(rowtime) を取得できます。 取得した値をタイムウィンドウの rowtime として使用できます。 値は window_end - 1 で、TIMESTAMP データ型です。 TIMESTAMP 値には rowtime 属性があります。 たとえば、ウィンドウの時間範囲が [00:00, 00:15) の場合、00:14:59.999 が返されます。
次の例では、1 時間のタンブリングウィンドウを使用して、1 分のタンブリングウィンドウの集約結果に基づいてデータが集約されます。 これにより、さまざまなウィンドウイング要件を満たすことができます。
CREATE TEMPORARY TABLE user_clicks(
username varchar,
click_url varchar,
eventtime varchar,
ts AS TO_TIMESTAMP(eventtime),
WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- rowtime のウォーターマークを定義します。
) with (
'connector'='sls',
...
);
CREATE TEMPORARY TABLE tumble_output(
window_start TIMESTAMP,
window_end TIMESTAMP,
username VARCHAR,
clicks BIGINT
) with (
'connector'='datahub' -- Simple Log Service では VARCHAR 型の DDL ステートメントのみをエクスポートできます。したがって、DataHub を使用してデータを格納します。
...
);
CREATE TEMPORARY VIEW one_minute_window_output AS
SELECT
TUMBLE_ROWTIME(ts, INTERVAL '1' MINUTE) as rowtime, -- TUMBLE_ROWTIME をレベル 2 ウィンドウの集計時間として使用します。
username,
COUNT(click_url) as cnt
FROM user_clicks
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),username;
BEGIN statement set;
INSERT INTO tumble_output
SELECT
TUMBLE_START(rowtime, INTERVAL '1' HOUR),
TUMBLE_END(rowtime, INTERVAL '1' HOUR),
username,
SUM(cnt)
FROM one_minute_window_output
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), username;
END;中間データ
ウィンドウの中間データは、キー付き状態 と タイマー の 2 つのタイプに分けられます。 これら 2 つのタイプのデータは、異なるストレージメディアに保存できます。 ジョブの特性に基づいて、さまざまなストレージの組み合わせを選択できます。 次の表に、サポートされているストレージの組み合わせを示します。
キー付き状態のストレージ | タイマーのストレージ |
メモリ | |
メモリ | |
メモリ | |
ファイル |
タイマーは、主に期限切れのウィンドウをトリガーするために使用されます。 メモリが十分にある場合は、パフォーマンス向上のためにタイマーをメモリに保存できます。 多数のタイマーが存在する場合、またはメモリリソースが不足している場合は、RocksDBStateBackend を使用してタイマーを RocksDB ファイルに保存できます。