V2.0
以降でサポートされています。
ポリシー
ユーザーは、さまざまなシナリオのクエリに対して、さまざまな出力ポリシー (許容される最大遅延など) を必要とする場合があります。 たとえば、1 時間のウィンドウが終了するまで 1 分ごとに最新の結果を見たいが、ウィンドウが終了してから 1 日以内に遅れて到着するデータを失いたくないとします。 従来の ANSI SQL では、この要件を満たす構文は提供されていません。 Flink SQL は、このような要件から EMIT 構文を抽象化し、SQL 構文を EMIT 構文で拡張します。
目的
- 遅延の制御。 ウィンドウが大きい場合、ウィンドウが終了するまでの結果の出力頻度を設定して、ユーザーへの結果出力の遅延を短縮できます。
- データ精度の向上。 システムは、ウィンドウが終わった後に到着したデータを破棄せず、データを出力結果に反映します。
構文
EMIT 構文は、出力ポリシーの定義、つまり INSERT INTO 文での操作の定義に使用されます。 EMIT ポリシーが設定されていない場合、デフォルトの動作が有効になります。 この場合、ウィンドウは、ウィンドウが終了したとき、つまりウォーターマークがトリガーされたときにのみ結果を生成します。 構文は次のとおりです。INSERTINTO tableName
query
EMIT strategy [, strategy]*
strategy ::= {WITH DELAY timeInterval | WITHOUT DELAY}
[BEFORE WATERMARK | AFTER WATERMARK]
timeInterval ::= 'string' timeUnit
WITH DELAY
:許容される最大結果出力遅延を指定します。 指定した間隔で結果が生成されます。WITHOUT DELAY
:遅延を許容しないよう指定します。 データが受信されるとすぐに結果が生成されます。BEFORE WATERMARK
:ウィンドウが終了する前、つまりウォーターマークがトリガーされる前のポリシーを指定します。AFTER WATERMARK
:ウィンドウが終了した後、つまりウォーターマークがトリガーされた後のポリシーを指定します。
- 複数の戦略を定義でき、BEFORE WATERMARK および AFTER WATERMARK ポリシーを同時に定義することができます。 ただし、2 つの BEFORE WATERMARK ポリシー、または 2 つの AFTER WATERMARK ポリシーを同時に定義することはできません。
- AFTER WATERMARK ポリシーが定義されている場合は、
blink.state.ttl.ms
パラメーターを設定して、許容できる最大遅延を明示的に定義する必要があります。
例
-- Before a window ends, results are generated at a delay of 1 minute. After the window ends, results are generated without delay.
EMIT
WITH DELAY '1'MINUTE BEFORE WATERMARK,
WITHOUT DELAY AFTER WATERMARK
-- Before a window ends, no result is generated. After the window ends, results are generated without delay.
EMIT WITHOUT DELAY AFTER WATERMARK
-- Results are generated at a delay of 1 minute globally. (The delay is used by MiniBatch to accumulate data.)
EMIT WITH DELAY '1'MINUTE -- Before a window ends, results are generated at a delay of 1 minute.
EMIT WITH DELAY '1'MINUTE BEFORE WATERMARK
-
最大許容遅延
AFTER WATERMARK ポリシーが設定されている場合、ウィンドウのステータスは一定期間保持され、遅延データを待ちます。 では、どれだけの間、遅延データを待つのでしょうか。blink.state.ttl.ms
パラメーターを設定して、ウィンドウの状態保存時間 (TTL) をカスタマイズすることができます。 このパラメーターは、デフォルトでは設定されていません。 AFTER WATERMARK ポリシーが設定されている場合、明示的にblink.state.ttl.ms
パラメーターを設定する必要があります。 たとえば、blink.state.ttl.ms=3600000
は、ウィンドウが遅れたデータを 1 時間まで待つことを示します。 1 時間以上遅れて到着したデータは直接破棄されます。 -
例
1 時間のタンブリングウィンドウtumble_window
を例に取ります。
デフォルトでは、CREATE VIEW tumble_window AS SELECT id, TUMBLE_START(rowtime, INTERVAL '1' HOUR) as start_time, COUNT(*) as cnt FROM source GROUP BY id, TUMBLE(rowtime, INTERVAL '1' HOUR)
tumble_window
の結果を取得する前に 1 時間待つ必要があります。 結果が不完全であっても、できるだけ早く結果を取得したい場合があります。 たとえば、ウィンドウから 1 分ごとに最新の結果を見たいとします。 この場合、次のように EMIT ポリシーを定義します。
デフォルトでは、INSERT INTO result SELECT * FROM tumble_window EMIT WITH DELAY '1' MINUTE BEFORE WATERMARK -- Before the window ends, results are generated every 1 minute.
tumble_window
は、ウィンドウの終了後に到着したすべてのデータを破棄します。 遅れて届いたデータは重要なものであるかもしれません。また、最終的な結果にそのデータを組み込みたいとします。 データは 1 日以上遅れて到着することはなく、遅延データが受信された直後に結果を更新したいとします。 この場合、次のように EMIT ポリシーを定義できます。INSERT INTO result SELECT * FROM tumble_window EMIT WITH DELAY '1' MINUTE BEFORE WATERMARK, WITHOUT DELAY AFTER WATERMARK -- After the window ends, results are generated immediately data is received. -- Add a state TTL configuration of one day. blink.state.ttl.ms = 86400000
遅延の概念
ここでは、遅延とは、ユーザーデータが Realtime Compute に入るときに始まり、結果データが Realtime Compute から出るときに終わる期間を指します。 遅延はユーザーにとって許容範囲であり、イベント時間または処理時間のいずれかです。 遅延はシステム時間に基づいて計算されます。 つまり、遅延は、動的テーブルで行が変更されてから、結果のテーブルで新しい行を表示できるようになるまでの時間です。 動的テーブルは Realtime Compute 内部のデータストリームストレージ媒体であり、結果テーブルは Realtime Compute 外部のストレージ媒体です。
Realtime Compute の処理時間が 0 の場合、MiniBatch がデータを蓄積し、ウィンドウがウィンドウデータを待つするときに遅延が発生します。 ユーザーが 30 秒の最大遅延を許容できると指定した場合、MiniBatch はこの 30 秒を使用してデータを蓄積できます。 1 時間のウィンドウのクエリの場合、最大 30 秒の遅延は、変更された行が 30 秒ごとにエクスポートされることを意味します。 遅延は、ウィンドウ結果のトリガー間隔を指定します。
EMIT WITH DELAY '1' MINUTE
を例に取ります。 一般的な GROUP BY 集計では、MiniBatch がデータを蓄積するのには 1 分かかります。 サイズが 1 分を超えるウィンドウの場合、ウィンドウは
1 分ごとに結果を生成します。 ウィンドウサイズが 1 分以下の場合、設定は無視されます。 これは、ウォーターマークを使用することで、ウィンドウが遅延 SLA を保証できるためです。
別の例は EMIT WITHOUT DELAY
です。 一般的な GROUP BY 集計の場合、MiniBatch は有効ではなく、データは受信された直後に計算され、エクスポートされます。 ウィンドウの場合、データも計算され、受信されるとすぐにエクスポートされます。
現状と今後の予定
現状と今後の予定:- 現在、タンブリングウィンドウとスライディングウィンドウのみが EMIT ポリシーをサポートしています。 セッションウィンドウは EMIT ポリシーをサポートしていません。
- 現在はジョブに複数の出力がある場合、これらの出力に対して同じ EMIT ポリシーを定義する必要があります。 今後、異なるポリシーの定義がサポートされる予定です。
- 現在は、EMIT 構文を使用して、MiniBatch に allowLateness を設定することはできません。 今後、EMIT ポリシーで allowLateness を宣言できるようにする予定です。