Realtime Compute for Apache Flink aggregates data by using window functions based on the time attribute. To use window functions based on the event time of a job, you must define a watermark when you declare a source table.
A watermark measures the progress of event time and is a hidden attribute of the data. You define a watermark in the DDL statement of the source table. Flink provides the following statement to define a watermark:
WATERMARK [watermarkName] FOR <rowtime_field> AS withOffset(<rowtime_field>, offset)
Note: For more information about the time attributes of Realtime Compute for Apache Flink, see Time attributes.
Parameter | Required | Description |
---|---|---|
watermarkName | No | The name of the watermark. |
<rowtime_field> | Yes | The column used to generate the watermark. <rowtime_field> is identified as the event time column and must be a column of the TIMESTAMP type defined in the table. You can use <rowtime_field> to define a window in the job code. |
withOffset | Yes | The policy to generate a watermark. In this example, the watermark value is generated by using the following formula: <rowtime_field> - offset. The first parameter in withOffset must be <rowtime_field>. |
offset | Yes | The offset between the watermark value and event time, in milliseconds. |
A specific field in a data record indicates the time when the record was generated. For example, a table contains a rowtime field whose data type is TIMESTAMP, and one of its values is 1501750584000 (2017-08-03 08:56:24.000). If you want to define a watermark based on the rowtime field and configure a 4-second offset, add the following definition:
WATERMARK FOR rowtime AS withOffset(rowtime, 4000)
In this example, the watermark time of the data record is 1501750584000 - 4000 = 1501750580000 (2017-08-03 08:56:20.000). This means that all data whose timestamp is earlier than 1501750580000 (2017-08-03 08:56:20.000) has arrived.
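The arithmetic in this example can be checked with a short Python sketch. The helper names `with_offset` and `format_utc` are hypothetical and only mirror the formula `<rowtime_field> - offset`; they are not part of any Flink API. The sketch also assumes that the human-readable times shown above are in UTC.

```python
from datetime import datetime, timezone


def with_offset(rowtime_ms: int, offset_ms: int) -> int:
    """Hypothetical stand-in for withOffset: event time minus the offset, in ms."""
    return rowtime_ms - offset_ms


def format_utc(epoch_ms: int) -> str:
    """Render epoch milliseconds as a UTC string with millisecond precision."""
    seconds, millis = divmod(epoch_ms, 1000)
    dt = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return dt.strftime("%Y-%m-%d %H:%M:%S") + f".{millis:03d}"


rowtime = 1501750584000                  # value of the rowtime field
watermark = with_offset(rowtime, 4000)   # 4-second offset

print(watermark, format_utc(watermark))
# prints: 1501750580000 2017-08-03 08:56:20.000
```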
Note
- If you use an event time-based watermark, the rowtime field must be of the TIMESTAMP type. Realtime Compute for Apache Flink supports 13-digit UNIX timestamps in milliseconds. If the rowtime field is of another type or the UNIX timestamp is not 13 digits in length, we recommend that you use a computed column to convert the time. For more information, see Computed column.
- The event time and processing time can only be declared in the source table. For more information, see Event time and Processing time.
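In a job, the conversion to a 13-digit timestamp is done by a computed column in the DDL statement. The Python sketch below only illustrates the arithmetic that such a conversion performs. The function name `to_epoch_millis` is hypothetical, and the sketch assumes that 10-digit values are seconds and 16-digit values are microseconds.

```python
def to_epoch_millis(ts: int) -> int:
    """Normalize a UNIX timestamp to 13-digit milliseconds.

    Assumption: 10 digits means seconds, 13 means milliseconds,
    16 means microseconds. Any other length is rejected.
    """
    digits = len(str(abs(ts)))
    if digits == 13:
        return ts            # already in milliseconds
    if digits == 10:
        return ts * 1000     # seconds to milliseconds
    if digits == 16:
        return ts // 1000    # microseconds to milliseconds
    raise ValueError(f"unsupported timestamp length: {digits} digits")


print(to_epoch_millis(1501750584))
# prints: 1501750584000
```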
Summary
- A watermark indicates that all the events whose timestamp t' is earlier than the watermark time t (t' < t) have occurred. After the watermark time t takes effect, all subsequently received data records whose event time is earlier than t are discarded. Realtime Compute for Apache Flink will allow you to change the configuration and update the subsequent data.
- Watermarks are important for data streams that arrive out of order because the watermarks help ensure that the computing in a window is correct even if some events arrive late.
- If an operator has multiple input data streams for parallel processing, the event time of the operator is the smallest event time among its input streams, that is, the event time of the stream that has progressed the least.
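The points in this summary can be illustrated with a toy Python model. It is a simplified sketch, not the engine's implementation: the names `WatermarkFilter` and `operator_event_time` are hypothetical, and the model assumes that the watermark only moves forward and is derived from the largest event time seen so far minus the offset.

```python
from typing import List, Optional


class WatermarkFilter:
    """Toy single-stream model of watermark-based late-record handling."""

    def __init__(self, offset_ms: int) -> None:
        self.offset_ms = offset_ms
        self.watermark: Optional[int] = None  # unset until the first record

    def accept(self, event_time_ms: int) -> bool:
        """Return False if the record is late (event time < watermark)."""
        if self.watermark is not None and event_time_ms < self.watermark:
            return False  # late record: discarded
        candidate = event_time_ms - self.offset_ms
        # Assumption: the watermark never moves backwards.
        if self.watermark is None or candidate > self.watermark:
            self.watermark = candidate
        return True


def operator_event_time(input_watermarks: List[int]) -> int:
    """Event time of an operator with several inputs: the smallest one."""
    return min(input_watermarks)


stream = WatermarkFilter(offset_ms=4000)
events = [1501750584000, 1501750590000, 1501750583000, 1501750587000]
results = [stream.accept(t) for t in events]

print(results)
# prints: [True, True, False, True]
print(operator_event_time([1501750586000, 1501750580000]))
# prints: 1501750580000
```

In this run, the third record (1501750583000) is discarded because the watermark has already advanced to 1501750586000, whereas the fourth record is out of order but still within the 4-second offset and is kept.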