An interval join joins two data streams on a common key. It connects elements from each stream when their timestamps fall within a specified relative time interval. The joined output retains the timestamp columns of both input streams, so downstream operations can continue to use event time.
Syntax
SELECT column-names
FROM table1 [AS <alias1>]
[INNER | LEFT | RIGHT | FULL] JOIN table2
ON table1.column-name1 = table2.key-name1 AND TIMEBOUND_EXPRESSION
Supported join types: INNER JOIN (default when JOIN is used alone), LEFT JOIN, RIGHT JOIN, and FULL JOIN. SEMI JOIN and ANTI JOIN are not supported.
TIMEBOUND_EXPRESSION must constrain the timestamp of one stream to a bounded interval relative to the timestamp of the other. Each bound can be inclusive or exclusive. The following expression forms are supported:
| Form | Example |
|---|---|
| Equality | ltime = rtime |
| Range | ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE |
| BETWEEN | ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND |
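As a sketch of how these forms fit into a complete statement, the following query uses the Range form in a LEFT JOIN. The tables L and R, the key column k, and the payload column v are hypothetical names used only for illustration. Both tables are assumed to declare ltime and rtime as event-time attributes with watermarks.

```sql
-- Hypothetical tables: L(k, ltime) and R(k, v, rtime).
-- ltime and rtime are assumed to be event-time attributes with watermarks.
SELECT L.k, L.ltime, R.v, R.rtime
FROM L
LEFT JOIN R
  ON L.k = R.k                                   -- equality on the common key
  AND L.ltime >= R.rtime                         -- lower bound (inclusive)
  AND L.ltime < R.rtime + INTERVAL '10' MINUTE;  -- upper bound (exclusive)
```

With LEFT JOIN, a row from L that has no partner in R inside the interval is expected to be emitted with NULL in the R columns. With INNER JOIN, the same row would be dropped.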
Example: match orders shipped within 4 hours
This example joins an orders stream with a shipments stream to find orders shipped within 4 hours of placement.
Test data
Orders table:
| id | productName | ordertime |
|---|---|---|
| 1 | phone | 2024-04-01 10:00:00.0 |
| 2 | laptop | 2024-04-01 10:02:00.0 |
| 3 | watch | 2024-04-01 10:03:00.0 |
| 4 | tablet | 2024-04-01 10:05:00.0 |
Shipments table:
| shipId | orderId | status | shiptime |
|---|---|---|---|
| 0 | 1 | shipped | 2024-04-01 11:00:00.0 |
| 1 | 2 | delivered | 2024-04-01 17:00:00.0 |
| 2 | 3 | shipped | 2024-04-01 12:00:00.0 |
| 3 | 4 | shipped | 2024-04-01 11:30:00.0 |
SQL statements
Both source tables use a Kafka connector with a 2-second delayed watermark on the event-time column. The join condition o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime matches orders where the order time falls within 4 hours before the ship time.
CREATE TEMPORARY TABLE Orders(
id BIGINT,
productName VARCHAR,
ordertime TIMESTAMP(3),
WATERMARK FOR ordertime AS ordertime - INTERVAL '2' SECOND -- 2-second delayed watermark on ordertime
) WITH (
'connector' = 'kafka',
'topic' = '<yourTopic>',
'properties.bootstrap.servers' = '<yourBrokers>',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
CREATE TEMPORARY TABLE Shipments(
shipId BIGINT,
orderId BIGINT,
status VARCHAR,
shiptime TIMESTAMP(3),
WATERMARK FOR shiptime AS shiptime - INTERVAL '2' SECOND -- 2-second delayed watermark on shiptime
) WITH (
'connector' = 'kafka',
'topic' = '<yourTopic>',
'properties.bootstrap.servers' = '<yourBrokers>',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);
-- MySQL sink
CREATE TEMPORARY TABLE rds_output(
id BIGINT,
productName VARCHAR,
status VARCHAR
) WITH (
'connector' = 'mysql',
'hostname' = '<yourHostname>',
'port' = '3306',
'username' = '<yourUsername>',
'password' = '<yourPassword>',
'database-name' = '<yourDatabaseName>',
'table-name' = '<yourTableName>'
);
INSERT INTO rds_output
SELECT o.id, o.productName, s.status
FROM Orders AS o
JOIN Shipments AS s ON o.id = s.orderId AND
o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime;
Replace the following placeholders with your actual values:
| Placeholder | Description |
|---|---|
| <yourTopic> | Kafka topic name for the corresponding source table |
| <yourBrokers> | Kafka bootstrap server addresses |
| <yourHostname> | MySQL hostname |
| <yourUsername> | MySQL username |
| <yourPassword> | MySQL password |
| <yourDatabaseName> | Target database name |
| <yourTableName> | Target table name |
Result
| id (BIGINT) | productName (VARCHAR) | status (VARCHAR) |
|---|---|---|
| 1 | phone | shipped |
| 3 | watch | shipped |
| 4 | tablet | shipped |
Order 2 (laptop) is excluded because its ship time (17:00) is more than 4 hours after the order time (10:02).
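Because the joined rows keep their event-time columns, the join result can feed further event-time operations. The following is a minimal sketch, assuming the Orders and Shipments tables defined above and assuming ordertime still behaves as a time attribute after the join. The 1-hour window size and the output column names window_start and shipped_orders are illustrative choices, not part of the original example.

```sql
-- Count orders shipped within 4 hours, grouped into 1-hour tumbling windows on ordertime.
SELECT
  TUMBLE_START(o.ordertime, INTERVAL '1' HOUR) AS window_start,
  COUNT(*) AS shipped_orders
FROM Orders AS o
JOIN Shipments AS s
  ON o.id = s.orderId
  AND o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
GROUP BY TUMBLE(o.ordertime, INTERVAL '1' HOUR);
```

The window is assigned from ordertime, so the three matching orders in the test data all belong to the window that starts at 10:00. When that window result is emitted depends on how far the watermarks of both source tables have advanced.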