Note: The EMIT statement is supported in Realtime Compute V2.0 and later.

Policy

Users may require different output policies (such as the maximum delay allowed) for the same query in different scenarios. For example, a user may want to view the latest result every minute before a 1-hour window ends, and may not want to lose data that arrives up to one day after the window ends. Standard ANSI SQL provides no syntax for this requirement. Flink SQL abstracts the requirement into the EMIT syntax and extends SQL with it.

Purpose

Currently, the EMIT syntax is used to control delay and improve data accuracy.
  1. Control delay. For a large window, you can set the result output frequency before the window ends to shorten the delay in displaying the result to users.
  2. Improve data accuracy. The system does not discard data that arrives after a window ends. Instead, it uses the late data to update the output result.
When configuring EMIT policies, you also need to weigh the overhead. Lower output delay and higher data accuracy both come at the cost of higher computing overhead.

Syntax

The EMIT syntax is used to define output policies, that is, to define actions in the INSERT INTO statement. If no EMIT policy is configured, the default behavior takes effect. In this case, a window generates a result only when the window ends, that is, when the watermark is triggered. The syntax is as follows:
INSERT INTO tableName
query
EMIT strategy [, strategy]*

strategy ::= {WITH DELAY timeInterval | WITHOUT DELAY}
                [BEFORE WATERMARK | AFTER WATERMARK]

timeInterval ::= 'string' timeUnit
  • WITH DELAY: specifies the maximum result output delay allowed. Results are generated at the specified interval.
  • WITHOUT DELAY: specifies that no delay is allowed. A result is generated as soon as data is received.
  • BEFORE WATERMARK: specifies the policy before the window ends, that is, before the watermark is triggered.
  • AFTER WATERMARK: specifies the policy after the window ends, that is, after the watermark is triggered.
Note:
  1. Multiple strategies can be defined, and the BEFORE WATERMARK and AFTER WATERMARK policies can be defined at the same time. However, you cannot define two BEFORE WATERMARK policies or two AFTER WATERMARK policies at the same time.
  2. If the AFTER WATERMARK policy is defined, you must set the blink.state.ttl.ms parameter to explicitly define the maximum delay allowed.
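
The two notes above can be sketched as a small check. This is only an illustrative Python model, not part of Realtime Compute: the function name check_emit_policy and the (delay, phase) tuple layout are hypothetical, and a delay of None stands for WITHOUT DELAY.

```python
from typing import List, Optional, Tuple

# Each strategy is modeled as (delay, phase). phase is "BEFORE", "AFTER",
# or None when no WATERMARK clause is given. This layout is hypothetical.
Strategy = Tuple[Optional[str], Optional[str]]


def check_emit_policy(strategies: List[Strategy],
                      state_ttl_ms: Optional[int]) -> List[str]:
    """Return the rule violations found (empty list if both notes are satisfied)."""
    problems = []
    phases = [phase for _, phase in strategies]
    # Note 1: at most one BEFORE WATERMARK and one AFTER WATERMARK strategy.
    for phase in ("BEFORE", "AFTER"):
        if phases.count(phase) > 1:
            problems.append(f"more than one {phase} WATERMARK strategy")
    # Note 2: AFTER WATERMARK needs an explicit state TTL.
    if "AFTER" in phases and state_ttl_ms is None:
        problems.append("AFTER WATERMARK requires blink.state.ttl.ms")
    return problems


# WITH DELAY '1' MINUTE BEFORE WATERMARK, WITHOUT DELAY AFTER WATERMARK, TTL of one day.
print(check_emit_policy([("1 MINUTE", "BEFORE"), (None, "AFTER")], 86400000))
```

A policy that passes both notes yields an empty list. Omitting the TTL while using AFTER WATERMARK, or repeating a BEFORE WATERMARK strategy, yields a message for each violated note.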

Example

-- Before a window ends, results are generated at a delay of 1 minute. After the window ends, results are generated without delay.
EMIT 
  WITH DELAY '1' MINUTE BEFORE WATERMARK,
  WITHOUT DELAY AFTER WATERMARK

-- Before a window ends, no result is generated. After the window ends, results are generated without delay.
EMIT WITHOUT DELAY AFTER WATERMARK

-- Results are generated at a delay of 1 minute globally. (The delay is used by MiniBatch to accumulate data.)
EMIT WITH DELAY '1' MINUTE

-- Before a window ends, results are generated at a delay of 1 minute.
EMIT WITH DELAY '1' MINUTE BEFORE WATERMARK
  • Maximum delay allowed

    When the AFTER WATERMARK policy is configured, the window state is retained for a period of time to wait for late data. To control how long the window waits, set the blink.state.ttl.ms parameter, which defines the state time to live (TTL) for the window. This parameter has no default value, so you must set it explicitly whenever the AFTER WATERMARK policy is configured. For example, blink.state.ttl.ms=3600000 means that the window waits up to 1 hour for late data. Data that arrives more than 1 hour late is discarded.
  • Example

    Take the 1-hour tumbling window tumble_window as an example.
    CREATE VIEW tumble_window AS
    SELECT 
      id,
      TUMBLE_START(rowtime, INTERVAL '1' HOUR) as start_time,
      COUNT(*) as cnt
    FROM source
    GROUP BY id, TUMBLE(rowtime, INTERVAL '1' HOUR)
    
    By default, you need to wait for 1 hour before obtaining the result of tumble_window. Sometimes, you may want to obtain the result as early as possible even though the result is incomplete. For example, you want to see the latest result from the window every 1 minute. In this case, define the EMIT policy as follows:
    INSERT INTO result
    SELECT * FROM tumble_window
    EMIT WITH DELAY '1' MINUTE BEFORE WATERMARK -- Before the window ends, results are generated every 1 minute.
    
    By default, tumble_window discards any data that arrives after the window ends. If the late data is important to you, you may want to incorporate it into the final result. Suppose you know that the data will not arrive more than one day late and you want to update the result as soon as late data is received. In this case, you can define the EMIT policy as follows:
    INSERT INTO result
    SELECT * FROM tumble_window
    EMIT WITH DELAY '1' MINUTE BEFORE WATERMARK, 
         WITHOUT DELAY AFTER WATERMARK  -- After the window ends, results are generated as soon as data is received.
    
    -- Add a state TTL configuration of one day.
    blink.state.ttl.ms = 86400000
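
    The TTL values in this topic are plain millisecond counts. As a quick check of the arithmetic, the following Python sketch converts a tolerated lateness into the value to assign to blink.state.ttl.ms. The helper name state_ttl_ms is hypothetical and is not part of Realtime Compute.

```python
from datetime import timedelta


def state_ttl_ms(max_lateness: timedelta) -> int:
    """Convert a tolerated lateness into whole milliseconds."""
    # Floor division of two timedeltas yields an int.
    return max_lateness // timedelta(milliseconds=1)


print(state_ttl_ms(timedelta(hours=1)))  # 3600000
print(state_ttl_ms(timedelta(days=1)))   # 86400000
```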
    

Delay concept

In this topic, the delay refers to the time between user data entering Realtime Compute and the corresponding result data leaving Realtime Compute. This is the delay that users can tolerate. It can be in either event time or processing time, and it is calculated based on the system time. In other words, the delay is the interval between the time when a row changes in a dynamic table and the time when the new row can be viewed in a result table. The dynamic table is the storage medium for data streams inside Realtime Compute, and the result table is the storage medium outside Realtime Compute.

If the processing time of Realtime Compute is taken as 0, the delay comes from MiniBatch accumulating data and from windows waiting for their data. If a user specifies that a maximum delay of 30 seconds can be tolerated, MiniBatch can use those 30 seconds to accumulate data. For a query over a 1-hour window, a maximum delay of 30 seconds means that changed rows are exported every 30 seconds. In this case, the delay specifies the trigger interval of the window result.

Take EMIT WITH DELAY '1' MINUTE as an example. For a common GROUP BY aggregation, MiniBatch takes 1 minute to accumulate data. For a window whose size is greater than 1 minute, the window generates a result every 1 minute. If the window size is smaller than or equal to 1 minute, the configuration is ignored. This is because the window can ensure the delay SLA by using the watermark.
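
The window rule in the preceding paragraph can be sketched as a simplified Python model. The function name early_emit_offsets is hypothetical, and the model assumes that early results are aligned to the window start and that processing time is 0. The actual trigger times in Realtime Compute may differ.

```python
from datetime import timedelta
from typing import List


def early_emit_offsets(window_size: timedelta, delay: timedelta) -> List[timedelta]:
    """Offsets from the window start at which early results appear in this model."""
    if window_size <= delay:
        # The EMIT delay is ignored; only the end-of-window result is produced.
        return []
    offsets = []
    t = delay
    while t < window_size:
        offsets.append(t)
        t += delay
    return offsets


hourly = early_emit_offsets(timedelta(hours=1), timedelta(minutes=1))
print(len(hourly), hourly[0], hourly[-1])
print(early_emit_offsets(timedelta(seconds=30), timedelta(minutes=1)))
```

In this model, a 1-hour window with a 1-minute delay produces an early result at each whole minute before the window ends, and a 30-second window produces none because the watermark already ensures the delay.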

Another example is EMIT WITHOUT DELAY. For a common GROUP BY aggregation, MiniBatch is not enabled, and data is computed and exported as soon as it is received. For a window, data is likewise computed and exported as soon as it is received.

Current status and future plans

  1. Currently, only tumbling and sliding windows support EMIT policies. Session windows do not support EMIT policies.
  2. Currently, if a job has multiple outputs, the same EMIT policy must be defined for these outputs. Different policies will be supported in the future.
  3. Currently, the EMIT syntax cannot be used to configure allowLateness for MiniBatch. We plan to enable you to declare allowLateness in EMIT policies in the future.