すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:インターバル結合

最終更新日:Jan 09, 2025

このトピックでは、インターバル結合の使用方法について説明します。

背景情報

インターバル結合を使用すると、2 つのデータストリームを共通キーで結合できます。 指定された相対時間間隔内にタイムスタンプが含まれる場合、各ストリームの要素が接続されます。 2 つのデータストリームが結合された後、入力データストリームのタイムスタンプ列は保持されます。これにより、さらにイベント時間処理を行うことができます。

構文

SELECT column-names
FROM table1  [AS <alias1>]
[INNER | LEFT | RIGHT |FULL ] JOIN table2 
ON table1.column-name1 = table2.key-name1 AND TIMEBOUND_EXPRESSION
説明
  • INNER JOIN、LEFT JOIN、RIGHT JOIN、および FULL JOIN がサポートされています。JOIN を使用すると、デフォルトで INNER JOIN が実行されます。

  • SEMI JOIN および ANTI JOIN はサポートされていません。

  • TIMEBOUND_EXPRESSION は、2 つのデータストリームの時間属性の列に対するインターバル条件式です。 次の条件式がサポートされています。

    • ltime = rtime

    • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE

    • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

この例では、注文後 4 時間以内に発送された商品を示します。

  • テストデータ

    • 注文テーブル (orders)

      id

      productName

      orderTime

      1

      phone

      2024-04-01 10:00:00.0

      2

      laptop

      2024-04-01 10:02:00.0

      3

      watch

      2024-04-01 10:03:00.0

      4

      tablet

      2024-04-01 10:05:00.0

    • 物流テーブル (Shipments)

      shipId

      orderId

      status

      shiptime

      0

      1

      shipped

      2024-04-01 11:00:00.0

      1

      2

      delivered

      2024-04-01 17:00:00.0

      2

      3

      shipped

      2024-04-01 12:00:00.0

      3

      4

      shipped

      2024-04-01 11:30:00.0

  • テストステートメント

    CREATE TEMPORARY TABLE Orders(
      id BIGINT,
      productName VARCHAR,
      ordertime TIMESTAMP(3),
      WATERMARK wk FOR ordertime as withOffset(ordertime, 2000)  -- ordertime 列をテーブルのイベント時間属性として定義し、2 秒遅延のウォーターマーク戦略を使用します。
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopic>',
      'properties.bootstrap.servers' = '<yourBrokers>',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'csv'
    );
    
    CREATE TEMPORARY TABLE Shipments(
      shipId BIGINT,
      orderId BIGINT,
      status VARCHAR,
      shiptime TIMESTAMP(3),
      WATERMARK wk FOR shiptime as withOffset(shiptime, 2000)  -- ordertime 列をテーブルのイベント時間属性として定義し、2 秒遅延のウォーターマーク戦略を使用します。
    ) WITH (
      'connector' = 'kafka',
       'topic' = '<yourTopic>',
      'properties.bootstrap.servers' = '<yourBrokers>',
      'scan.startup.mode' = 'earliest-offset',
      'format' = 'csv'
    );
    
    -- MySQL シンクを作成します。
    CREATE TEMPORARY TABLE rds_output(
      id BIGINT,
      productName VARCHAR,
      status VARCHAR
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    INSERT INTO rds_output
    SELECT id, productName, status
    FROM Orders AS o
    JOIN Shipments AS s on o.id = s.orderId AND
         o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime;
  • テスト結果

    id(bigint)

    productName(varchar)

    status(varchar)

    1

    phone

    shipped

    3

    watch

    shipped

    4

    tablet

    shipped