このトピックでは、Flink SQL がサポートする時間属性の Event Time と Processing Time について説明します。

Flink SQL は次の 2 つの時間属性をサポートしています。
  • Event Time:テーブルスキーマで指定するイベント時刻です。通常は、データの元の作成時間です。
  • Processing Time:システムがイベントを処理するローカルシステム時刻です。
下図に、Realtime Compute のワークフローにおけるさまざまな時間属性の位置を示します。

前の図に示すように、Ingestion Time と Processing Time は、データレコードに対してシステムによって自動的に生成される時間属性です。 これらを制御することはできません。 Event Time はデータレコードに付属する時間属性です。 データの順序の乱れ、ネットワークのジッタ、またはその他の理由により、Event Time が t1 (パーティション 1 に対応) のデータレコードは、イベント時間が t2 (パーティション 2 に対応) の別のデータレコードよりも後で Flink によって処理される場合があります (t2 は t1 よりも後)。

Event Time

Event Time は rowtime とも呼ばれます。 Event Time 属性は、ソーステーブル DDL で宣言する必要があります。 ソーステーブルの特定のフィールドを Event Time (rowtime) フィールドとして宣言することができます。 現在、TIMESTAMP 型のフィールドのみ rowtime フィールドとして宣言できます。 LONG 型は今後サポートされる予定です。 利用可能な TIMESTAMP 列がない場合は、計算列を使用して既存の列に基づいて TIMESTAMP 列を作成する必要があります。

データの順序の乱れ、ネットワークジッタ、またはその他の理由により、データレコードが受信される順序は、処理される順序とは異なる場合があります。 このため、rowtime フィールドを定義するには、明示的に透かしの計算法を定義する必要があります。

以下に、Event Time ベースのウィンドウ集約の例を示します。
CREATE TABLE tt_stream(
  a VARCHAR,
  b VARCHAR,
  c TIMESTAMP,
  WATERMARK wk1 FOR c as withOffset(c, 1000)  --The watermark computation method.
) WITH (
  type = 'sls',
  topic = 'yourTopicName',
  accessId = 'yourAccessId',
  accessKey = 'yourAccessSecret'
);

CREATE TABLE rds_output(
  id VARCHAR,
  c TIMESTAMP, 
  f TIMESTAMP,
  cnt BIGINT
) WITH (
  type = 'rds',
  url = 'jdbc:mysql://****3306/test',
  tableName = 'yourTableName',
  userName = 'yourUserName',
  password = 'yourPassword'
);

INSERT INTO rds_output
SELECT a AS id, 
     SESSION_START(c, INTERVAL '1' SECOND) AS c, 
     CAST(SESSION_END(c, INTERVAL '1' SECOND) AS TIMESTAMP) AS f, 
     COUNT(a) AS cnt
FROM tt_stream
GROUP BY SESSION(c, INTERVAL '1' SECOND), a

Processing Time

Processing Time はシステムによって生成され、生データには含まれません。 したがって、ソーステーブルの宣言で、Processing Time 列を明示的に定義する必要があります。
filedName as PROCTIME()
以下に、Processing Time ベースのウィンドウ集約の例を示します。
CREATE TABLE mq_stream (
  a VARCHAR,
  b VARCHAR,
  c BIGINT,
  d AS PROCTIME()   -- Explicitly define a Processing Time column in the declaration of the source table.) WITH (
  type = 'mq',
  topic = 'yourTopic',
  accessId = 'yourAccessId',
  accessKey = 'yourAccessSecret'
);

CREATE TABLE rds_output (
  id VARCHAR,
  c TIMESTAMP, 
  f TIMESTAMP,
  cnt BIGINT) with (
  type = 'rds',
  url = 'yourDatebaseURL',
  tableName = 'yourDatabasTableName',
  userName = 'yourUserName',
  password = 'yourPassword'
);

INSERT INTO rds_output
SELECT a AS id, 
     SESSION_START(d, INTERVAL '1' SECOND) AS c, 
     SESSION_END(d, INTERVAL '1' SECOND) AS f, 
     COUNT(a) AS cnt
FROM mq_stream
GROUP BY SESSION(d, INTERVAL '1' SECOND), a