Flink SQL支援對動態表進行複雜而靈活的串連操作,本文為您介紹如何使用雙流JOIN語句。
背景資訊
Realtime Compute的JOIN和傳統批處理JOIN的語義一致,都用於將兩張表關聯起來。區別為Realtime Compute關聯的是兩張動態表,關聯的結果也會動態更新,以保證最終結果和批處理結果一致。
雙流JOIN文法
tableReference [, tableReference ]*
| tableExpression [ NATURAL | INNER ] [ { LEFT | RIGHT | FULL } [ OUTER ] ] JOIN tableExpression [ joinCondition ]
| tableExpression CROSS JOIN tableExpression
| tableExpression [ CROSS | OUTER ] APPLY tableExpression
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'tableReference:表名稱。
tableExpression:運算式。
joinCondition:JOIN條件。
雙流JOIN hints
從Realtime Compute引擎VVR 8.0.1 開始,您可以通過提示(Hints)單獨為雙流JOIN的左右流狀態設定不同生命週期 (TTL)來減少維護的狀態大小。
文法
-- VVR 8.0.1 開始 SELECT /*+ JOIN_STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ... -- VVR 8.0.7 開始,您也可以使用社區的Join State TTL Hint文法 SELECT /*+ STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...注意事項
JOIN STATE TTL HINT僅支援在雙流JOIN情境使用,不支援維表JOIN、Interval Join或Window Join。
若雙流JOIN時JOIN STATE TTL HINT僅指定某一條流的在JOIN節點的狀態生命週期,則另外一條流的狀態生命週期使用Flink SQL作業層級的狀態生命週期,由table.exec.state.ttl控制(參見基本配置),預設值為1.5天。
tableReference支援表名,視圖名和別名,一旦為表名指定別名時,則需使用別名。
這是一個實驗性質的特性,HINT文法未來可能會發生變化。
樣本
-- HINT使用別名 SELECT /*+ JOIN_STATE_TTL('o' = '3d', 'p' = '1d') */ o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice FROM Orders AS o JOIN Products AS p ON o.productid = p.productid; -- VVR 8.0.7及以上版本也可以使用新文法 SELECT /*+ STATE_TTL('o' = '3d', 'p' = '1d') */ o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice FROM Orders AS o JOIN Products AS p ON o.productid = p.productid; -- HINT使用表名 SELECT /*+ JOIN_STATE_TTL('Orders' = '3d', 'Products' = '1d') */ * FROM Orders JOIN Products ON Orders.productid = Products.productid; -- VVR 8.0.7及以上版本也可以使用新文法 SELECT /*+ STATE_TTL('Orders' = '3d', 'Products' = '1d') */ * FROM Orders JOIN Products ON Orders.productid = Products.productid; -- HINT使用視圖名 CREATE TEMPORARY VIEW v AS SELECT id, ... FROM ( SELECT id, ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ..) AS rn FROM src1 WHERE ... ) tmp WHERE rn = 1; SELECT /*+ JOIN_STATE_TTL('v' = '1d', 'b' = '3d') */ v.* , b.* FROM v LEFT JOIN src2 AS b ON v.id = b.id; -- VVR 8.0.7及以上版本也可以使用新文法 SELECT /*+ STATE_TTL('v' = '1d', 'b' = '3d') */ v.* , b.* FROM v LEFT JOIN src2 AS b ON v.id = b.id;
Orders JOIN Shipments表的資料樣本
測試資料
表 1. Orders
id
productName
ordertime
1
phone_a
2025-05-01 10:00:00.0
2
notebook_x
2025-05-01 10:02:00.0
3
phone_b
2025-05-01 10:03:00.0
4
pad_m
2025-05-01 10:05:00.0
表 2. Shipments
shipId
orderId
status
shipTime
101
1
shipped
2025-05-01 11:00:00.0
102
2
delivered
2025-05-01 17:00:00.0
103
3
shipped
2025-05-01 12:00:00.0
104
4
shipped
2025-05-01 11:30:00.0
測試語句
SELECT id, productName, status FROM orders o JOIN shipments s ON o.id = s.orderId;測試結果
id
productName
status
1
phone_a
shipped
2
notebook_x
delivered
3
phone_b
shipped
4
pad_m
shipped
datahub_stream1 JOIN datahub_stream2表的資料樣本
測試資料
表 3. datahub_stream1
a(BIGINT)
b(BIGINT)
c(VARCHAR)
0
10
test11
1
10
test21
表 4. datahub_stream2
a(BIGINT)
b(BIGINT)
c(VARCHAR)
0
10
test11
1
10
test21
0
10
test31
1
10
test41
測試語句
SELECT s1.c,s2.c FROM datahub_stream1 AS s1 JOIN datahub_stream2 AS s2 ON s1.a = s2.a WHERE s1.a = 0;測試結果
s1.c(VARCHAR)
s2.c(VARCHAR)
test11
test11
test11
test31