このトピックでは、Flink SQL がサポートする時間属性の Event Time と Processing Time について説明します。
- Event Time:テーブルスキーマで指定するイベント時刻です。通常は、データの元の作成時間です。
- Processing Time:システムがイベントを処理するローカルシステム時刻です。
前の図に示すように、Ingestion Time と Processing Time は、データレコードに対してシステムによって自動的に生成される時間属性です。 これらを制御することはできません。 Event Time はデータレコードに付属する時間属性です。 データの順序の乱れ、ネットワークのジッタ、またはその他の理由により、Event Time が t1 (パーティション 1 に対応) のデータレコードは、イベント時間が t2 (パーティション 2 に対応) の別のデータレコードよりも後で Flink によって処理される場合があります (t2 は t1 よりも後)。
Event Time
Event Time は rowtime とも呼ばれます。 Event Time 属性は、ソーステーブル DDL で宣言する必要があります。 ソーステーブルの特定のフィールドを Event Time (rowtime) フィールドとして宣言することができます。 現在、TIMESTAMP 型のフィールドのみ rowtime フィールドとして宣言できます。 LONG 型は今後サポートされる予定です。 利用可能な TIMESTAMP 列がない場合は、計算列を使用して既存の列に基づいて TIMESTAMP 列を作成する必要があります。
データの順序の乱れ、ネットワークジッタ、またはその他の理由により、データレコードが受信される順序は、処理される順序とは異なる場合があります。 このため、rowtime フィールドを定義するには、明示的に透かしの計算法を定義する必要があります。
CREATE TABLE tt_stream(
a VARCHAR,
b VARCHAR,
c TIMESTAMP,
WATERMARK wk1 FOR c as withOffset(c, 1000) --The watermark computation method.
) WITH (
type = 'sls',
topic = 'yourTopicName',
accessId = 'yourAccessId',
accessKey = 'yourAccessSecret'
);
CREATE TABLE rds_output(
id VARCHAR,
c TIMESTAMP,
f TIMESTAMP,
cnt BIGINT
) WITH (
type = 'rds',
url = 'jdbc:mysql://****3306/test',
tableName = 'yourTableName',
userName = 'yourUserName',
password = 'yourPassword'
);
INSERT INTO rds_output
SELECT a AS id,
SESSION_START(c, INTERVAL '1' SECOND) AS c,
CAST(SESSION_END(c, INTERVAL '1' SECOND) AS TIMESTAMP) AS f,
COUNT(a) AS cnt
FROM tt_stream
GROUP BY SESSION(c, INTERVAL '1' SECOND), a
Processing Time
filedName as PROCTIME()
CREATE TABLE mq_stream (
a VARCHAR,
b VARCHAR,
c BIGINT,
d AS PROCTIME() -- Explicitly define a Processing Time column in the declaration of the source table.) WITH (
type = 'mq',
topic = 'yourTopic',
accessId = 'yourAccessId',
accessKey = 'yourAccessSecret'
);
CREATE TABLE rds_output (
id VARCHAR,
c TIMESTAMP,
f TIMESTAMP,
cnt BIGINT) with (
type = 'rds',
url = 'yourDatebaseURL',
tableName = 'yourDatabasTableName',
userName = 'yourUserName',
password = 'yourPassword'
);
INSERT INTO rds_output
SELECT a AS id,
SESSION_START(d, INTERVAL '1' SECOND) AS c,
SESSION_END(d, INTERVAL '1' SECOND) AS f,
COUNT(a) AS cnt
FROM mq_stream
GROUP BY SESSION(d, INTERVAL '1' SECOND), a