This topic describes the two time attributes that Flink SQL supports: event time and processing time.

Flink supports three time attributes for processing streaming data: processing time, event time, and ingestion time. Flink SQL only supports event time and processing time.
  • Event time: the timestamp carried in the data that you provide, which is typically the time at which the data was originally created.
  • Processing time: the local system time when the system processes an event, in milliseconds.

Event time

Event time is also known as rowtime. The event time attribute must be declared in the DDL statement of the source table. You can declare a field in the source table as the event time. Currently, you can only declare a field of the TIMESTAMP type. The LONG type will be supported in the future. If the source table does not contain a TIMESTAMP column, you can use a computed column to build a TIMESTAMP column based on an existing column. For more information, see Computed column.
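
As a rough sketch of the computed-column approach, the following DDL derives a TIMESTAMP column from a BIGINT column and declares it as the event time. The table name log_stream and the column event_ms are hypothetical. The sketch also assumes that event_ms holds epoch milliseconds and that TO_TIMESTAMP accepts a BIGINT argument on your platform version. Check the Computed column topic before relying on it.

```sql
-- Hypothetical source table whose event time arrives as epoch milliseconds.
CREATE TABLE log_stream (
  a VARCHAR,
  event_ms BIGINT,                -- Hypothetical raw column, assumed to hold epoch milliseconds.
  ts AS TO_TIMESTAMP (event_ms),  -- Computed TIMESTAMP column (assumes a BIGINT overload of TO_TIMESTAMP).
  WATERMARK wk1 FOR ts AS withOffset (ts, 1000) -- Declare the computed column as the event time.
) WITH (
  type = 'sls',
  topic = '<yourTopicName>',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessSecret>'
);
```

The computed column ts can then be used in window functions in the same way as a native TIMESTAMP column.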

Because data can be out of order and network congestion can cause jitter, records may arrive in an order that differs from the order in which the events occurred. Therefore, after you define a rowtime field, you must define a watermark computation method. For more information, see Watermark.

The following example shows data aggregation by using event time-based window functions:
CREATE TABLE tt_stream (
  a VARCHAR,
  b VARCHAR,
  ts TIMESTAMP,
  WATERMARK wk1 FOR ts AS withOffset (ts, 1000) -- Define a watermark computation method.
) WITH (
  type = 'sls',
  topic = '<yourTopicName>',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessSecret>'
);
CREATE TABLE rds_output (
  id VARCHAR,
  win_start TIMESTAMP,
  win_end TIMESTAMP,
  cnt BIGINT
) WITH (
  type = 'rds',
  url = 'jdbc:mysql://****3306/test',
  tableName = '<yourTableName>',
  userName = '<yourUserName>',
  password = '<yourPassword>'
);
INSERT
  INTO rds_output
SELECT
  a AS id,
  SESSION_START (ts, INTERVAL '1' SECOND) AS win_start,
  SESSION_END (ts, INTERVAL '1' SECOND) AS win_end,
  COUNT (a) AS cnt
FROM
  tt_stream
GROUP
  BY SESSION (ts, INTERVAL '1' SECOND),
  a;

Processing time

Processing time is generated by the system and is not included in the raw data. Therefore, you must explicitly define a processing time column when you declare the source table, by using the following syntax:
fieldName AS PROCTIME()
The following example shows data aggregation by using processing time-based window functions:
CREATE TABLE mq_stream (
  a VARCHAR,
  b VARCHAR,
  c BIGINT,
  ts AS PROCTIME () -- Explicitly define a processing time column when you declare the source table.
) WITH (
  type = 'mq',
  topic = '<yourTopic>',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessSecret>'
);
CREATE TABLE rds_output (
  id VARCHAR,
  win_start TIMESTAMP,
  win_end TIMESTAMP,
  cnt BIGINT
) WITH (
  type = 'rds',
  url = '<yourDatabaseURL>',
  tableName = '<yourDatabaseTableName>',
  userName = '<yourUserName>',
  password = '<yourPassword>'
);
INSERT
  INTO rds_output
SELECT
  a AS id,
  SESSION_START (ts, INTERVAL '1' SECOND) AS win_start,
  SESSION_END (ts, INTERVAL '1' SECOND) AS win_end,
  COUNT (a) AS cnt
FROM
  mq_stream
GROUP
  BY SESSION (ts, INTERVAL '1' SECOND),
  a;

Expiration of time attributes

A time attribute field loses its time attribute after the following operations:
  • GROUP BY operations on fields other than the time attribute, except for GROUP BY operations in a tumbling, sliding, or session window. For more information, see Tumbling window, Sliding window, and Session window.
  • JOIN operations on two data streams.
    Note: For more information, see JOIN.
  • MATCH_RECOGNIZE operations in a CEP statement.
  • PARTITION BY operations in an OVER window.
  • UNION operations (UNION = RETRACT + UNION ALL).
If you apply a time-based window function after any of the preceding operations, the job fails with an error such as org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.
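
The following sketch illustrates the first case in the list. It reuses the tt_stream table from the event time example. The view name per_key and the one-minute tumbling window are hypothetical choices for illustration. The second query is expected to be rejected with a validation error like the one above, although the exact message may differ by version.

```sql
-- A plain GROUP BY on a: the result of MAX (ts) is an ordinary TIMESTAMP, not a time attribute.
CREATE VIEW per_key AS
SELECT
  a,
  MAX (ts) AS ts,
  COUNT (b) AS cnt
FROM
  tt_stream
GROUP
  BY a;

-- Expected to fail validation: per_key.ts has lost its time attribute.
SELECT
  a,
  TUMBLE_START (ts, INTERVAL '1' MINUTE) AS win_start,
  SUM (cnt) AS total
FROM
  per_key
GROUP
  BY TUMBLE (ts, INTERVAL '1' MINUTE),
  a;
```

To avoid the error, apply the window function directly to the source table, where ts is still a time attribute, and perform the non-windowed aggregation afterward.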