EMIT statements let you define different output policies for a query in different scenarios, so that you can control output delay and improve data accuracy.

Note EMIT statements are supported only in Realtime Compute V2.0.0 and later.

EMIT policy

An EMIT policy is an output policy for a query in Flink SQL, such as the maximum output delay allowed in a specific scenario. Traditional ANSI SQL syntax cannot express such output policies. For example, assume that you want to view the latest result every minute before a one-hour window ends, and you do not want to lose data that arrives within one day after the window ends. Realtime Compute abstracts the EMIT syntax from requirements like these and extends SQL statements with it. The following examples show how to define EMIT policies in different scenarios:
  • Before a window ends, results are generated with a one-minute delay. After the window ends, results are generated without delay.
    EMIT
      WITH DELAY '1' MINUTE BEFORE WATERMARK,
      WITHOUT DELAY AFTER WATERMARK
  • Before a window ends, no result is generated. After the window ends, results are generated without delay.
    EMIT WITHOUT DELAY AFTER WATERMARK
  • Results are generated with a one-minute delay before and after the window ends. You can set the minibatch parameter to increase the delay.
    EMIT WITH DELAY '1' MINUTE
  • Before the window ends, results are generated with a one-minute delay.
    EMIT WITH DELAY '1' MINUTE BEFORE WATERMARK

Usage

The EMIT statements offer the following features:
  • Control delay: You can set the result output frequency before the window ends to shorten the delay in displaying results.
  • Improve data accuracy: The system does not discard data that arrives late. Instead, it uses the late data to correct results that were already output.
Note When you select an EMIT policy, you must balance business needs against resource consumption. Lower output delay and higher data accuracy both incur higher computing overhead.

Syntax

The EMIT syntax defines an output policy, that is, an action that applies to an INSERT INTO statement. If no EMIT policy is configured, the default action takes effect: a window generates a result only when the window ends, that is, when the watermark passes the end of the window.
Note An EMIT clause must be placed after the entire query in an INSERT INTO statement and cannot be placed in a VIEW statement.
INSERT INTO tableName
<Query>
EMIT strategy [, strategy]*

strategy ::= {WITH DELAY timeInterval | WITHOUT DELAY}
                [BEFORE WATERMARK | AFTER WATERMARK]

timeInterval ::= 'string' timeUnit
Parameter         Description
WITH DELAY        Specifies the maximum output delay. Results are generated at the specified interval.
WITHOUT DELAY     Specifies that no delay is allowed. A result is generated immediately after data is received.
BEFORE WATERMARK  Specifies the policy that applies before a window ends, that is, before the watermark passes the end of the window.
AFTER WATERMARK   Specifies the policy that applies after a window ends, that is, after the watermark passes the end of the window.
Note
  • You can combine BEFORE and AFTER policies in a strategy list as follows:
    • One BEFORE WATERMARK policy.
    • One AFTER WATERMARK policy.
    • One BEFORE WATERMARK policy and one AFTER WATERMARK policy.
    Note You cannot define two BEFORE policies or two AFTER policies at the same time.
  • If you configure an AFTER WATERMARK policy, you must explicitly declare the blink.state.ttl.ms parameter to indicate the maximum delay.
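As an illustration of the combination rules above, the following sketch contrasts a strategy list that follows the rules with one that breaks them. The names result and tumble_window are placeholders borrowed from the Examples section, and how the system reports the invalid statement is not specified in this topic.

```sql
-- Valid: one BEFORE policy plus one AFTER policy.
-- Because an AFTER policy is used, blink.state.ttl.ms must also be declared,
-- for example blink.state.ttl.ms = 3600000.
INSERT INTO result
SELECT * FROM tumble_window
EMIT WITH DELAY '1' MINUTE BEFORE WATERMARK,
     WITHOUT DELAY AFTER WATERMARK;

-- Invalid: two BEFORE policies in the same EMIT clause (kept as a comment).
-- INSERT INTO result
-- SELECT * FROM tumble_window
-- EMIT WITH DELAY '1' MINUTE BEFORE WATERMARK,
--      WITHOUT DELAY BEFORE WATERMARK;
```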

TTL

When an AFTER WATERMARK policy is configured, the window state is retained for a period of time after the window ends so that late data can still be processed. This retention period is called the time to live (TTL). When you use an AFTER policy, explicitly declare the blink.state.ttl.ms parameter to set the TTL of the window state. For example, blink.state.ttl.ms=3600000 indicates that the window waits up to 1 hour for late data. Data that arrives more than 1 hour late is discarded.
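A minimal sketch of the one-hour case described above, assuming a windowed view named tumble_window and a result table named result (both placeholders, as in the Examples section). The parameter line is shown next to the statement in the same way as in this topic's examples. Where you actually declare it depends on how the parameters of your job are configured, which is an assumption here.

```sql
-- No result is generated before the window ends. After the window ends,
-- every late record immediately produces an updated result.
INSERT INTO result
SELECT * FROM tumble_window
EMIT WITHOUT DELAY AFTER WATERMARK;

-- 1 hour = 60 min * 60 s * 1000 ms = 3600000 ms.
-- Late data is accepted for up to 1 hour after the window ends.
blink.state.ttl.ms = 3600000
```

A one-day TTL is computed the same way: 24 * 3600000 = 86400000 ms.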

Examples

Take a one-hour tumbling window as an example.
CREATE VIEW tumble_window AS
SELECT 
  `id`,
  TUMBLE_START(rowtime, INTERVAL '1' HOUR) as start_time,
  COUNT(*) as cnt
FROM source
GROUP BY `id`, TUMBLE(rowtime, INTERVAL '1' HOUR)                    
By default, you must wait 1 hour, until the window ends, to obtain the result of tumble_window. If you want to view the latest result of the window every minute, even though that result is incomplete, you can execute the following statement:
INSERT INTO result
SELECT * FROM tumble_window
EMIT WITH DELAY '1' MINUTE BEFORE WATERMARK -- Before the window ends, the updated result is generated at one-minute intervals.
By default, tumble_window ignores and discards the data that arrives after the window ends. If you want the output result to include data that arrives within one day after the window ends, and to update the result immediately after each late data record is received, you can execute the following statement:
INSERT INTO result
SELECT * FROM tumble_window
EMIT WITH DELAY '1' MINUTE BEFORE WATERMARK, 
     WITHOUT DELAY AFTER WATERMARK  -- After the window ends, updated results are generated immediately after a data record is received.

blink.state.ttl.ms = 86400000    -- Configure a one-day TTL so that data arriving within one day after the window ends is still processed.

Delay

In an EMIT policy, delay is the latency that users can tolerate: the duration from when user data enters Realtime Compute to when the corresponding result data leaves Realtime Compute. The delay applies whether the query uses event time or processing time, and it is always calculated based on system time. In other words, the delay is the interval between the time data changes in a dynamic table and the time the new record appears in a result table. A dynamic table is the storage medium for streaming data in Realtime Compute. A result table is the storage medium that Realtime Compute references.

Even if the processing time of Realtime Compute is assumed to be 0, delay can still arise while streaming data accumulates and while a window waits for data. If you specify a maximum delay of 30 seconds, those 30 seconds can be used to accumulate streaming data. If a query defines a one-hour window, a maximum delay of 30 seconds means that the output results are updated every 30 seconds.

  • Take the configuration EMIT WITH DELAY '1' MINUTE as an example.

    When you use the GROUP BY clause to aggregate data, the system accumulates streaming data for up to one minute. A window whose size is greater than one minute generates a result every minute. If the window size is less than one minute, the system ignores this configuration because the window already meets the delay SLA by relying on the watermark.

  • Take the configuration EMIT WITHOUT DELAY as an example.

    When you use the GROUP BY clause to aggregate data, the system does not use the minibatch parameter to increase the delay. Data is computed and exported immediately after it is received. When you use window functions, data computing and output are also triggered immediately after a data record is received.
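To make the GROUP BY case concrete, here is a hedged sketch of a non-windowed aggregation that uses the policy from the first bullet. The source table and its id column follow the Examples section; the id_counts result table is hypothetical.

```sql
-- Non-windowed aggregation: input is accumulated for up to one minute,
-- and the updated count for each id is then output.
INSERT INTO id_counts
SELECT
  `id`,
  COUNT(*) AS cnt
FROM source
GROUP BY `id`
EMIT WITH DELAY '1' MINUTE;
```

Replacing the last line with EMIT WITHOUT DELAY corresponds to the second bullet: each incoming record is computed and output immediately, at the cost of more frequent updates to the result table.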

Limits

  • Only tumbling and sliding windows support EMIT policies. Session windows do not support EMIT policies.
  • If a job has multiple outputs, you must define the same EMIT policy for the outputs. Different EMIT policies will be supported in the future.
  • The EMIT syntax cannot be used to set the allowLateness parameter for minibatch. You will be able to declare allowLateness in EMIT policies in the future.
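Because sliding windows are listed as supported, an EMIT policy can presumably be attached to a HOP-based query in the same way as to the tumbling window in the Examples section. The sketch below assumes the same source table and rowtime column as that section. The hop_window view, the hop_result table, and the choice of a one-hour window that slides every 10 minutes are hypothetical.

```sql
-- One-hour sliding window that advances every 10 minutes.
-- In HOP and HOP_START, the first interval is the slide and the second is the window size.
CREATE VIEW hop_window AS
SELECT
  `id`,
  HOP_START(rowtime, INTERVAL '10' MINUTE, INTERVAL '1' HOUR) AS start_time,
  COUNT(*) AS cnt
FROM source
GROUP BY `id`, HOP(rowtime, INTERVAL '10' MINUTE, INTERVAL '1' HOUR);

-- Before each window ends, output its latest (incomplete) result every minute.
INSERT INTO hop_result
SELECT * FROM hop_window
EMIT WITH DELAY '1' MINUTE BEFORE WATERMARK;
```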