Flink SQL supports complex and flexible join operations over dynamic tables. This topic describes how to use regular join statements.
Background information
JOIN statements in real-time computing have the same semantics as JOIN statements in batch processing: both join two tables. The difference is that a JOIN statement in real-time computing joins two dynamic tables. The join result is updated dynamically as input data arrives, so that the final result is the same as the result that batch processing would produce.
Syntax
tableReference [, tableReference ]*
| tableExpression [ NATURAL | INNER ] [ { LEFT | RIGHT | FULL } [ OUTER ] ] JOIN tableExpression [ joinCondition ]
| tableExpression CROSS JOIN tableExpression
| tableExpression [ CROSS | OUTER ] APPLY tableExpression
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
tableReference: specifies the table name.
tableExpression: specifies the table expression to join.
joinCondition: specifies the join condition, written as an ON or USING clause.
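To illustrate the grammar above, the following sketch shows an inner join with an ON condition, a left outer join, and a join with a USING condition. It reuses the Orders and Products tables from Example 1 of this topic. The selected columns are chosen only for illustration.

```sql
-- Inner join with an ON condition: only orders that have a matching product are returned.
SELECT o.orderid, o.units, p.name
FROM Orders AS o
INNER JOIN Products AS p
  ON o.productid = p.productid;

-- Left outer join: every order is returned; p.name is NULL when no product matches.
SELECT o.orderid, o.units, p.name
FROM Orders AS o
LEFT OUTER JOIN Products AS p
  ON o.productid = p.productid;

-- USING condition: shorthand for an equality on a column that has the same name in both tables.
SELECT o.orderid, o.units, p.name
FROM Orders AS o
JOIN Products AS p
  USING (productid);
```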
Hints for regular joins
In Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.1 or later, you can use the Flink SQL hint syntax to specify different time-to-live (TTL) values for the states of left and right streams that are involved in a regular join. This helps reduce the size of the state data to be maintained.
Syntax
SELECT /*+ JOIN_STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...
Precautions
The JOIN_STATE_TTL hint can be used only for regular joins. It cannot be used for dimension table joins, interval joins, or window joins.
If the JOIN_STATE_TTL hint specifies the state TTL for only one stream of a regular join, the state TTL for the other stream is determined by the table.exec.state.ttl parameter. This parameter specifies the state TTL for Flink SQL deployments. Its default value is 1.5 days. For more information about this parameter, see Basic parameters.
tableReference can be set to a table name, a view name, or an alias. If you specify an alias for a table, you must use the alias in the hint.
This is an experimental feature. The syntax of the JOIN_STATE_TTL hint may change in the future.
Sample code
-- Use an alias as a hint.
SELECT /*+ JOIN_STATE_TTL('o' = '3d', 'p' = '1d') */
  o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
FROM Orders AS o
JOIN Products AS p
  ON o.productid = p.productid;

-- Use a table name as a hint.
SELECT /*+ JOIN_STATE_TTL('Orders' = '3d', 'Products' = '1d') */ *
FROM Orders
JOIN Products
  ON Orders.productid = Products.productid;

-- Use a view name as a hint.
CREATE TEMPORARY VIEW v AS
SELECT id, ...
FROM (
  SELECT id, ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ..) AS rn
  FROM src1
  WHERE ...
) tmp
WHERE rn = 1;

SELECT /*+ JOIN_STATE_TTL('v' = '1d', 'b' = '3d') */ v.*, b.*
FROM v
LEFT JOIN src2 AS b
  ON v.id = b.id;
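The precautions also cover the case in which the hint names only one side of the join. The following is a minimal sketch of that case. It assumes the same Orders and Products tables as the sample code and follows the hint syntax shown earlier, in which the second entry is optional.

```sql
-- Only the left stream (alias o) gets an explicit state TTL of 3 days.
-- The right stream (alias p) is not named in the hint, so its state TTL
-- falls back to table.exec.state.ttl (1.5 days by default, as stated in the precautions).
SELECT /*+ JOIN_STATE_TTL('o' = '3d') */
  o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
FROM Orders AS o
JOIN Products AS p
  ON o.productid = p.productid;
```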
Example 1: Join the Orders table and the Products table
Test data
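Table 1 Orders

| rowtime | productid | orderid | units |
|---|---|---|---|
| 10:17:00 | 30 | 5 | 4 |
| 10:17:05 | 10 | 6 | 1 |
| 10:18:05 | 20 | 7 | 2 |
| 10:18:07 | 30 | 8 | 20 |
| 11:02:00 | 10 | 9 | 6 |
| 11:04:00 | 10 | 10 | 1 |
| 11:09:30 | 40 | 11 | 12 |
| 11:24:11 | 10 | 12 | 4 |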
Table 2 Products

| productid | name | unitprice |
|---|---|---|
| 30 | Cheese | 17 |
| 10 | Beer | 0.25 |
| 20 | Wine | 6 |
| 30 | Cheese | 17 |
| 10 | Beer | 0.25 |
| 10 | Beer | 0.25 |
| 40 | Bread | 100 |
| 10 | Beer | 0.25 |
Test statements
SELECT o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
FROM Orders AS o
JOIN Products AS p
  ON o.productid = p.productid;
Test results
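| o.rowtime | o.productid | o.orderid | o.units | p.name | p.unitprice |
|---|---|---|---|---|---|
| 10:17:00 | 30 | 5 | 4 | Cheese | 17.00 |
| 10:17:00 | 30 | 5 | 4 | Cheese | 17.00 |
| 10:17:05 | 10 | 6 | 1 | Beer | 0.25 |
| 10:17:05 | 10 | 6 | 1 | Beer | 0.25 |
| 10:17:05 | 10 | 6 | 1 | Beer | 0.25 |
| 10:17:05 | 10 | 6 | 1 | Beer | 0.25 |
| 10:18:05 | 20 | 7 | 2 | Wine | 6.00 |
| 10:18:07 | 30 | 8 | 20 | Cheese | 17.00 |
| 10:18:07 | 30 | 8 | 20 | Cheese | 17.00 |
| 11:02:00 | 10 | 9 | 6 | Beer | 0.25 |
| 11:02:00 | 10 | 9 | 6 | Beer | 0.25 |
| 11:02:00 | 10 | 9 | 6 | Beer | 0.25 |
| 11:02:00 | 10 | 9 | 6 | Beer | 0.25 |
| 11:04:00 | 10 | 10 | 1 | Beer | 0.25 |
| 11:04:00 | 10 | 10 | 1 | Beer | 0.25 |
| 11:04:00 | 10 | 10 | 1 | Beer | 0.25 |
| 11:04:00 | 10 | 10 | 1 | Beer | 0.25 |
| 11:09:30 | 40 | 11 | 12 | Bread | 100.00 |
| 11:24:11 | 10 | 12 | 4 | Beer | 0.25 |
| 11:24:11 | 10 | 12 | 4 | Beer | 0.25 |
| 11:24:11 | 10 | 12 | 4 | Beer | 0.25 |
| 11:24:11 | 10 | 12 | 4 | Beer | 0.25 |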
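The result contains 22 rows although Orders contains only 8, because the Products test data holds several rows with the same productid. A regular join returns one output row for every matching pair. The following query is a small sketch for checking how many Products rows each order can match. It is an illustration added for this test data, not part of the original test statements.

```sql
-- Count how many Products rows share each productid in the test data.
SELECT productid, COUNT(*) AS matches
FROM Products
GROUP BY productid;

-- For the test data above, the final counts are:
--   productid 10 -> 4, 20 -> 1, 30 -> 2, 40 -> 1
-- Orders has four rows with productid 10, two with 30, one with 20, and one with 40,
-- so the join returns 4*4 + 2*2 + 1*1 + 1*1 = 22 rows.
```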
Example 2: Join the datahub_stream1 table and the datahub_stream2 table
Test data
Table 3 datahub_stream1

| a (BIGINT) | b (BIGINT) | c (VARCHAR) |
|---|---|---|
| 0 | 10 | test11 |
| 1 | 10 | test21 |
Table 4 datahub_stream2

| a (BIGINT) | b (BIGINT) | c (VARCHAR) |
|---|---|---|
| 0 | 10 | test11 |
| 1 | 10 | test21 |
| 0 | 10 | test31 |
| 1 | 10 | test41 |
Test statements
SELECT s1.c, s2.c
FROM datahub_stream1 AS s1
JOIN datahub_stream2 AS s2
  ON s1.a = s2.a
WHERE s1.a = 0;
Test results
| s1.c (VARCHAR) | s2.c (VARCHAR) |
|---|---|
| test11 | test11 |
| test11 | test31 |