The WATERMARK clause is used to process out-of-order data in streaming data queries. This topic describes the WATERMARK syntax and provides an example.
Syntax
SELECT watermark(projectItem, durationSpec) as watermarkItem, projectItem [, projectItem ]*
FROM tableExpression
Use WATERMARK to handle late-arriving data in streaming queries. When Spark aggregates or joins streaming data, it keeps intermediate state in memory. A watermark sets the maximum delay that Spark tolerates: data that arrives later than this delay is discarded, and intermediate state that has expired is removed from memory. This keeps the state from growing without bound, so that streaming queries run stably.
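The behavior described above can be sketched in plain Python. This is a simplified per-record model, not Spark's implementation: Spark advances the watermark between micro-batches rather than after every record, and the class name WatermarkedCounter and its fields are hypothetical names chosen for illustration.

```python
from datetime import datetime, timedelta


class WatermarkedCounter:
    """Toy model of a watermarked count per event time (illustrative only)."""

    def __init__(self, max_delay):
        self.max_delay = max_delay      # tolerated lateness, e.g. 10 seconds
        self.max_event_time = None      # largest event time seen so far
        self.state = {}                 # event time -> running count
        self.dropped = 0                # number of late records discarded

    @property
    def watermark(self):
        # Watermark = largest event time seen minus the tolerated delay.
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.max_delay

    def process(self, event_time):
        wm = self.watermark
        # A record older than the watermark is too late and is discarded.
        if wm is not None and event_time < wm:
            self.dropped += 1
            return False
        self.state[event_time] = self.state.get(event_time, 0) + 1
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        # Evict intermediate state that has fallen behind the new watermark.
        wm = self.watermark
        for key in [k for k in self.state if k < wm]:
            del self.state[key]
        return True


base = datetime(2024, 1, 1, 12, 0, 0)
counter = WatermarkedCounter(timedelta(seconds=10))
counter.process(base)                          # accepted
counter.process(base + timedelta(seconds=30))  # accepted, state for `base` is evicted
counter.process(base + timedelta(seconds=5))   # discarded: older than the watermark
counter.process(base + timedelta(seconds=25))  # accepted: within the 10-second delay
```

In this sketch the third record is discarded because the watermark has already advanced to 20 seconds past the base time, while the fourth record is late but still inside the tolerated delay.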
For more information about WATERMARK, visit:
Example
Each source table must contain a field of the TIMESTAMP type that indicates the event time (ts in the following examples). The maximum data delay is 10 seconds, except for stream_source_1 in the join example, which allows 20 seconds.
- Aggregate + Watermark
SELECT watermark(ts, interval 10 seconds), count(*) as col1, col2 FROM stream_source GROUP BY ts, col2
- Join + Watermark
SELECT watermark(stream_source_1.ts, interval 20 second) as ts1,
       watermark(stream_source_2.ts, interval 10 second) as ts2,
       stream_source_1.col1 as col1,
       stream_source_2.col2 as col2
FROM stream_source_1
INNER JOIN stream_source_2
ON stream_source_1.col1=stream_source_2.col1
   AND ts1 >= ts2
   AND ts1 <= ts2 + interval 10 seconds
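The time-range condition in the join example (ts1 >= ts2 AND ts1 <= ts2 + interval 10 seconds) limits which rows can match, which, together with the watermarks, bounds how long rows from either stream need to be kept as state. As a minimal sketch, the predicate can be written in plain Python; the function name within_join_range and the parameter max_gap are hypothetical and are not part of the SQL syntax.

```python
from datetime import datetime, timedelta


def within_join_range(ts1, ts2, max_gap=timedelta(seconds=10)):
    """Return True when ts1 falls in the closed interval [ts2, ts2 + max_gap]."""
    return ts2 <= ts1 <= ts2 + max_gap


t0 = datetime(2024, 1, 1, 12, 0, 0)
same_instant = within_join_range(t0, t0)                          # boundary: ts1 == ts2
inside = within_join_range(t0 + timedelta(seconds=5), t0)         # 5 seconds after ts2
too_late = within_join_range(t0 + timedelta(seconds=11), t0)      # beyond the 10-second gap
too_early = within_join_range(t0 - timedelta(seconds=1), t0)      # ts1 before ts2
```

Only row pairs whose event times satisfy this predicate, and whose col1 values are equal, would be joined under the condition shown in the example.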