全部產品
Search
文件中心

Realtime Compute for Apache Flink:雙流JOIN語句

更新時間:Jun 07, 2025

Flink SQL支援對動態表進行複雜而靈活的串連操作,本文為您介紹如何使用雙流JOIN語句。

背景資訊

Realtime Compute的JOIN和傳統批處理JOIN的語義一致,都用於將兩張表關聯起來。區別為Realtime Compute關聯的是兩張動態表,關聯的結果也會動態更新,以保證最終結果和批處理結果一致。

雙流JOIN文法

tableReference [, tableReference ]*
| tableExpression [ NATURAL | INNER ] [ { LEFT | RIGHT | FULL } [ OUTER ] ] JOIN tableExpression [ joinCondition ]
| tableExpression CROSS JOIN tableExpression
| tableExpression [ CROSS | OUTER ] APPLY tableExpression

joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
  • tableReference:表名稱。

  • tableExpression:運算式。

  • joinCondition:JOIN條件。

雙流JOIN hints

從Realtime Compute引擎VVR 8.0.1 開始,您可以通過提示(Hints)單獨為雙流JOIN的左右流狀態設定不同生命週期 (TTL)來減少維護的狀態大小。

  • 文法

    -- VVR 8.0.1 開始
    SELECT /*+ JOIN_STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...
    
    -- VVR 8.0.7 開始,您也可以使用社區的Join State TTL Hint文法
    SELECT /*+ STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...
  • 注意事項

    • JOIN STATE TTL HINT僅支援在雙流JOIN情境使用,不支援維表JOIN、Interval Join或Window Join。

    • 若雙流JOIN時JOIN STATE TTL HINT僅指定某一條流的在JOIN節點的狀態生命週期,則另外一條流的狀態生命週期使用Flink SQL作業層級的狀態生命週期,由table.exec.state.ttl控制(參見基本配置),預設值為1.5天。

    • tableReference支援表名,視圖名和別名,一旦為表名指定別名時,則需使用別名。

    • 這是一個實驗性質的特性,HINT文法未來可能會發生變化。

  • 樣本

    -- HINT使用別名
    SELECT /*+ JOIN_STATE_TTL('o' = '3d', 'p' = '1d') */
      o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
      FROM Orders AS o
      JOIN Products AS p
    ON o.productid = p.productid;
    -- VVR 8.0.7及以上版本也可以使用新文法
    SELECT /*+ STATE_TTL('o' = '3d', 'p' = '1d') */
      o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
      FROM Orders AS o
      JOIN Products AS p
    ON o.productid = p.productid;
    
    -- HINT使用表名
    SELECT /*+ JOIN_STATE_TTL('Orders' = '3d', 'Products' = '1d') */ *
      FROM Orders
      JOIN Products
    ON Orders.productid = Products.productid;
    -- VVR 8.0.7及以上版本也可以使用新文法
    SELECT /*+ STATE_TTL('Orders' = '3d', 'Products' = '1d') */ *
      FROM Orders
      JOIN Products
    ON Orders.productid = Products.productid;
    
    -- HINT使用視圖名
    CREATE TEMPORARY VIEW v AS
    SELECT id, ...
    	FROM (
    		SELECT id, ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ..) AS rn
    		FROM src1
    		WHERE ...
    	) tmp
    WHERE rn = 1;
    	
    SELECT /*+ JOIN_STATE_TTL('v' = '1d', 'b' = '3d') */ v.* , b.*
    FROM v
    LEFT JOIN src2 AS b ON v.id = b.id;
    -- VVR 8.0.7及以上版本也可以使用新文法
    SELECT /*+ STATE_TTL('v' = '1d', 'b' = '3d') */ v.* , b.*
    FROM v
    LEFT JOIN src2 AS b ON v.id = b.id;

Orders JOIN Shipments表的資料樣本

  • 測試資料

    表 1. Orders

    id

    productName

    ordertime

    1

    phone_a

    2025-05-01 10:00:00.0

    2

    notebook_x

    2025-05-01 10:02:00.0

    3

    phone_b

    2025-05-01 10:03:00.0

    4

    pad_m

    2025-05-01 10:05:00.0

    表 2. Shipments

    shipId

    orderId

    status

    shipTime

    101

    1

    shipped

    2025-05-01 11:00:00.0

    102

    2

    delivered

    2025-05-01 17:00:00.0

    103

    3

    shipped

    2025-05-01 12:00:00.0

    104

    4

    shipped

    2025-05-01 11:30:00.0

  • 測試語句

    SELECT id, productName, status
    FROM orders o 
    JOIN shipments s 
      ON o.id = s.orderId;
  • 測試結果

    id

    productName

    status

    1

    phone_a

    shipped

    2

    notebook_x

    delivered

    3

    phone_b

    shipped

    4

    pad_m

    shipped

datahub_stream1 JOIN datahub_stream2表的資料樣本

  • 測試資料

    表 3. datahub_stream1

    a(BIGINT)

    b(BIGINT)

    c(VARCHAR)

    0

    10

    test11

    1

    10

    test21

    表 4. datahub_stream2

    a(BIGINT)

    b(BIGINT)

    c(VARCHAR)

    0

    10

    test11

    1

    10

    test21

    0

    10

    test31

    1

    10

    test41

  • 測試語句

    SELECT s1.c,s2.c 
    FROM datahub_stream1 AS s1
    JOIN datahub_stream2 AS s2 
    ON s1.a = s2.a
    WHERE s1.a = 0;    
  • 測試結果

    s1.c(VARCHAR)

    s2.c(VARCHAR)

    test11

    test11

    test11

    test31