Realtime Compute for Apache Flink: Interval join

Last Updated: Mar 26, 2026

An interval join joins two data streams on a common key. A row from one stream is matched with a row from the other stream only when the two timestamps fall within a specified time interval relative to each other. The timestamp columns of both input streams are retained in the joined result, so the result can be used for further event-time processing.

Syntax

SELECT column-names
FROM table1 [AS <alias1>]
[INNER | LEFT | RIGHT | FULL] JOIN table2 [AS <alias2>]
ON table1.column-name1 = table2.column-name2 AND TIMEBOUND_EXPRESSION

Supported join types: INNER JOIN (default when JOIN is used alone), LEFT JOIN, RIGHT JOIN, and FULL JOIN. SEMI JOIN and ANTI JOIN are not supported.
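
As an illustration of a non-default join type, the following is a minimal sketch of a LEFT interval join. It assumes two tables named Orders and Shipments with the event-time columns ordertime and shiptime, as declared in the example later in this topic. Every order is expected to appear in the output, and an order with no shipment inside the time bound is padded with NULL in the status column.

```sql
-- Sketch: LEFT interval join.
-- Assumes Orders and Shipments are declared with watermarks on
-- ordertime and shiptime respectively.
SELECT o.id, o.productName, s.status
FROM Orders AS o
LEFT JOIN Shipments AS s
  ON o.id = s.orderId
 AND o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime;
```

In a streaming job, a NULL-padded row is typically emitted only after the watermark passes the end of the time bound, because a matching row could still arrive until then.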

TIMEBOUND_EXPRESSION must bound the timestamp of one stream to an interval relative to the timestamp of the other, with both a lower and an upper bound. Each bound can be inclusive or exclusive. The following expression forms are supported:

| Form | Example |
| --- | --- |
| Equality | ltime = rtime |
| Range | ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE |
| BETWEEN | ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND |
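
To show how one of these forms fits into a complete join condition, here is a sketch of the Range form. The table and column names (Orders, Shipments, ordertime, shiptime) are borrowed from the example in this topic and are otherwise assumptions. The lower bound is inclusive and the upper bound is exclusive, so a shipment exactly 4 hours after the order would not match.

```sql
-- Sketch: Range form of the time bound.
-- Matches shipments at or after the order time and less than 4 hours later.
SELECT o.id, o.productName, s.status
FROM Orders AS o
JOIN Shipments AS s
  ON o.id = s.orderId
 AND s.shiptime >= o.ordertime
 AND s.shiptime <  o.ordertime + INTERVAL '4' HOUR;
```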

Example: match orders shipped within 4 hours

This example joins an orders stream with a shipments stream to find orders shipped within 4 hours of placement.

Test data

Orders table:

| id | productName | ordertime |
| --- | --- | --- |
| 1 | phone | 2024-04-01 10:00:00.0 |
| 2 | laptop | 2024-04-01 10:02:00.0 |
| 3 | watch | 2024-04-01 10:03:00.0 |
| 4 | tablet | 2024-04-01 10:05:00.0 |

Shipments table:

| shipId | orderId | status | shiptime |
| --- | --- | --- | --- |
| 0 | 1 | shipped | 2024-04-01 11:00:00.0 |
| 1 | 2 | delivered | 2024-04-01 17:00:00.0 |
| 2 | 3 | shipped | 2024-04-01 12:00:00.0 |
| 3 | 4 | shipped | 2024-04-01 11:30:00.0 |

SQL statements

Both source tables use a Kafka connector with a 2-second delayed watermark on the event-time column. The join condition o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime matches orders where the order time falls within 4 hours before the ship time.

CREATE TEMPORARY TABLE Orders(
  id BIGINT,
  productName VARCHAR,
  ordertime TIMESTAMP(3),
  WATERMARK FOR ordertime AS ordertime - INTERVAL '2' SECOND  -- 2-second delayed watermark on ordertime
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourBrokers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

CREATE TEMPORARY TABLE Shipments(
  shipId BIGINT,
  orderId BIGINT,
  status VARCHAR,
  shiptime TIMESTAMP(3),
  WATERMARK FOR shiptime AS shiptime - INTERVAL '2' SECOND  -- 2-second delayed watermark on shiptime
) WITH (
  'connector' = 'kafka',
  'topic' = '<yourTopic>',
  'properties.bootstrap.servers' = '<yourBrokers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
);

-- MySQL sink
CREATE TEMPORARY TABLE rds_output(
  id BIGINT,
  productName VARCHAR,
  status VARCHAR
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabaseName>',
  'table-name' = '<yourTableName>'
);

INSERT INTO rds_output
SELECT o.id, o.productName, s.status
FROM Orders AS o
JOIN Shipments AS s ON o.id = s.orderId AND
     o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime;

Replace the following placeholders with your actual values:

| Placeholder | Description |
| --- | --- |
| <yourTopic> | Kafka topic name for the corresponding source table |
| <yourBrokers> | Kafka bootstrap server addresses |
| <yourHostname> | MySQL hostname |
| <yourUsername> | MySQL username |
| <yourPassword> | MySQL password |
| <yourDatabaseName> | Target database name |
| <yourTableName> | Target table name |

Result

| id (BIGINT) | productName (VARCHAR) | status (VARCHAR) |
| --- | --- | --- |
| 1 | phone | shipped |
| 3 | watch | shipped |
| 4 | tablet | shipped |

Order 2 (laptop) is excluded because its ship time (17:00) is more than 4 hours after the order time (10:02).
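
Because the interval join retains the timestamp columns of its inputs, the joined rows can feed another event-time operation. The following sketch counts matched orders per 1-hour tumbling window on ordertime. It reuses the Orders and Shipments tables from this example. The names matched, window_start, and shipped_orders are illustrative, and the sketch assumes that the group window functions TUMBLE and TUMBLE_START are available in your engine version.

```sql
-- Sketch: event-time aggregation on top of the interval join result.
-- ordertime is assumed to keep its event-time attribute after the join.
SELECT
  TUMBLE_START(ordertime, INTERVAL '1' HOUR) AS window_start,
  COUNT(*) AS shipped_orders
FROM (
  SELECT o.id, o.ordertime
  FROM Orders AS o
  JOIN Shipments AS s
    ON o.id = s.orderId
   AND o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
) AS matched
GROUP BY TUMBLE(ordertime, INTERVAL '1' HOUR);
```

With the test data above, the three matched orders all have an ordertime between 10:00 and 11:00, so they would fall into the same window.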