
Realtime Compute for Apache Flink: Regular join statements

Last Updated: Nov 16, 2023

Flink SQL supports complex and flexible join operations over dynamic tables. This topic describes how to use regular join statements.

Background information

JOIN statements in real-time computing have the same semantics as JOIN statements in batch processing: both join two tables. The difference is that a JOIN statement in real-time computing joins two dynamic tables, and its results are updated continuously as new data arrives so that the final results are the same as those of the equivalent batch query.
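The following Python sketch illustrates the idea under simplifying assumptions: a streaming inner equi-join keeps every row from both inputs as state and, whenever a new row arrives on either side, joins it against the rows already stored for the other side. Once all rows have arrived, the emitted rows are the same as those of a batch join over the complete tables, only produced incrementally. The function names are hypothetical, and the model ignores state TTL, retractions, and parallelism; it is not Flink's operator implementation.

```python
from collections import defaultdict


# Simplified model of a streaming inner equi-join (not Flink's actual operator):
# both sides keep every row they have seen as state, indexed by join key.
def streaming_inner_join(events, key):
    """events: (side, row) pairs in arrival order, where side is "L" or "R"."""
    state = {"L": defaultdict(list), "R": defaultdict(list)}
    output = []
    for side, row in events:
        k = key(row)
        state[side][k].append(row)  # remember the new row for future matches
        if side == "L":
            # Probe the right-side state: emit one result per stored match.
            output.extend((row, other) for other in state["R"][k])
        else:
            output.extend((other, row) for other in state["L"][k])
    return output


# Batch reference: join the complete tables in one pass.
def batch_inner_join(left, right, key):
    return [(l, r) for l in left for r in right if key(l) == key(r)]


def key(row):
    return row[1]  # join on the second field (productid)


orders = [("o1", 30), ("o2", 10), ("o3", 30)]  # (orderid, productid)
products = [("Cheese", 30), ("Beer", 10)]      # (name, productid)

# Rows of the two tables arrive interleaved, in an arbitrary order.
events = [("R", products[1]), ("L", orders[0]), ("L", orders[1]),
          ("R", products[0]), ("L", orders[2])]

streamed = streaming_inner_join(events, key)
batched = batch_inner_join(orders, products, key)
print(streamed)
```

The streamed rows come out in arrival order rather than table order, but as a multiset they equal the batch result.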

Syntax

tableReference [, tableReference ]*
| tableExpression [ NATURAL | INNER ] [ { LEFT | RIGHT | FULL } [ OUTER ] ] JOIN tableExpression [ joinCondition ]
| tableExpression CROSS JOIN tableExpression
| tableExpression [ CROSS | OUTER ] APPLY tableExpression

joinCondition:
ON booleanExpression
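| USING '(' column [, column ]* ')'

  • tableReference: the name of a table.

  • tableExpression: the table expression to join.

  • joinCondition: the join condition, given either as an ON clause with a Boolean expression or as a USING clause that lists columns that have the same name in both tables and whose values must be equal.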
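The syntax above also covers LEFT, RIGHT, and FULL OUTER joins. Because a dynamic table is never complete, an outer join may emit a left row padded with NULL before any matching right row has arrived, and must retract that row once a match shows up. The following Python sketch models this for a LEFT OUTER equi-join as a changelog of insertions (+I) and deletions (-D). It is a simplified illustration with hypothetical function names that roughly mirrors the observable behavior; it is not Flink's implementation, which also handles non-equi conditions, updates from its inputs, and state TTL.

```python
from collections import defaultdict


# Simplified changelog model of a streaming LEFT OUTER equi-join.
# "+I" marks an inserted result row and "-D" a retracted (deleted) one.
def streaming_left_join(events, key):
    """events: (side, row) pairs in arrival order, where side is "L" or "R"."""
    left_state = defaultdict(list)
    right_state = defaultdict(list)
    changelog = []
    for side, row in events:
        k = key(row)
        if side == "L":
            left_state[k].append(row)
            matches = right_state[k]
            if matches:
                changelog.extend(("+I", (row, r)) for r in matches)
            else:
                # No match yet: emit the left row padded with NULL (None here).
                changelog.append(("+I", (row, None)))
        else:
            first_match = not right_state[k]
            right_state[k].append(row)
            for left in left_state[k]:
                if first_match:
                    # The NULL-padded row emitted earlier is no longer valid.
                    changelog.append(("-D", (left, None)))
                changelog.append(("+I", (left, row)))
    return changelog


def key(row):
    return row[1]  # join on the second field


events = [("L", ("o1", 10)), ("R", ("Beer", 10)), ("R", ("Ale", 10))]
for change in streaming_left_join(events, key):
    print(change)
```

Only the first matching right row triggers the retraction; later matches simply add more joined rows.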

Hints for regular joins

In Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 8.0.1 or later, you can use a Flink SQL hint to specify separate time-to-live (TTL) values for the state of the left and right input streams of a regular join. This helps reduce the amount of state data that must be maintained.

  • Syntax

    SELECT /*+ JOIN_STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...
  • Precautions

    • The JOIN_STATE_TTL hint can be used only in regular joins. It cannot be used in dimension table joins, interval joins, or window joins.

    • If the JOIN_STATE_TTL hint specifies the TTL for only one of the two input streams of a regular join, the TTL for the other stream is determined by the table.exec.state.ttl parameter. This parameter specifies the state TTL for Flink SQL deployments, and its default value is 1.5 days. For more information about this parameter, see Basic parameters.

    • tableReference can be set to a table name, a view name, or an alias. If a table has an alias in the query, you must use the alias in the hint.

    • This is an experimental feature. The syntax of the JOIN_STATE_TTL hint may change in the future.

  • Sample code

    -- Use an alias as a hint.
    SELECT /*+ JOIN_STATE_TTL('o' = '3d', 'p' = '1d') */
      o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
      FROM Orders AS o
      JOIN Products AS p
    ON o.productid = p.productid;
    
    -- Use a table name as a hint.
    SELECT /*+ JOIN_STATE_TTL('Orders' = '3d', 'Products' = '1d') */ *
      FROM Orders
      JOIN Products
    ON Orders.productid = Products.productid;
    
    -- Use a view name as a hint.
    CREATE TEMPORARY VIEW v AS
    SELECT id, ...
      FROM (
        SELECT id, ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...) AS rn
        FROM src1
        WHERE ...
      ) tmp
    WHERE rn = 1;

    SELECT /*+ JOIN_STATE_TTL('v' = '1d', 'b' = '3d') */ v.*, b.*
    FROM v
    LEFT JOIN src2 AS b ON v.id = b.id;
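To make the per-side TTL rule concrete, the following Python sketch resolves the TTL of each input from a hint, falling back to the table.exec.state.ttl default of 1.5 days for a side that the hint does not name, and then simulates how expired state prevents late matches. It is a simplified model: the duration formats accepted by parse_ttl, all function names, and the eager expiration check are assumptions made for illustration. In Flink, state TTL is measured in processing time, and the exact expiration and cleanup behavior may differ from this sketch.

```python
from datetime import timedelta

# Default of table.exec.state.ttl as stated in the Precautions above.
DEFAULT_STATE_TTL = timedelta(days=1.5)

# Hypothetical duration suffixes understood by this sketch ("min" checked first).
_UNITS = {"min": "minutes", "d": "days", "h": "hours", "s": "seconds"}


def parse_ttl(text):
    for suffix, unit in _UNITS.items():
        if text.endswith(suffix):
            return timedelta(**{unit: float(text[: -len(suffix)])})
    raise ValueError(f"unsupported duration: {text!r}")


def resolve_ttls(hint, left_name, right_name):
    """Per-side TTL: the hinted value if the side is named in the hint, else the default."""
    return tuple(
        parse_ttl(hint[name]) if name in hint else DEFAULT_STATE_TTL
        for name in (left_name, right_name)
    )


def join_with_ttl(events, left_ttl, right_ttl):
    """events: (time, side, key, value) tuples in arrival order, side "L" or "R".

    A stored row can still be matched only while (now - time it was stored)
    does not exceed the TTL of its own side.
    """
    ttl = {"L": left_ttl, "R": right_ttl}
    state = {"L": [], "R": []}
    out = []
    for now, side, key, value in events:
        other = "R" if side == "L" else "L"
        # Expire rows of the other side before probing it.
        state[other] = [e for e in state[other] if now - e[0] <= ttl[other]]
        for _, k, v in state[other]:
            if k == key:
                out.append((value, v) if side == "L" else (v, value))
        state[side].append((now, key, value))
    return out


# Mirrors JOIN_STATE_TTL('o' = '3d', 'p' = '1d'): orders kept 3 days, products 1 day.
left_ttl, right_ttl = resolve_ttls({"o": "3d", "p": "1d"}, "o", "p")
events = [
    (timedelta(days=0), "L", 10, "order-A"),  # stored on the orders side
    (timedelta(days=2), "R", 10, "Beer"),     # order-A is 2 days old (<= 3d): match
    (timedelta(days=4), "L", 10, "order-B"),  # Beer is 2 days old (> 1d): expired
]
result = join_with_ttl(events, left_ttl, right_ttl)
print(result)
```

The second order finds no match even though a batch join would pair it with Beer. A shorter TTL keeps state smaller, but rows that would match after their counterpart has expired are dropped, so the join result can differ from the batch result.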

Example 1: Join the Orders table and the Products table

  • Test data

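    Table 1 Orders

    | rowtime  | productid | orderid | units |
    |----------|-----------|---------|-------|
    | 10:17:00 | 30        | 5       | 4     |
    | 10:17:05 | 10        | 6       | 1     |
    | 10:18:05 | 20        | 7       | 2     |
    | 10:18:07 | 30        | 8       | 20    |
    | 11:02:00 | 10        | 9       | 6     |
    | 11:04:00 | 10        | 10      | 1     |
    | 11:09:30 | 40        | 11      | 12    |
    | 11:24:11 | 10        | 12      | 4     |

    Table 2 Products

    | productid | name   | unitprice |
    |-----------|--------|-----------|
    | 30        | Cheese | 17        |
    | 10        | Beer   | 0.25      |
    | 20        | Wine   | 6         |
    | 30        | Cheese | 17        |
    | 10        | Beer   | 0.25      |
    | 10        | Beer   | 0.25      |
    | 40        | Bread  | 100       |
    | 10        | Beer   | 0.25      |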

  • Test statements

    SELECT o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
      FROM Orders AS o
      JOIN Products AS p
    ON o.productid = p.productid;
  • Test results

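    | o.rowtime | o.productid | o.orderid | o.units | p.name | p.unitprice |
    |-----------|-------------|-----------|---------|--------|-------------|
    | 10:17:00  | 30          | 5         | 4       | Cheese | 17.00       |
    | 10:17:00  | 30          | 5         | 4       | Cheese | 17.00       |
    | 10:17:05  | 10          | 6         | 1       | Beer   | 0.25        |
    | 10:17:05  | 10          | 6         | 1       | Beer   | 0.25        |
    | 10:17:05  | 10          | 6         | 1       | Beer   | 0.25        |
    | 10:17:05  | 10          | 6         | 1       | Beer   | 0.25        |
    | 10:18:05  | 20          | 7         | 2       | Wine   | 6.00        |
    | 10:18:07  | 30          | 8         | 20      | Cheese | 17.00       |
    | 10:18:07  | 30          | 8         | 20      | Cheese | 17.00       |
    | 11:02:00  | 10          | 9         | 6       | Beer   | 0.25        |
    | 11:02:00  | 10          | 9         | 6       | Beer   | 0.25        |
    | 11:02:00  | 10          | 9         | 6       | Beer   | 0.25        |
    | 11:02:00  | 10          | 9         | 6       | Beer   | 0.25        |
    | 11:04:00  | 10          | 10        | 1       | Beer   | 0.25        |
    | 11:04:00  | 10          | 10        | 1       | Beer   | 0.25        |
    | 11:04:00  | 10          | 10        | 1       | Beer   | 0.25        |
    | 11:04:00  | 10          | 10        | 1       | Beer   | 0.25        |
    | 11:09:30  | 40          | 11        | 12      | Bread  | 100.00      |
    | 11:24:11  | 10          | 12        | 4       | Beer   | 0.25        |
    | 11:24:11  | 10          | 12        | 4       | Beer   | 0.25        |
    | 11:24:11  | 10          | 12        | 4       | Beer   | 0.25        |
    | 11:24:11  | 10          | 12        | 4       | Beer   | 0.25        |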
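Most orders in Example 1 appear several times in the result because the Products table contains duplicate rows: two rows for productid 30 and four for productid 10. An inner join pairs each order with every matching row, so the 8 orders produce 2 + 4 + 1 + 2 + 4 + 4 + 1 + 4 = 22 result rows. The following Python sketch recomputes the batch result from Tables 1 and 2 to check this; the variable names are illustrative. By the semantics described in Background information, the final streaming result should contain the same rows, although the order in which they are emitted may differ.

```python
from collections import Counter

# (rowtime, productid, orderid, units), copied from Table 1 Orders.
orders = [
    ("10:17:00", 30, 5, 4), ("10:17:05", 10, 6, 1), ("10:18:05", 20, 7, 2),
    ("10:18:07", 30, 8, 20), ("11:02:00", 10, 9, 6), ("11:04:00", 10, 10, 1),
    ("11:09:30", 40, 11, 12), ("11:24:11", 10, 12, 4),
]
# (productid, name, unitprice), copied from Table 2 Products, duplicates included.
products = [
    (30, "Cheese", 17), (10, "Beer", 0.25), (20, "Wine", 6), (30, "Cheese", 17),
    (10, "Beer", 0.25), (10, "Beer", 0.25), (40, "Bread", 100), (10, "Beer", 0.25),
]

# Inner join on productid: one output row per (order, matching product row) pair.
joined = [
    (rowtime, pid, orderid, units, name, price)
    for rowtime, pid, orderid, units in orders
    for p_pid, name, price in products
    if pid == p_pid
]

# Number of Products rows per productid, i.e. how often each order is repeated.
copies = Counter(p_pid for p_pid, _, _ in products)
print(len(joined), dict(copies))
```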

Example 2: Join the datahub_stream1 table and the datahub_stream2 table

  • Test data

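    Table 3 datahub_stream1

    | a (BIGINT) | b (BIGINT) | c (VARCHAR) |
    |------------|------------|-------------|
    | 0          | 10         | test11      |
    | 1          | 10         | test21      |

    Table 4 datahub_stream2

    | a (BIGINT) | b (BIGINT) | c (VARCHAR) |
    |------------|------------|-------------|
    | 0          | 10         | test11      |
    | 1          | 10         | test21      |
    | 0          | 10         | test31      |
    | 1          | 10         | test41      |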

  • Test statements

    SELECT s1.c, s2.c
    FROM datahub_stream1 AS s1
    JOIN datahub_stream2 AS s2
    ON s1.a = s2.a
    WHERE s1.a = 0;
  • Test results

    | s1.c (VARCHAR) | s2.c (VARCHAR) |
    |----------------|----------------|
    | test11         | test11         |
    | test11         | test31         |
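In Example 2, the join condition s1.a = s2.a combined with WHERE s1.a = 0 means that only rows with a = 0 on both sides can contribute, which is why the result pairs test11 with test11 and with test31. The following Python sketch recomputes the result from Tables 3 and 4 and checks that filtering both inputs before the join gives the same rows as filtering afterward. This equivalence holds for inner equi-joins in general, and filtering early in a streaming job means fewer rows have to be kept in join state. Whether and how the Flink planner applies such a rewrite automatically is not covered in this topic; the variable names are illustrative.

```python
# (a, b, c) rows copied from Table 3 and Table 4.
stream1 = [(0, 10, "test11"), (1, 10, "test21")]
stream2 = [(0, 10, "test11"), (1, 10, "test21"), (0, 10, "test31"), (1, 10, "test41")]

# Join on s1.a = s2.a, then apply WHERE s1.a = 0.
join_then_filter = [
    (s1[2], s2[2])
    for s1 in stream1
    for s2 in stream2
    if s1[0] == s2[0] and s1[0] == 0
]

# Since s1.a = s2.a, the filter also implies s2.a = 0,
# so both inputs can be filtered before they are joined.
filter_then_join = [
    (s1[2], s2[2])
    for s1 in stream1 if s1[0] == 0
    for s2 in stream2 if s2[0] == 0
    if s1[0] == s2[0]
]
print(join_then_filter)
print(filter_then_join)
```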