このトピックでは、インターバル結合の使用方法について説明します。
背景情報
インターバル結合を使用すると、2 つのデータストリームを共通キーで結合できます。 指定された相対時間間隔内にタイムスタンプが含まれる場合、各ストリームの要素が接続されます。 2 つのデータストリームが結合された後、入力データストリームのタイムスタンプ列は保持されます。これにより、さらにイベント時間処理を行うことができます。
構文
SELECT column-names
FROM table1 [AS <alias1>]
[INNER | LEFT | RIGHT |FULL ] JOIN table2
ON table1.column-name1 = table2.key-name1 AND TIMEBOUND_EXPRESSIONINNER JOIN、LEFT JOIN、RIGHT JOIN、および FULL JOIN がサポートされています。JOIN を使用すると、デフォルトで INNER JOIN が実行されます。
SEMI JOIN および ANTI JOIN はサポートされていません。
TIMEBOUND_EXPRESSION は、2 つのデータストリームの時間属性の列に対するインターバル条件式です。 次の条件式がサポートされています。
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
例
この例では、注文後 4 時間以内に発送された商品を示します。
テストデータ
注文テーブル (orders)
id
productName
orderTime
1
phone
2024-04-01 10:00:00.0
2
laptop
2024-04-01 10:02:00.0
3
watch
2024-04-01 10:03:00.0
4
tablet
2024-04-01 10:05:00.0
物流テーブル (Shipments)
shipId
orderId
status
shiptime
0
1
shipped
2024-04-01 11:00:00.0
1
2
delivered
2024-04-01 17:00:00.0
2
3
shipped
2024-04-01 12:00:00.0
3
4
shipped
2024-04-01 11:30:00.0
テストステートメント
CREATE TEMPORARY TABLE Orders( id BIGINT, productName VARCHAR, ordertime TIMESTAMP(3), WATERMARK wk FOR ordertime as withOffset(ordertime, 2000) -- ordertime 列をテーブルのイベント時間属性として定義し、2 秒遅延のウォーターマーク戦略を使用します。 ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<yourBrokers>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); CREATE TEMPORARY TABLE Shipments( shipId BIGINT, orderId BIGINT, status VARCHAR, shiptime TIMESTAMP(3), WATERMARK wk FOR shiptime as withOffset(shiptime, 2000) -- ordertime 列をテーブルのイベント時間属性として定義し、2 秒遅延のウォーターマーク戦略を使用します。 ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<yourBrokers>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); -- MySQL シンクを作成します。 CREATE TEMPORARY TABLE rds_output( id BIGINT, productName VARCHAR, status VARCHAR ) WITH ( 'connector' = 'mysql', 'hostname' = '<yourHostname>', 'port' = '3306', 'username' = '<yourUsername>', 'password' = '<yourPassword>', 'database-name' = '<yourDatabaseName>', 'table-name' = '<yourTableName>' ); INSERT INTO rds_output SELECT id, productName, status FROM Orders AS o JOIN Shipments AS s on o.id = s.orderId AND o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime;テスト結果
id(bigint)
productName(varchar)
status(varchar)
1
phone
shipped
3
watch
shipped
4
tablet
shipped