The WATERMARK clause is used to process out-of-order data in streaming data queries. This topic describes the WATERMARK syntax and provides an example.

Syntax

SELECT watermark(projectItem, durationSpec) as watermarkItem, projectItem [, projectItem ]*
FROM tableExpression

Use WATERMARK to handle late-arriving data in streaming queries. When Spark aggregates or joins streams, it keeps intermediate state in memory. A watermark tells Spark how late data is allowed to arrive: records that arrive later than the specified delay can be discarded, and intermediate state that falls behind the watermark is removed from memory. This bounds the size of the state, so long-running streaming queries stay stable.
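To make the mechanism above concrete, the following Python sketch models a count aggregation grouped by event time and a key, with a 10-second watermark. It is a simplified, hypothetical model (the class `WatermarkedCounter` is illustrative, not part of any API). It assumes that the watermark is the maximum event time seen minus the allowed delay, that it advances after every record rather than per micro-batch, and that any record behind the watermark is always dropped. The real engine may keep some late records and may update the watermark less often.

```python
from collections import defaultdict
from datetime import datetime, timedelta


class WatermarkedCounter:
    """Hypothetical, simplified model of a count aggregation with a watermark.

    Simplifications (assumptions, not the engine's behavior):
    - the watermark is updated after every record rather than per micro-batch;
    - a record whose event time is behind the watermark is always dropped;
    - a group is finalized and its state released once its event time
      falls behind the watermark.
    """

    def __init__(self, delay: timedelta) -> None:
        self.delay = delay
        self.max_event_time = None        # latest event time seen so far
        self.state = defaultdict(int)     # (event_time, key) -> running count
        self.emitted = []                 # finalized ((event_time, key), count)
        self.dropped = 0                  # number of late records discarded

    @property
    def watermark(self):
        """Max event time seen minus the allowed delay (None before any data)."""
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def process(self, event_time: datetime, key: str) -> None:
        wm = self.watermark
        if wm is not None and event_time < wm:
            self.dropped += 1             # later than the allowed delay: discard
            return
        self.state[(event_time, key)] += 1
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        self._evict()

    def _evict(self) -> None:
        wm = self.watermark
        expired = [group for group in self.state if group[0] < wm]
        for group in expired:
            # Any future record for this group would be behind the watermark,
            # so emit the final count and free the intermediate state.
            self.emitted.append((group, self.state.pop(group)))


t0 = datetime(2024, 1, 1, 0, 0, 0)
agg = WatermarkedCounter(delay=timedelta(seconds=10))
agg.process(t0, "a")                           # watermark becomes t0 - 10 s
agg.process(t0 + timedelta(seconds=5), "a")    # watermark becomes t0 - 5 s
agg.process(t0 + timedelta(seconds=30), "b")   # watermark t0 + 20 s: first two groups finalized
agg.process(t0 + timedelta(seconds=12), "a")   # 12 s < 20 s: dropped as too late
```

After the third record, the groups at `t0` and `t0 + 5 s` can no longer change, so their state is released. The record at `t0 + 12 s` arrives more than 10 seconds behind the latest event time and is discarded.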

For more information about WATERMARK, visit:

Example

Each source table must contain a field of the TIMESTAMP type (ts in these examples) to indicate event time. In the aggregation example, the maximum allowed data delay is 10 seconds. In the join example, it is 20 seconds for stream_source_1 and 10 seconds for stream_source_2.
  • Aggregate + Watermark
    SELECT watermark(ts, interval 10 seconds), count(*) as col1, col2
    FROM stream_source
    GROUP BY ts, col2
  • Join + Watermark
    SELECT
      watermark(stream_source_1.ts, interval 20 second) as ts1,
      watermark(stream_source_2.ts, interval 10 second) as ts2,
      stream_source_1.col1 as col1,
      stream_source_2.col2 as col2
    FROM
      stream_source_1
    INNER JOIN
      stream_source_2
    ON stream_source_1.col1 = stream_source_2.col1
      AND ts1 >= ts2
      AND ts1 <= ts2 + interval 10 seconds
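The time-range condition in the join example is what allows buffered rows to be discarded. The following Python sketch models that reasoning. The helpers `matches`, `can_evict_left`, and `can_evict_right` and the constants are hypothetical and illustrative only. The sketch assumes that each stream's watermark is tracked separately as its maximum event time minus its own delay, and that rows behind the watermark are discarded. Spark Structured Streaming by default combines multiple watermarks into a single global watermark (the minimum), so actual state retention may be longer than this model suggests.

```python
from datetime import datetime, timedelta

# Hypothetical constants taken from the join example above.
RANGE = timedelta(seconds=10)    # ts1 <= ts2 + interval 10 seconds
DELAY_1 = timedelta(seconds=20)  # watermark delay on stream_source_1.ts
DELAY_2 = timedelta(seconds=10)  # watermark delay on stream_source_2.ts


def matches(ts1: datetime, ts2: datetime) -> bool:
    """Time-range part of the ON clause: ts2 <= ts1 <= ts2 + 10 s."""
    return ts2 <= ts1 <= ts2 + RANGE


def can_evict_left(ts1: datetime, max_ts2: datetime) -> bool:
    """A buffered stream_source_1 row can never match again once the
    stream_source_2 watermark has passed ts1: every accepted future ts2 > ts1."""
    return max_ts2 - DELAY_2 > ts1


def can_evict_right(ts2: datetime, max_ts1: datetime) -> bool:
    """A buffered stream_source_2 row can never match again once the
    stream_source_1 watermark has passed ts2 + 10 s: every accepted future
    ts1 then exceeds ts2 + 10 s."""
    return max_ts1 - DELAY_1 > ts2 + RANGE


t0 = datetime(2024, 1, 1, 0, 0, 0)
print(matches(t0 + timedelta(seconds=5), t0))    # True
print(matches(t0 + timedelta(seconds=11), t0))   # False
# A stream_source_2 row at t0 stays buffered until stream_source_1 has
# seen event times beyond t0 + 30 s (10 s range + 20 s delay).
print(can_evict_right(t0, t0 + timedelta(seconds=30)))  # False
print(can_evict_right(t0, t0 + timedelta(seconds=31)))  # True
```

Under these assumptions, a larger watermark delay on one stream keeps the other stream's rows in memory longer, which is the trade-off between tolerating late data and limiting state size.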