EMIT 文は、Realtime Compute V2.0 以降でサポートされています。

ポリシー

ユーザーは、さまざまなシナリオのクエリに対して、さまざまな出力ポリシー (許容される最大遅延など) を必要とする場合があります。 たとえば、1 時間のウィンドウが終了するまで 1 分ごとに最新の結果を見たいが、ウィンドウが終了してから 1 日以内に遅れて到着するデータを失いたくないとします。 従来の ANSI SQL では、この要件を満たす構文は提供されていません。 Flink SQL は、このような要件から EMIT 構文を抽象化し、SQL 構文を EMIT 構文で拡張します。

目的

現在、EMIT 構文は遅延の制御とデータ精度の向上に使用されています。
  1. 遅延の制御。 ウィンドウが大きい場合、ウィンドウが終了するまでの結果の出力頻度を設定して、ユーザーへの結果出力の遅延を短縮できます。
  2. データ精度の向上。 システムは、ウィンドウが終わった後に到着したデータを破棄せず、データを出力結果に反映します。
EMIT ポリシーを設定するときは、オーバーヘッドも考慮する必要があります。 出力遅延が低く、データ精度が高いほど、計算のオーバーヘッドが高くなります。

構文

EMIT 構文は、出力ポリシーの定義、つまり INSERT INTO 文での操作の定義に使用されます。 EMIT ポリシーが設定されていない場合、デフォルトの動作が有効になります。 この場合、ウィンドウは、ウィンドウが終了したとき、つまりウォーターマークがトリガーされたときにのみ結果を生成します。 構文は次のとおりです。
INSERTINTO tableName
query
EMIT strategy [, strategy]*

strategy ::= {WITH DELAY timeInterval | WITHOUT DELAY} 
                [BEFORE WATERMARK | AFTER WATERMARK]

timeInterval ::= 'string' timeUnit
			
  • WITH DELAY:許容される最大結果出力遅延を指定します。 指定した間隔で結果が生成されます。
  • WITHOUT DELAY:遅延を許容しないよう指定します。 データが受信されるとすぐに結果が生成されます。
  • BEFORE WATERMARK:ウィンドウが終了する前、つまりウォーターマークがトリガーされる前のポリシーを指定します。
  • AFTER WATERMARK:ウィンドウが終了した後、つまりウォーターマークがトリガーされた後のポリシーを指定します。
注意:
  1. 複数の戦略を定義でき、BEFORE WATERMARK および AFTER WATERMARK ポリシーを同時に定義することができます。 ただし、2 つの BEFORE WATERMARK ポリシー、または 2 つの AFTER WATERMARK ポリシーを同時に定義することはできません。
  2. AFTER WATERMARK ポリシーが定義されている場合は、blink.state.ttl.ms パラメーターを設定して、許容できる最大遅延を明示的に定義する必要があります。

-- Before a window ends, results are generated at a delay of 1 minute. After the window ends, results are generated without delay.
EMIT 
  WITH DELAY '1'MINUTE BEFORE WATERMARK,
  WITHOUT DELAY AFTER WATERMARK

-- Before a window ends, no result is generated. After the window ends, results are generated without delay.
EMIT WITHOUT DELAY AFTER WATERMARK

-- Results are generated at a delay of 1 minute globally. (The delay is used by MiniBatch to accumulate data.)
EMIT WITH DELAY '1'MINUTE -- Before a window ends, results are generated at a delay of 1 minute.
EMIT WITH DELAY '1'MINUTE BEFORE WATERMARK
			
  • 最大許容遅延

    AFTER WATERMARK ポリシーが設定されている場合、ウィンドウのステータスは一定期間保持され、遅延データを待ちます。 では、どれだけの間、遅延データを待つのでしょうか。 blink.state.ttl.ms パラメーターを設定して、ウィンドウの状態保存時間 (TTL) をカスタマイズすることができます。 このパラメーターは、デフォルトでは設定されていません。 AFTER WATERMARK ポリシーが設定されている場合、明示的に blink.state.ttl.ms パラメーターを設定する必要があります。 たとえば、blink.state.ttl.ms=3600000 は、ウィンドウが遅れたデータを 1 時間まで待つことを示します。 1 時間以上遅れて到着したデータは直接破棄されます。
  • 1 時間のタンブリングウィンドウ tumble_window を例に取ります。
    CREATE VIEW tumble_window AS
    SELECT 
      id,
      TUMBLE_START(rowtime, INTERVAL '1' HOUR) as start_time,
      COUNT(*) as cnt
    FROM source
    GROUP BY id, TUMBLE(rowtime, INTERVAL '1' HOUR)
    					
    デフォルトでは、tumble_window の結果を取得する前に 1 時間待つ必要があります。 結果が不完全であっても、できるだけ早く結果を取得したい場合があります。 たとえば、ウィンドウから 1 分ごとに最新の結果を見たいとします。 この場合、次のように EMIT ポリシーを定義します。
    INSERT INTO result
    SELECT * FROM tumble_window
    EMIT WITH DELAY '1' MINUTE BEFORE WATERMARK -- Before the window ends, results are generated every 1 minute.
    					
    デフォルトでは、tumble_window は、ウィンドウの終了後に到着したすべてのデータを破棄します。 遅れて届いたデータは重要なものであるかもしれません。また、最終的な結果にそのデータを組み込みたいとします。 データは 1 日以上遅れて到着することはなく、遅延データが受信された直後に結果を更新したいとします。 この場合、次のように EMIT ポリシーを定義できます。
    INSERT INTO result
    SELECT * FROM tumble_window
    EMIT WITH DELAY '1' MINUTE BEFORE WATERMARK, 
         WITHOUT DELAY AFTER WATERMARK  -- After the window ends, results are generated immediately data is received.
    
    -- Add a state TTL configuration of one day.
    blink.state.ttl.ms = 86400000
    					

遅延の概念

ここでは、遅延とは、ユーザーデータが Realtime Compute に入るときに始まり、結果データが Realtime Compute から出るときに終わる期間を指します。 遅延はユーザーにとって許容範囲であり、イベント時間または処理時間のいずれかです。 遅延はシステム時間に基づいて計算されます。 つまり、遅延は、動的テーブルで行が変更されてから、結果のテーブルで新しい行を表示できるようになるまでの時間です。 動的テーブルは Realtime Compute 内部のデータストリームストレージ媒体であり、結果テーブルは Realtime Compute 外部のストレージ媒体です。

Realtime Compute の処理時間が 0 の場合、MiniBatch がデータを蓄積し、ウィンドウがウィンドウデータを待つするときに遅延が発生します。 ユーザーが 30 秒の最大遅延を許容できると指定した場合、MiniBatch はこの 30 秒を使用してデータを蓄積できます。 1 時間のウィンドウのクエリの場合、最大 30 秒の遅延は、変更された行が 30 秒ごとにエクスポートされることを意味します。 遅延は、ウィンドウ結果のトリガー間隔を指定します。

EMIT WITH DELAY '1' MINUTE を例に取ります。 一般的な GROUP BY 集計では、MiniBatch がデータを蓄積するのには 1 分かかります。 サイズが 1 分を超えるウィンドウの場合、ウィンドウは 1 分ごとに結果を生成します。 ウィンドウサイズが 1 分以下の場合、設定は無視されます。 これは、ウォーターマークを使用することで、ウィンドウが遅延 SLA を保証できるためです。

別の例は EMIT WITHOUT DELAY です。 一般的な GROUP BY 集計の場合、MiniBatch は有効ではなく、データは受信された直後に計算され、エクスポートされます。 ウィンドウの場合、データも計算され、受信されるとすぐにエクスポートされます。

現状と今後の予定

現状と今後の予定:
  1. 現在、タンブリングウィンドウとスライディングウィンドウのみが EMIT ポリシーをサポートしています。 セッションウィンドウは EMIT ポリシーをサポートしていません。
  2. 現在はジョブに複数の出力がある場合、これらの出力に対して同じ EMIT ポリシーを定義する必要があります。 今後、異なるポリシーの定義がサポートされる予定です。
  3. 現在は、EMIT 構文を使用して、MiniBatch に allowLateness を設定することはできません。 今後、EMIT ポリシーで allowLateness を宣言できるようにする予定です。