
Realtime Compute for Apache Flink: Regular join

Last Updated: Mar 26, 2026

Flink SQL supports regular joins (also called dual-stream joins) over dynamic tables. Unlike batch joins, regular joins update their results continuously as new data arrives on either stream, so the final output is consistent with what a batch join would produce over the same data.

Background

Regular joins buffer all historical records from both input streams in Flink state so that any incoming row can be matched against all previously seen rows on the other side. This means state grows indefinitely as long as the job is running. For long-running jobs on high-volume streams, unbounded state growth can exhaust memory and cause failures.
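The buffering behavior described above can be modeled in a few lines of Python. This is a simplified sketch, not Flink's join operator: the class RegularInnerJoin and its methods are hypothetical, each side's state is an in-memory dictionary keyed by the join key, and nothing is ever evicted, so state only grows.

```python
from collections import defaultdict


class RegularInnerJoin:
    """Hypothetical toy model of a streaming inner equi-join with unbounded state."""

    def __init__(self):
        # One buffer per input, keyed by join key. Rows are never removed.
        self.state = {"left": defaultdict(list), "right": defaultdict(list)}

    def process(self, side, key, row):
        other = "right" if side == "left" else "left"
        # Probe every previously buffered row with the same key on the other side.
        matches = self.state[other][key]
        out = [(row, m) if side == "left" else (m, row) for m in matches]
        # Buffer the new row so that later arrivals on the other side can match it.
        self.state[side][key].append(row)
        return out

    def state_size(self):
        # Total number of buffered rows across both sides.
        return sum(len(rows) for buf in self.state.values() for rows in buf.values())


join = RegularInnerJoin()
events = [
    ("left", 1, "order-1"),
    ("right", 1, "ship-101"),
    ("right", 2, "ship-102"),
    ("left", 2, "order-2"),
]
results = []
for side, key, row in events:
    results.extend(join.process(side, key, row))
print(results)            # [('order-1', 'ship-101'), ('order-2', 'ship-102')]
print(join.state_size())  # 4
```

Every row stays in state after it has been matched, which is why state size grows with the total input volume rather than with the number of pending matches.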

To limit state size, set a state time-to-live (TTL) with the table.exec.state.ttl parameter, or set a separate TTL for each input stream with the JOIN_STATE_TTL hint (see Hints). Note that evicting state can affect result correctness: rows whose state has expired no longer match new arrivals on the other stream.
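To see how a TTL can change results, the following hypothetical Python sketch adds expiration to a toy streaming join. It assumes that a row expires a fixed number of time units after it is buffered and that expired rows are purged before each incoming row is processed. Flink's actual cleanup is driven by its own state TTL mechanism, so exact timing may differ.

```python
from collections import defaultdict


class TtlInnerJoin:
    """Hypothetical toy streaming inner join whose buffered rows expire after `ttl`."""

    def __init__(self, ttl):
        self.ttl = ttl
        # side -> join key -> list of (row, time the row was buffered)
        self.state = {"left": defaultdict(list), "right": defaultdict(list)}

    def _expire(self, now):
        # Drop every buffered row that is at least `ttl` time units old.
        for buf in self.state.values():
            for key in list(buf):
                buf[key] = [(r, t) for r, t in buf[key] if now - t < self.ttl]

    def process(self, side, key, row, now):
        self._expire(now)
        other = "right" if side == "left" else "left"
        out = [(row, r) if side == "left" else (r, row) for r, _ in self.state[other][key]]
        self.state[side][key].append((row, now))
        return out


join = TtlInnerJoin(ttl=10)
r1 = join.process("left", 1, "order-1", now=0)
r2 = join.process("right", 1, "ship-101", now=5)
r3 = join.process("right", 1, "ship-999", now=12)  # order-1 expired at t=10
print(r1, r2, r3)  # [] [('order-1', 'ship-101')] []
```

A batch join over the same three rows would also return ('order-1', 'ship-999'). The TTL version misses that row because order-1 was evicted before ship-999 arrived.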

If unbounded state is a concern, also consider interval joins, lookup joins, and window joins as alternatives.

Syntax

tableReference [, tableReference ]*
| tableExpression [ NATURAL | INNER ] [ { LEFT | RIGHT | FULL } [ OUTER ] ] JOIN tableExpression [ joinCondition ]
| tableExpression CROSS JOIN tableExpression
| tableExpression [ CROSS | OUTER ] APPLY tableExpression

joinCondition:
    ON booleanExpression
  | USING '(' column [, column ]* ')'
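
  • tableReference: the name of a table.

  • tableExpression: the table expression to join.

  • joinCondition: the join condition, specified either as ON followed by a Boolean expression or as USING followed by a parenthesized list of columns.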

List the table with the lower update frequency first and the table with the higher update frequency last. This reduces the amount of state that needs to be matched per incoming row.

Join types

INNER JOIN

Returns only rows where the join condition is satisfied on both sides.

SELECT id, productName, status
FROM orders o
JOIN shipments s
  ON o.id = s.orderId;

LEFT JOIN

Returns all rows from the left table. Rows with no match on the right side produce NULL values for the right-side columns.

SELECT o.id, o.productName, s.status
FROM orders o
LEFT JOIN shipments s
  ON o.id = s.orderId;
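Because results are updated continuously, a streaming LEFT JOIN can first emit a NULL-padded row and later retract it once a matching right-side row arrives. The following Python sketch models that behavior under stated assumptions: the class LeftJoinChangelog is hypothetical, Python None stands in for SQL NULL, and the '+'/'-' markers are a simplification of Flink's changelog row kinds.

```python
from collections import defaultdict


class LeftJoinChangelog:
    """Hypothetical toy streaming LEFT JOIN that emits ('+', row) / ('-', row) changes."""

    def __init__(self):
        self.left = defaultdict(list)
        self.right = defaultdict(list)

    def on_left(self, key, row):
        matches = self.right[key]
        self.left[key].append(row)
        if matches:
            return [("+", (row, m)) for m in matches]
        # No match yet: emit a NULL-padded result.
        return [("+", (row, None))]

    def on_right(self, key, row):
        first_match = not self.right[key]
        self.right[key].append(row)
        out = []
        for left_row in self.left[key]:
            if first_match:
                # Retract the NULL-padded result emitted earlier for this left row.
                out.append(("-", (left_row, None)))
            out.append(("+", (left_row, row)))
        return out


j = LeftJoinChangelog()
first = j.on_left(1, "order-1")
second = j.on_right(1, "ship-101")
third = j.on_right(1, "ship-102")
print(first)   # [('+', ('order-1', None))]
print(second)  # [('-', ('order-1', None)), ('+', ('order-1', 'ship-101'))]
print(third)   # [('+', ('order-1', 'ship-102'))]
```

Only the first matching right-side row triggers a retraction; later matches simply add joined rows.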

RIGHT JOIN

Returns all rows from the right table. Rows with no match on the left side produce NULL values for the left-side columns.

SELECT o.id, o.productName, s.status
FROM orders o
RIGHT JOIN shipments s
  ON o.id = s.orderId;

FULL OUTER JOIN

Returns all rows from both tables. Rows with no match on either side produce NULL values for the unmatched columns.

SELECT o.id, o.productName, s.status
FROM orders o
FULL OUTER JOIN shipments s
  ON o.id = s.orderId;
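The final (batch-equivalent) results of the four join types above can be compared with a small nested-loop sketch in Python. The equi_join helper and the sample rows are hypothetical. Order 2 has no shipment and the shipment for order 3 has no order, so the outer joins produce NULL-padded rows (None stands in for SQL NULL).

```python
def equi_join(left, right, how="inner"):
    """Nested-loop equi-join of (key, value) lists; None stands in for SQL NULL."""
    out = []
    matched_right = set()  # indexes of right rows that found at least one match
    for left_key, left_val in left:
        hits = [(i, rv) for i, (rk, rv) in enumerate(right) if rk == left_key]
        for i, right_val in hits:
            matched_right.add(i)
            out.append((left_val, right_val))
        if not hits and how in ("left", "full"):
            out.append((left_val, None))  # keep the unmatched left row
    if how in ("right", "full"):
        # Keep the right rows that never matched any left row.
        out.extend(
            (None, right_val)
            for i, (_, right_val) in enumerate(right)
            if i not in matched_right
        )
    return out


orders = [(1, "phone_a"), (2, "notebook_x")]  # (id, productName)
shipments = [(1, "shipped"), (3, "shipped")]  # (orderId, status)
for how in ("inner", "left", "right", "full"):
    print(how, equi_join(orders, shipments, how))
```

The full join, for example, returns [('phone_a', 'shipped'), ('notebook_x', None), (None, 'shipped')].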

CROSS JOIN

Returns the Cartesian product of both tables: every row on the left is paired with every row on the right.

SELECT *
FROM orders o
CROSS JOIN shipments s;
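The size of a CROSS JOIN result is the product of the input sizes, which itertools.product in Python illustrates. The row values are placeholders. In a regular join, both inputs are kept in state, so output volume can grow quickly as either stream grows.

```python
from itertools import product

orders = ["o1", "o2", "o3"]  # placeholder rows
shipments = ["s1", "s2"]     # placeholder rows

# Every left row is paired with every right row: 3 x 2 = 6 pairs.
pairs = list(product(orders, shipments))
print(len(pairs))  # 6
print(pairs[:2])   # [('o1', 's1'), ('o1', 's2')]
```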

Hints

In Ververica Runtime (VVR) 8.0.1 and later, use hints to specify different time-to-live (TTL) values for the left and right stream states. This reduces the amount of state data that needs to be maintained.

Important

The JOIN_STATE_TTL hint applies only to regular joins. It does not support lookup, interval, or window joins. Hints for regular joins are experimental. The syntax may change in future versions.

Syntax

-- VVR 8.0.1 and later
SELECT /*+ JOIN_STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...

-- VVR 8.0.7 and later also support the Apache Flink syntax
SELECT /*+ STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...

Set tableReference to a table name, a view name, or an alias. If you specify an alias for a table, use the alias in the hint.

If you specify a TTL for only one stream, the other stream uses the deployment-level TTL from the table.exec.state.ttl parameter. The default value is 1.5 days. For details, see Basic configurations.
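The resolution rules above can be summarized in a short Python sketch: a hinted side uses its own TTL, an unhinted side falls back to table.exec.state.ttl, and an aliased table is referenced by its alias. The functions parse_ttl and effective_ttls are hypothetical. parse_ttl accepts only simple day ('3d') and hour ('12h') values, which is an assumption rather than the full format Flink accepts. The 1.5-day default comes from the text.

```python
from datetime import timedelta

# Default of table.exec.state.ttl as stated in this topic (1.5 days).
DEFAULT_STATE_TTL = timedelta(days=1.5)

# Hypothetical minimal parser: only day and hour suffixes are handled.
_UNITS = {"d": "days", "h": "hours"}


def parse_ttl(text):
    """Convert a TTL string such as '3d' or '12h' into a timedelta."""
    unit = _UNITS.get(text[-1:])
    if unit is None:
        raise ValueError(f"unsupported TTL value: {text!r}")
    return timedelta(**{unit: float(text[:-1])})


def effective_ttls(hint, left_ref, right_ref, default=DEFAULT_STATE_TTL):
    """Return the state TTL each join input would use.

    hint maps table references (the alias if the table has one, otherwise the
    table or view name) to TTL strings, mirroring JOIN_STATE_TTL('o' = '3d').
    A side that is missing from the hint falls back to the default.
    """
    return {
        ref: parse_ttl(hint[ref]) if ref in hint else default
        for ref in (left_ref, right_ref)
    }


ttls = effective_ttls({"o": "3d"}, left_ref="o", right_ref="p")
print(ttls["o"])  # 3 days, 0:00:00
print(ttls["p"])  # 1 day, 12:00:00
```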

Examples

Each example sets an independent TTL for each stream and is shown twice: first with JOIN_STATE_TTL, and then with the equivalent STATE_TTL syntax that VVR 8.0.7 and later also accept.

Using an alias

SELECT /*+ JOIN_STATE_TTL('o' = '3d', 'p' = '1d') */
  o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
FROM Orders AS o
JOIN Products AS p
  ON o.productid = p.productid;

-- VVR 8.0.7 and later
SELECT /*+ STATE_TTL('o' = '3d', 'p' = '1d') */
  o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
FROM Orders AS o
JOIN Products AS p
  ON o.productid = p.productid;

Using a table name

SELECT /*+ JOIN_STATE_TTL('Orders' = '3d', 'Products' = '1d') */ *
FROM Orders
JOIN Products
  ON Orders.productid = Products.productid;

-- VVR 8.0.7 and later
SELECT /*+ STATE_TTL('Orders' = '3d', 'Products' = '1d') */ *
FROM Orders
JOIN Products
  ON Orders.productid = Products.productid;

Using a view name

CREATE TEMPORARY VIEW v AS
SELECT id, ...
FROM (
  SELECT id, ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...) AS rn
  FROM src1
  WHERE ...
) tmp
WHERE rn = 1;

SELECT /*+ JOIN_STATE_TTL('v' = '1d', 'b' = '3d') */ v.*, b.*
FROM v
LEFT JOIN src2 AS b ON v.id = b.id;

-- VVR 8.0.7 and later
SELECT /*+ STATE_TTL('v' = '1d', 'b' = '3d') */ v.*, b.*
FROM v
LEFT JOIN src2 AS b ON v.id = b.id;

Examples

Example 1: Join the Orders and Shipments tables

Test data

Table 1. Orders

id | productName | ordertime
1 | phone_a | 2025-05-01 10:00:00.0
2 | notebook_x | 2025-05-01 10:02:00.0
3 | phone_b | 2025-05-01 10:03:00.0
4 | pad_m | 2025-05-01 10:05:00.0

Table 2. Shipments

shipId | orderId | status | shipTime
101 | 1 | shipped | 2025-05-01 11:00:00.0
102 | 2 | delivered | 2025-05-01 17:00:00.0
103 | 3 | shipped | 2025-05-01 12:00:00.0
104 | 4 | shipped | 2025-05-01 11:30:00.0

Test statement

SELECT id, productName, status
FROM orders o
JOIN shipments s
  ON o.id = s.orderId;

Test results

id | productName | status
1 | phone_a | shipped
2 | notebook_x | delivered
3 | phone_b | shipped
4 | pad_m | shipped
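Because the final output of a regular join matches a batch join over the same data, the expected results can be cross-checked with a plain nested-loop join in Python over the test rows from Tables 1 and 2. This is a verification aid only and does not model Flink's streaming execution.

```python
# Rows from Table 1 (Orders): (id, productName, ordertime)
orders = [
    (1, "phone_a", "2025-05-01 10:00:00.0"),
    (2, "notebook_x", "2025-05-01 10:02:00.0"),
    (3, "phone_b", "2025-05-01 10:03:00.0"),
    (4, "pad_m", "2025-05-01 10:05:00.0"),
]
# Rows from Table 2 (Shipments): (shipId, orderId, status, shipTime)
shipments = [
    (101, 1, "shipped", "2025-05-01 11:00:00.0"),
    (102, 2, "delivered", "2025-05-01 17:00:00.0"),
    (103, 3, "shipped", "2025-05-01 12:00:00.0"),
    (104, 4, "shipped", "2025-05-01 11:30:00.0"),
]

# Nested-loop equivalent of: SELECT id, productName, status ... ON o.id = s.orderId
result = [
    (order_id, product_name, status)
    for order_id, product_name, _ordertime in orders
    for _ship_id, ship_order_id, status, _ship_time in shipments
    if order_id == ship_order_id
]
for row in result:
    print(row)  # (1, 'phone_a', 'shipped'), then orders 2, 3, and 4
```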

Example 2: Join the datahub_stream1 and datahub_stream2 tables

Test data

Table 3. datahub_stream1

a (BIGINT) | b (BIGINT) | c (VARCHAR)
0 | 10 | test11
1 | 10 | test21

Table 4. datahub_stream2

a (BIGINT) | b (BIGINT) | c (VARCHAR)
0 | 10 | test11
1 | 10 | test21
0 | 10 | test31
1 | 10 | test41

Test statement

SELECT s1.c, s2.c
FROM datahub_stream1 AS s1
JOIN datahub_stream2 AS s2
  ON s1.a = s2.a
WHERE s1.a = 0;

Test results

s1.c (VARCHAR) | s2.c (VARCHAR)
test11 | test11
test11 | test31
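The same batch-equivalence cross-check applies to Example 2. The Python sketch below joins the rows from Tables 3 and 4 on column a and then keeps only the rows where s1.a = 0. It is a verification aid, not a model of streaming execution.

```python
# Rows from Table 3 (datahub_stream1) and Table 4 (datahub_stream2): (a, b, c)
stream1 = [(0, 10, "test11"), (1, 10, "test21")]
stream2 = [(0, 10, "test11"), (1, 10, "test21"), (0, 10, "test31"), (1, 10, "test41")]

# Equivalent of: SELECT s1.c, s2.c ... ON s1.a = s2.a WHERE s1.a = 0
result = [
    (c1, c2)
    for a1, _b1, c1 in stream1
    for a2, _b2, c2 in stream2
    if a1 == a2 and a1 == 0
]
print(result)  # [('test11', 'test11'), ('test11', 'test31')]
```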