Flink SQL supports regular joins (also called dual-stream joins) on dynamic tables. A batch join runs once over a fixed input. A regular join instead updates its results continuously as new data arrives on either stream, so the output stays consistent with what a batch join would produce over the same data.
Background
Regular joins keep all historical records from both input streams in Flink state, so each incoming row can be matched against every previously seen row on the other side. As a result, state grows without bound for as long as the job runs. For long-running jobs on high-volume streams, this growth can exhaust memory and cause job failures.
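The following toy model in plain Python shows why state keeps growing. It is not Flink's implementation. `RegularInnerJoin`, `on_left`, and `on_right` are hypothetical names for this sketch. Every arriving row is appended to its side's state and matched against all rows buffered on the other side. Nothing is ever removed, and the accumulated output equals a batch join over the same rows, whatever the arrival order.

```python
from collections import defaultdict


class RegularInnerJoin:
    """Toy model of a streaming inner equi-join that keeps every row in state."""

    def __init__(self):
        # join key -> rows seen so far on each side; nothing is ever evicted
        self.left_state = defaultdict(list)
        self.right_state = defaultdict(list)

    def on_left(self, key, left_row):
        self.left_state[key].append(left_row)
        # Match the new left row against everything buffered on the right.
        return [(key, left_row, right_row) for right_row in self.right_state[key]]

    def on_right(self, key, right_row):
        self.right_state[key].append(right_row)
        # Match the new right row against everything buffered on the left.
        return [(key, left_row, right_row) for left_row in self.left_state[key]]

    def state_size(self):
        """Total number of rows currently held in state on both sides."""
        left = sum(len(rows) for rows in self.left_state.values())
        right = sum(len(rows) for rows in self.right_state.values())
        return left + right


# Orders (id, productName) and Shipments (orderId, status) from Example 1,
# arriving interleaved; a shipment may even arrive before its order.
events = [
    ("right", 3, "shipped"),
    ("left", 1, "phone_a"),
    ("left", 2, "notebook_x"),
    ("right", 1, "shipped"),
    ("left", 3, "phone_b"),
    ("right", 2, "delivered"),
    ("left", 4, "pad_m"),
    ("right", 4, "shipped"),
]

join = RegularInnerJoin()
output = []
for side, key, row in events:
    handler = join.on_left if side == "left" else join.on_right
    output.extend(handler(key, row))

# Batch join over the same rows, for comparison.
lefts = [(k, r) for s, k, r in events if s == "left"]
rights = [(k, r) for s, k, r in events if s == "right"]
batch = [(k1, l, r) for k1, l in lefts for k2, r in rights if k1 == k2]

print(sorted(output) == sorted(batch))  # True
print(join.state_size())  # 8: every row is still in state
```

State size here equals the total number of rows received. In a real job it therefore tracks stream volume, not the size of the result.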
To limit state size, set a state time-to-live (TTL) with the table.exec.state.ttl parameter, or set per-stream TTLs with the JOIN_STATE_TTL hint (see Hints). Evicting state can affect result correctness: rows that have expired no longer match newly arriving rows.
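The correctness trade-off can be illustrated with a toy Python model. It assumes each state row expires a fixed time after it was written and that expiry is checked only when a new row arrives. This is a simplification, not Flink's state TTL implementation, and `TtlInnerJoin` and `process` are hypothetical names. With a TTL of 36 hours, a shipment that arrives 40 hours after its order finds nothing to join with, although a batch join would match it.

```python
from collections import defaultdict


class TtlInnerJoin:
    """Toy streaming inner equi-join whose state rows expire `ttl` time units
    after they were written. Simplification: expiry is checked only when a
    new row arrives, and time is a plain number (hours here)."""

    def __init__(self, ttl):
        self.ttl = ttl
        # side -> join key -> list of (write_time, row)
        self.state = {"left": defaultdict(list), "right": defaultdict(list)}

    def _evict(self, now):
        for side_state in self.state.values():
            for key in list(side_state):
                alive = [(ts, row) for ts, row in side_state[key] if now - ts < self.ttl]
                if alive:
                    side_state[key] = alive
                else:
                    del side_state[key]

    def process(self, now, side, key, row):
        self._evict(now)
        other = "right" if side == "left" else "left"
        self.state[side][key].append((now, row))
        # .get() avoids creating empty entries in the other side's state.
        matches = [r for _, r in self.state[other].get(key, [])]
        if side == "left":
            return [(key, row, m) for m in matches]
        return [(key, m, row) for m in matches]


join = TtlInnerJoin(ttl=36)  # 36 hours = 1.5 days
output = []
for now, side, key, row in [
    (0, "left", 1, "phone_a"),
    (0, "left", 2, "notebook_x"),
    (11, "right", 1, "shipped"),    # order 1 is 11 h old: still in state
    (40, "right", 2, "delivered"),  # order 2 is 40 h old: already evicted
]:
    output.extend(join.process(now, side, key, row))

print(output)  # [(1, 'phone_a', 'shipped')]; a batch join would also return order 2
```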
If unbounded state is a concern, also consider interval joins, lookup joins, and window joins as alternatives.
Syntax
```
tableExpression:
    tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ INNER | { LEFT | RIGHT | FULL } [ OUTER ] ] JOIN tableExpression [ joinCondition ]
  | tableExpression CROSS JOIN tableExpression
  | tableExpression [ CROSS | OUTER ] APPLY tableExpression

joinCondition:
    ON booleanExpression
  | USING '(' column [, column ]* ')'
```

In this syntax, tableReference is a table name, tableExpression is a table expression, and joinCondition is the join condition.
List the table with the lower update frequency first and the table with the higher update frequency last. This reduces the amount of state that needs to be matched per incoming row.
Join types
INNER JOIN
Returns only the pairs of rows from the two tables that satisfy the join condition.
```sql
SELECT id, productName, status
FROM orders o
JOIN shipments s
ON o.id = s.orderId;
```

LEFT JOIN
Returns all rows from the left table. Rows with no match on the right side produce NULL values for the right-side columns.
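On a stream, a LEFT JOIN may have to revise rows it has already emitted. A left row with no match is first emitted padded with NULL. When a matching right row arrives later, that NULL-padded row is retracted and the joined row is inserted. The toy Python model below shows this behavior for insert-only inputs. `LeftJoinChangelog` and `materialize` are hypothetical names, `None` stands in for NULL, and the `"+"`/`"-"` markers are generic. The exact change-message kinds that Flink emits are not modeled.

```python
from collections import defaultdict


class LeftJoinChangelog:
    """Toy streaming LEFT JOIN over insert-only inputs. It records a changelog of
    ("+", row) insertions and ("-", row) retractions; None stands in for NULL."""

    def __init__(self):
        self.left = defaultdict(list)
        self.right = defaultdict(list)
        self.changelog = []

    def on_left(self, key, left_row):
        self.left[key].append(left_row)
        matches = self.right[key]
        if not matches:
            # No match yet: emit the left row padded with NULL.
            self.changelog.append(("+", (left_row, None)))
        for right_row in matches:
            self.changelog.append(("+", (left_row, right_row)))

    def on_right(self, key, right_row):
        first_match = not self.right[key]
        self.right[key].append(right_row)
        for left_row in self.left[key]:
            if first_match:
                # Retract the NULL-padded row emitted earlier for this left row.
                self.changelog.append(("-", (left_row, None)))
            self.changelog.append(("+", (left_row, right_row)))


def materialize(changelog):
    """Apply insertions and retractions to obtain the current result."""
    rows = []
    for op, row in changelog:
        if op == "+":
            rows.append(row)
        else:
            rows.remove(row)
    return sorted(rows, key=repr)


join = LeftJoinChangelog()
join.on_left(1, "phone_a")   # no shipment yet
join.on_left(4, "pad_m")     # no shipment yet
join.on_right(1, "shipped")  # the shipment for order 1 arrives

for change in join.changelog:
    print(change)
# ('+', ('phone_a', None))
# ('+', ('pad_m', None))
# ('-', ('phone_a', None))
# ('+', ('phone_a', 'shipped'))
print(materialize(join.changelog))  # [('pad_m', None), ('phone_a', 'shipped')]
```

Applying the changelog yields the same rows a batch LEFT JOIN would return over the data received so far. Downstream operators and sinks must therefore accept retractions.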
```sql
SELECT o.id, o.productName, s.status
FROM orders o
LEFT JOIN shipments s
ON o.id = s.orderId;
```

RIGHT JOIN
Returns all rows from the right table. Rows with no match on the left side produce NULL values for the left-side columns.
```sql
SELECT o.id, o.productName, s.status
FROM orders o
RIGHT JOIN shipments s
ON o.id = s.orderId;
```

FULL OUTER JOIN
Returns all rows from both tables. Rows with no match on either side produce NULL values for the unmatched columns.
```sql
SELECT o.id, o.productName, s.status
FROM orders o
FULL OUTER JOIN shipments s
ON o.id = s.orderId;
```

CROSS JOIN
Returns the Cartesian product of both tables: every row on the left is paired with every row on the right.
```sql
SELECT *
FROM orders o
CROSS JOIN shipments s;
```

Hints
In Ververica Runtime (VVR) 8.0.1 and later, you can use hints to specify different TTL values for the left and right stream states. This reduces the amount of state data that needs to be maintained.
The JOIN_STATE_TTL hint applies only to regular joins. It is not supported for lookup, interval, or window joins. Hints for regular joins are experimental, and their syntax may change in future versions.
Syntax
```sql
-- VVR 8.0.1 and later
SELECT /*+ JOIN_STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...

-- VVR 8.0.7 and later also support the Apache Flink syntax
SELECT /*+ STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...
```

Set tableReference to a table name, a view name, or an alias. If a table has an alias, use the alias in the hint.
If you specify a TTL for only one stream, the other stream uses the deployment-level TTL from the table.exec.state.ttl parameter. The default value is 1.5 days. For details, see Basic configurations.
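The fallback rule can be sketched in Python as a small resolver that returns the TTL for each side of one join. This is only an illustration: `parse_ttl`, `effective_ttls`, and `DEFAULT_TTL` are hypothetical names. The parser handles just a few unit suffixes (d, h, min, s, ms), which is an assumption, because VVR accepts its own duration formats. The sketch also does not model how VVR treats hint names that match no input.

```python
import re
from datetime import timedelta

# Deployment-level default for table.exec.state.ttl, as stated above.
DEFAULT_TTL = timedelta(days=1.5)

# Assumption: only these unit suffixes are handled; VVR accepts its own duration formats.
_UNITS = {"d": "days", "h": "hours", "min": "minutes", "s": "seconds", "ms": "milliseconds"}


def parse_ttl(text):
    """Parse a TTL string such as '3d' or '12h' into a timedelta."""
    match = re.fullmatch(r"\s*(\d+)\s*(d|h|min|s|ms)\s*", text)
    if match is None:
        raise ValueError(f"unsupported TTL format: {text!r}")
    amount, unit = match.groups()
    return timedelta(**{_UNITS[unit]: int(amount)})


def effective_ttls(hint, left_ref, right_ref, default=DEFAULT_TTL):
    """Return (left_ttl, right_ttl) for one regular join.

    `hint` maps table references to TTL strings, as in JOIN_STATE_TTL('o' = '3d').
    `left_ref` and `right_ref` are the names the hint must use: the alias if the
    table has one, otherwise the table or view name. A side that the hint does
    not mention falls back to `default`.
    """
    left = parse_ttl(hint[left_ref]) if left_ref in hint else default
    right = parse_ttl(hint[right_ref]) if right_ref in hint else default
    return left, right


# JOIN_STATE_TTL('o' = '3d') on "Orders AS o JOIN Products AS p": only o is set.
left, right = effective_ttls({"o": "3d"}, "o", "p")
print(left, right)  # 3 days, 0:00:00 1 day, 12:00:00
```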
Examples
Each example sets independent TTLs for the two streams with JOIN_STATE_TTL and then shows the equivalent STATE_TTL statement, which requires VVR 8.0.7 or later.
Using an alias
```sql
SELECT /*+ JOIN_STATE_TTL('o' = '3d', 'p' = '1d') */
  o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
FROM Orders AS o
JOIN Products AS p
ON o.productid = p.productid;

-- VVR 8.0.7 and later
SELECT /*+ STATE_TTL('o' = '3d', 'p' = '1d') */
  o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
FROM Orders AS o
JOIN Products AS p
ON o.productid = p.productid;
```

Using a table name
```sql
SELECT /*+ JOIN_STATE_TTL('Orders' = '3d', 'Products' = '1d') */ *
FROM Orders
JOIN Products
ON Orders.productid = Products.productid;

-- VVR 8.0.7 and later
SELECT /*+ STATE_TTL('Orders' = '3d', 'Products' = '1d') */ *
FROM Orders
JOIN Products
ON Orders.productid = Products.productid;
```

Using a view name
```sql
CREATE TEMPORARY VIEW v AS
SELECT id, ...
FROM (
  SELECT id, ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...) AS rn
  FROM src1
  WHERE ...
) tmp
WHERE rn = 1;

SELECT /*+ JOIN_STATE_TTL('v' = '1d', 'b' = '3d') */ v.*, b.*
FROM v
LEFT JOIN src2 AS b ON v.id = b.id;

-- VVR 8.0.7 and later
SELECT /*+ STATE_TTL('v' = '1d', 'b' = '3d') */ v.*, b.*
FROM v
LEFT JOIN src2 AS b ON v.id = b.id;
```

Examples
Example 1: Join the Orders and Shipments tables
Test data
Table 1. Orders
| id | productName | ordertime |
|---|---|---|
| 1 | phone_a | 2025-05-01 10:00:00.0 |
| 2 | notebook_x | 2025-05-01 10:02:00.0 |
| 3 | phone_b | 2025-05-01 10:03:00.0 |
| 4 | pad_m | 2025-05-01 10:05:00.0 |
Table 2. Shipments
| shipId | orderId | status | shipTime |
|---|---|---|---|
| 101 | 1 | shipped | 2025-05-01 11:00:00.0 |
| 102 | 2 | delivered | 2025-05-01 17:00:00.0 |
| 103 | 3 | shipped | 2025-05-01 12:00:00.0 |
| 104 | 4 | shipped | 2025-05-01 11:30:00.0 |
Test statement
```sql
SELECT id, productName, status
FROM orders o
JOIN shipments s
ON o.id = s.orderId;
```

Test results
| id | productName | status |
|---|---|---|
| 1 | phone_a | shipped |
| 2 | notebook_x | delivered |
| 3 | phone_b | shipped |
| 4 | pad_m | shipped |
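A regular join converges to the batch result, so the expected table above can be cross-checked with any batch SQL engine. The sketch below uses Python's built-in `sqlite3` module as that stand-in. SQLite is not Flink. The timestamps are stored as plain text for simplicity, and `ORDER BY id` is added only to make the comparison deterministic.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER, productName TEXT, ordertime TEXT)")
conn.execute("CREATE TABLE shipments (shipId INTEGER, orderId INTEGER, status TEXT, shipTime TEXT)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?, ?)",
    [
        (1, "phone_a", "2025-05-01 10:00:00.0"),
        (2, "notebook_x", "2025-05-01 10:02:00.0"),
        (3, "phone_b", "2025-05-01 10:03:00.0"),
        (4, "pad_m", "2025-05-01 10:05:00.0"),
    ],
)
conn.executemany(
    "INSERT INTO shipments VALUES (?, ?, ?, ?)",
    [
        (101, 1, "shipped", "2025-05-01 11:00:00.0"),
        (102, 2, "delivered", "2025-05-01 17:00:00.0"),
        (103, 3, "shipped", "2025-05-01 12:00:00.0"),
        (104, 4, "shipped", "2025-05-01 11:30:00.0"),
    ],
)

# The test statement from Example 1, plus ORDER BY so the comparison is deterministic.
rows = conn.execute(
    """
    SELECT id, productName, status
    FROM orders o
    JOIN shipments s
    ON o.id = s.orderId
    ORDER BY id
    """
).fetchall()

for row in rows:
    print(row)
# (1, 'phone_a', 'shipped')
# (2, 'notebook_x', 'delivered')
# (3, 'phone_b', 'shipped')
# (4, 'pad_m', 'shipped')
```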
Example 2: Join the datahub_stream1 and datahub_stream2 tables
Test data
Table 3. datahub_stream1
| a (BIGINT) | b (BIGINT) | c (VARCHAR) |
|---|---|---|
| 0 | 10 | test11 |
| 1 | 10 | test21 |
Table 4. datahub_stream2
| a (BIGINT) | b (BIGINT) | c (VARCHAR) |
|---|---|---|
| 0 | 10 | test11 |
| 1 | 10 | test21 |
| 0 | 10 | test31 |
| 1 | 10 | test41 |
Test statement
```sql
SELECT s1.c, s2.c
FROM datahub_stream1 AS s1
JOIN datahub_stream2 AS s2
ON s1.a = s2.a
WHERE s1.a = 0;
```

Test results
| s1.c (VARCHAR) | s2.c (VARCHAR) |
|---|---|
| test11 | test11 |
| test11 | test31 |
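These results follow from the data. Key a = 0 appears once in datahub_stream1 and twice in datahub_stream2, so the join yields two rows, and the filter discards every row with a = 1. As a hedged cross-check, the sketch below runs the same statement in Python's built-in `sqlite3` module as a batch stand-in for Flink. `ORDER BY s2.c` is added only to make the result order deterministic.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
for table in ("datahub_stream1", "datahub_stream2"):
    conn.execute(f"CREATE TABLE {table} (a BIGINT, b BIGINT, c VARCHAR)")
conn.executemany(
    "INSERT INTO datahub_stream1 VALUES (?, ?, ?)",
    [(0, 10, "test11"), (1, 10, "test21")],
)
conn.executemany(
    "INSERT INTO datahub_stream2 VALUES (?, ?, ?)",
    [(0, 10, "test11"), (1, 10, "test21"), (0, 10, "test31"), (1, 10, "test41")],
)

# The test statement from Example 2, plus ORDER BY for a deterministic result.
rows = conn.execute(
    """
    SELECT s1.c, s2.c
    FROM datahub_stream1 AS s1
    JOIN datahub_stream2 AS s2
    ON s1.a = s2.a
    WHERE s1.a = 0
    ORDER BY s2.c
    """
).fetchall()
print(rows)  # [('test11', 'test11'), ('test11', 'test31')]
```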