This topic describes the following time attributes that are supported by Blink SQL: event time and processing time.
- Event time: the time value carried in the data itself, which you provide in the source data. In most cases, it is the time at which a record was originally created.
- Processing time: the local system time at which the system processes an event, in milliseconds.
Event Time
Event time is also known as rowtime. The event time attribute must be declared in the data definition language (DDL) statement that creates the source table, by declaring a field of the source table as the rowtime field. Only a field of the TIMESTAMP type can be declared as the rowtime field. Support for fields of the LONG type is planned for a future release. If the source table does not contain a TIMESTAMP column, you can use a computed column to derive one from an existing column. For more information, see Computed column.
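As an illustration of what such a computed column does, the following Python sketch converts an existing LONG column into a timestamp outside Blink. It assumes that the LONG column holds epoch milliseconds in UTC. The helper name to_timestamp_ms and the record layout are hypothetical and are not Blink built-ins.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_timestamp_ms(epoch_ms: int) -> datetime:
    """Turn a LONG epoch-milliseconds value into a UTC timestamp.

    Hypothetical helper: it mirrors what a computed column would do when it
    derives a TIMESTAMP column from an existing LONG column. Integer
    arithmetic via timedelta avoids float rounding errors.
    """
    return EPOCH + timedelta(milliseconds=epoch_ms)


# Hypothetical source record whose event time is stored as epoch milliseconds.
record = {"a": "k1", "b": "x", "event_ms": 1_700_000_000_123}
record["ts"] = to_timestamp_ms(record["event_ms"])
print(record["ts"].isoformat())  # 2023-11-14T22:13:20.123000+00:00
```

The derived ts value is what the rowtime field would hold. The original event_ms column is left unchanged.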
In some scenarios, data records do not arrive in the order in which they were generated. Common causes include out-of-order input data and network jitter, which can result from network congestion or transmission latency. To handle such disorder, you must explicitly define how watermarks are computed for the field that you declare as the rowtime field. For more information, see Watermark.
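The following Python sketch simulates the watermark rule outside Blink. It assumes that withOffset(ts, 1000), as used in the example below, keeps the watermark 1,000 ms behind the largest event time seen so far. It simplifies Blink's behavior by recomputing the watermark after every record instead of emitting it periodically. The function name simulate_with_offset and the rule for flagging late records are illustrative assumptions, not Blink APIs.

```python
from typing import Iterable, List, Optional, Tuple


def simulate_with_offset(
    event_times_ms: Iterable[int], offset_ms: int
) -> List[Tuple[int, int, bool]]:
    """Track a withOffset-style watermark over event times in arrival order.

    Simplified model (assumption): after each record the watermark becomes
    max(event time seen so far) - offset_ms, and a record counts as late if
    its event time is earlier than the watermark in effect when it arrives.
    Returns (event_time, watermark_after_record, was_late) for each record.
    """
    max_seen: Optional[int] = None
    watermark: Optional[int] = None
    result = []
    for ts in event_times_ms:
        late = watermark is not None and ts < watermark
        max_seen = ts if max_seen is None else max(max_seen, ts)
        watermark = max_seen - offset_ms
        result.append((ts, watermark, late))
    return result


# Event times in arrival order; 1500 and 900 arrive out of order.
arrivals = [1000, 2000, 1500, 3500, 900]
for ts, wm, late in simulate_with_offset(arrivals, offset_ms=1000):
    print(f"ts={ts} watermark={wm} late={late}")
```

In this run, 1500 arrives after 2000 but is still ahead of the watermark (1000), so it is not late. 900 arrives after the watermark has advanced to 2500, so it is flagged as late.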
The following example declares ts as the rowtime field of the source table, with a watermark that trails the largest observed ts by 1,000 ms. It then counts the records for each value of a in 1-second session windows:
CREATE TABLE tt_stream (
  a VARCHAR,
  b VARCHAR,
  ts TIMESTAMP,
  WATERMARK wk1 FOR ts AS withOffset(ts, 1000) -- Define how watermarks are computed for ts.
) WITH (
  type = 'sls',
  topic = '<yourTopicName>',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessSecret>'
);

CREATE TABLE rds_output (
  id VARCHAR,
  win_start TIMESTAMP,
  win_end TIMESTAMP,
  cnt BIGINT
) WITH (
  type = 'rds',
  url = 'jdbc:mysql://****3306/test',
  tableName = '<yourTableName>',
  userName = '<yourUserName>',
  password = '<yourPassword>'
);

INSERT INTO rds_output
SELECT
  a AS id,
  SESSION_START(ts, INTERVAL '1' SECOND) AS win_start,
  SESSION_END(ts, INTERVAL '1' SECOND) AS win_end,
  COUNT(a) AS cnt
FROM tt_stream
GROUP BY SESSION(ts, INTERVAL '1' SECOND), a;
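The following Python sketch shows what the INSERT statement above computes for a small, in-memory batch of records. Rows are grouped by a, and a key's session stays open while the next record, in event time order, is less than 1 second after the previous one. The sketch ignores watermarks and late data, handles a gap of exactly 1 second in a simplified way, and uses integer milliseconds instead of TIMESTAMP values. session_counts is a hypothetical name.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

GAP_MS = 1000  # INTERVAL '1' SECOND


def session_counts(
    records: List[Tuple[str, int]], gap_ms: int = GAP_MS
) -> List[Tuple[str, int, int, int]]:
    """Group (a, ts_ms) records into per-key session windows.

    Each record opens the window [ts, ts + gap_ms); a record whose ts falls
    before the current window end extends that session.
    Returns rows shaped like rds_output: (id, win_start, win_end, cnt).
    """
    by_key: Dict[str, List[int]] = defaultdict(list)
    for key, ts in records:
        by_key[key].append(ts)

    rows = []
    for key in sorted(by_key):
        times = sorted(by_key[key])  # event time order, not arrival order
        start, end, cnt = times[0], times[0] + gap_ms, 1
        for ts in times[1:]:
            if ts < end:  # still inside the current session: extend it
                end = ts + gap_ms
                cnt += 1
            else:  # gap exceeded: emit the session and start a new one
                rows.append((key, start, end, cnt))
                start, end, cnt = ts, ts + gap_ms, 1
        rows.append((key, start, end, cnt))
    return rows


# (a, ts in ms); arrival order does not matter in this batch model.
data = [("x", 0), ("x", 600), ("y", 100), ("x", 3000), ("x", 1200)]
for row in session_counts(data):
    print(row)
# ('x', 0, 2200, 3)
# ('x', 3000, 4000, 1)
# ('y', 100, 1100, 1)
```

In this model, win_start is the first record's ts in a session and win_end is the last record's ts plus the gap.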
Processing Time
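Processing time is generated by the system and is not carried in the source data. To use it, declare a computed column in the DDL statement of the source table by using the following syntax:
fieldName AS PROCTIME()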
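Because PROCTIME() is based on the local system time when a record is processed, its value depends on when the record is handled, not on any field of the record. The following Python sketch imitates such a computed column by stamping each record with the current time in milliseconds as it is consumed. attach_proctime and wall_clock_ms are hypothetical names. The clock parameter is an assumption added so that the behavior can be tested with a fake clock.

```python
import time
from typing import Callable, Dict, Iterable, Iterator


def wall_clock_ms() -> int:
    """Current local system time in milliseconds."""
    return time.time_ns() // 1_000_000


def attach_proctime(
    records: Iterable[Dict], clock: Callable[[], int] = wall_clock_ms
) -> Iterator[Dict]:
    """Yield a copy of each record with a 'ts' field set when it is processed.

    Hypothetical stand-in for `ts AS PROCTIME()`: because this is a
    generator, the clock is read only when each record is actually consumed.
    """
    for record in records:
        yield {**record, "ts": clock()}


# Records carry no time information of their own.
stream = [{"a": "x", "b": "p", "c": 1}, {"a": "y", "b": "q", "c": 2}]
for row in attach_proctime(stream):
    print(row)
```

Running the same records twice yields different ts values, which is the main practical difference from event time.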
The following example declares ts as a processing time column of the source table. It then counts the records for each value of a in 1-second session windows based on processing time:
CREATE TABLE mq_stream (
  a VARCHAR,
  b VARCHAR,
  c BIGINT,
  ts AS PROCTIME() -- Explicitly declare a processing time column in the source table DDL.
) WITH (
  type = 'mq',
  topic = '<yourTopic>',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessSecret>'
);

CREATE TABLE rds_output (
  id VARCHAR,
  win_start TIMESTAMP,
  win_end TIMESTAMP,
  cnt BIGINT
) WITH (
  type = 'rds',
  url = '<yourDatabaseURL>',
  tableName = '<yourDatabaseTableName>',
  userName = '<yourUserName>',
  password = '<yourPassword>'
);

INSERT INTO rds_output
SELECT
  a AS id,
  SESSION_START(ts, INTERVAL '1' SECOND) AS win_start,
  SESSION_END(ts, INTERVAL '1' SECOND) AS win_end,
  COUNT(a) AS cnt
FROM mq_stream
GROUP BY SESSION(ts, INTERVAL '1' SECOND), a;
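Expiration of time attributes
After any of the following operations, a field loses its time attribute and is treated as a regular TIMESTAMP field that can no longer be used to define a window: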
- GROUP BY operations on fields that are not time attribute fields, except GROUP BY operations in tumbling, sliding, and session windows. For more information, see TUMBLE, HOP, and SESSION.
- JOIN operations on two data streams. For more information, see JOIN statements.
- MATCH_RECOGNIZE operations in complex event processing (CEP) statements. For more information, see CEP statements.
- PARTITION BY operations in OVER windows. For more information, see OVER windows.
- UNION operations. UNION is equivalent to the combination of RETRACT and UNION ALL.
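If you define a window over a field whose time attribute has expired, an error similar to the following is reported:
org.apache.flink.table.api.ValidationException: Window can only be defined over a time attribute column.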
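The following Python sketch is a toy model of the expiration rule above, not Blink's planner. Each column carries a flag that records whether it is still a time attribute. The operations in the list clear that flag, and defining a window over a column without the flag raises an error with the same message. All class, function, and operation names are hypothetical.

```python
from dataclasses import dataclass, replace
from typing import Dict


class ValidationError(Exception):
    """Stand-in for org.apache.flink.table.api.ValidationException."""


@dataclass(frozen=True)
class Column:
    name: str
    is_time_attribute: bool


# Operations from the list above after which time attributes expire.
# "group_by" means a GROUP BY that does not use TUMBLE, HOP, or SESSION.
EXPIRING_OPS = {"group_by", "stream_join", "match_recognize", "over_partition_by", "union"}


def apply_op(schema: Dict[str, Column], op: str) -> Dict[str, Column]:
    """Return the schema after `op`; expiring ops turn time attributes into plain columns."""
    if op not in EXPIRING_OPS:
        return dict(schema)
    return {name: replace(col, is_time_attribute=False) for name, col in schema.items()}


def define_window(schema: Dict[str, Column], field: str) -> str:
    """Describe a 1-second session window over `field`, or reject it with the error above."""
    if not schema[field].is_time_attribute:
        raise ValidationError("Window can only be defined over a time attribute column.")
    return f"SESSION({field}, INTERVAL '1' SECOND)"


schema = {"a": Column("a", False), "ts": Column("ts", True)}
print(define_window(schema, "ts"))  # SESSION(ts, INTERVAL '1' SECOND)
try:
    define_window(apply_op(schema, "union"), "ts")
except ValidationError as err:
    print("rejected:", err)
```

In this model, an operation that is not in the list, such as a simple filter, leaves ts usable as a time attribute.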