すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:処理時間テンポラル結合ステートメント

最終更新日:Jan 08, 2025

Realtime Compute for Apache Flink では、各データストリームを外部データソースのディメンションテーブルに関連付けることができます。これにより、Realtime Compute for Apache Flink で関連クエリを実行できます。このトピックでは、処理時間テンポラル結合の使用方法について説明します。

背景

処理時間テンポラル結合は、処理時間属性を使用して、ファクトテーブルの行をディメンションテーブルの対応するキーの最新バージョンに関連付けます。イベント発生時の時刻に基づいて行を関連付けるイベント時間テンポラル結合とは異なり、処理時間テンポラル結合はデータ到着時刻に基づいて行を関連付けます。

前提条件

  • Realtime Compute for Apache Flink は、Ververica Runtime (VVR) 8.0.10 以降を使用します。

  • MySQL ディメンションテーブルが作成されています。

使用上の注意

  • Realtime Compute for Apache Flink はフル同期中のチェックポイントをサポートしていないため、execution.checkpointing.interval-during-backlog = 0 を設定して、フル同期中のチェックポイントを無効にします。この設定は、増分データ同期中のチェックポイントには影響しません。

  • 処理時間テンポラル結合を使用するには、table.optimizer.proctime-temporal-join-strategy = TEMPORAL_JOIN を設定します。

構文

処理時間テンポラル結合の構文は、ディメンションテーブルの結合と同じです。

SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;

  • テストデータ

    • テーブル 1 kafka_input

      id (bigint)

      name (varchar)

      age (bigint)

      1

      Lee

      22

      2

      Harry

      20

      3

      Liban

      28

    • テーブル 2 phoneNumber

      name (varchar)

      phoneNumber (bigint)

      David

      1390000111

      Brooks

      1390000222

      Liban

      1390000333

      Lee

      1390000444

  • テストコード

    SET 'table.optimizer.proctime-temporal-join-strategy' = 'TEMPORAL_JOIN';  -- 処理時間テンポラル結合を使用します。
    SET 'execution.checkpointing.interval-during-backlog' = '0';              -- フル同期中のチェックポイントを無効にします。
    
    CREATE TEMPORARY TABLE kafka_input (
      id   BIGINT,
      name VARCHAR,
      age  BIGINT,
      proc_time AS PROCTIME()
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopic>',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'properties.group.id' = '<yourKafkaConsumerGroupId>',
      'format' = 'csv'
    );
    
    -- ディメンションテーブルを定義します。
    CREATE TEMPORARY TABLE phoneNumber(
      name VARCHAR,
      phoneNumber BIGINT,
      PRIMARY KEY(name) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE result_infor(
      id BIGINT,
      phoneNumber BIGINT,
      name VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO result_infor
    SELECT
      t.id,
      w.phoneNumber,
      t.name
    FROM kafka_input as t
    JOIN phoneNumber FOR SYSTEM_TIME AS OF t.proc_time as w
    ON t.name = w.name;
  • テスト結果

    id (bigint)

    phoneNumber (bigint)

    name (varchar)

    1

    1390000444

    Lee

    3

    1390000333

    Liban