すべてのプロダクト
Search
ドキュメントセンター

Realtime Compute for Apache Flink:標準結合ステートメント

最終更新日:Jun 13, 2025

Flink SQL は、動的テーブルに対する複雑で柔軟な結合操作をサポートしています。このトピックでは、標準結合ステートメントの使用方法について説明します。

背景情報

リアルタイムコンピューティングの JOIN ステートメントは、バッチ処理の JOIN ステートメントと意味的に同等です。どちらのステートメントも 2 つのテーブルを結合するために使用されます。違いは、リアルタイムコンピューティングでの結合結果は、テーブルが動的であるため継続的に更新されることです。これにより、最終結果がバッチ処理の結果と一致することが保証されます。

構文

tableReference [, tableReference ]*
| tableExpression [ NATURAL | INNER ] [ { LEFT | RIGHT | FULL } [ OUTER ] ] JOIN tableExpression [ joinCondition ]
| tableExpression CROSS JOIN tableExpression
| tableExpression [ CROSS | OUTER ] APPLY tableExpression

joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'
  • tableReference: テーブル名を指定します。

  • tableExpression: 式を指定します。

  • joinCondition: 結合条件を指定します。

ヒント

Ververica Runtime (VVR) 8.0.1 以降では、ヒントを使用して、標準結合における左右のストリームの状態に対して異なる生存時間 (TTL) 値を指定できます。これにより、維持される状態データのサイズを削減できます。

  • 構文

    -- VVR 8.0.1 以降のバージョンは、次の構文を使用します。
    SELECT /*+ JOIN_STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...
    
    -- VVR 8.0.7 以降のバージョンでは、Apache Flink で使用される次の構文もサポートされています。
    SELECT /*+ STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...
  • 注意事項

    • JOIN_STATE_TTL ヒントは、標準結合のみに使用してください。このヒントは、ルックアップ結合、インターバル結合、およびウィンドウ結合をサポートしていません。

    • JOIN_STATE_TTL ヒントを使用して標準結合の 1 つのストリームの状態 TTL のみを指定した場合、もう一方のストリームは table.exec.state.ttl パラメーターで指定されたデプロイメントレベルの状態 TTL を使用します。デフォルト値は 1.5 日です。このパラメーターの詳細については、「基本パラメーター」をご参照ください。

    • tableReference パラメーターには、テーブル名、ビュー名、またはエイリアスを設定できます。テーブルにエイリアスを指定した場合は、エイリアスを使用する必要があります。

    • 標準結合のヒントは実験的な機能です。構文は将来変更される可能性があります。

  • -- ヒントでエイリアスを使用します。
    SELECT /*+ JOIN_STATE_TTL('o' = '3d', 'p' = '1d') */
      o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
      FROM Orders AS o
      JOIN Products AS p
    ON o.productid = p.productid;
    -- VVR 8.0.7 以降のバージョンでは、Apache Flink で使用される次の構文もサポートされています。
    SELECT /*+ STATE_TTL('o' = '3d', 'p' = '1d') */
      o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice
      FROM Orders AS o
      JOIN Products AS p
    ON o.productid = p.productid;
    
    -- ヒントでテーブル名を使用します。
    SELECT /*+ JOIN_STATE_TTL('Orders' = '3d', 'Products' = '1d') */ *
      FROM Orders
      JOIN Products
    ON Orders.productid = Products.productid;
    -- VVR 8.0.7 以降のバージョンでは、Apache Flink で使用される次の構文もサポートされています。
    SELECT /*+ STATE_TTL('Orders' = '3d', 'Products' = '1d') */ *
      FROM Orders
      JOIN Products
    ON Orders.productid = Products.productid;
    
    -- ヒントでビュー名を使用します。
    CREATE TEMPORARY VIEW v AS
    SELECT id, ...
    	FROM (
    		SELECT id, ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ..) AS rn
    		FROM src1
    		WHERE ...
    	) tmp
    WHERE rn = 1;
    	
    SELECT /*+ JOIN_STATE_TTL('v' = '1d', 'b' = '3d') */ v.* , b.*
    FROM v
    LEFT JOIN src2 AS b ON v.id = b.id;
    -- VVR 8.0.7 以降のバージョンでは、Apache Flink で使用される次の構文もサポートされています。
    SELECT /*+ STATE_TTL('v' = '1d', 'b' = '3d') */ v.* , b.*
    FROM v
    LEFT JOIN src2 AS b ON v.id = b.id;

例 1: Orders テーブルと Shipments テーブルを結合する

  • テストデータ

    表 1. Orders

    ID

    製品名

    注文日時

    1

    phone_a

    2025-05-01 10:00:00.0

    2

    notebook_x

    2025-05-01 10:02:00.0

    3

    phone_b

    2025-05-01 10:03:00.0

    4

    pad_m

    2025-05-01 10:05:00.0

    表 2. 出荷

    出荷ID

    注文ID

    ステータス

    出荷日時

    101

    1

    出荷済み

    2025-05-01 11:00:00.0

    102

    2

    配達済み

    2025-05-01 17:00:00.0

    103

    3

    出荷済み

    2025-05-01 12:00:00.0

    104

    4

    出荷済み

    2025-05-01 11:30:00.0

  • テストステートメント

    SELECT id, productName, status
    FROM orders o 
    JOIN shipments s 
      ON o.id = s.orderId;
  • テスト結果

    ID

    製品名

    ステータス

    1

    phone_a

    出荷済み

    2

    notebook_x

    配達済み

    3

    phone_b

    出荷済み

    4

    pad_m

    出荷済み

例 2: datahub_stream1 テーブルと datahub_stream2 テーブルを結合する

  • テストデータ

    表 3. datahub_stream1

    a (BIGINT)

    b (BIGINT)

    c (VARCHAR)

    0

    10

    test11

    1

    10

    test21

    表 4. datahub_stream2

    a (BIGINT)

    b (BIGINT)

    c (VARCHAR)

    0

    10

    test11

    1

    10

    test21

    0

    10

    test31

    1

    10

    test41

  • テストステートメント

    SELECT s1.c,s2.c 
    FROM datahub_stream1 AS s1
    JOIN datahub_stream2 AS s2 
    ON s1.a = s2.a
    WHERE s1.a = 0;    
  • テスト結果

    s1.c (VARCHAR)

    s2.c (VARCHAR)

    test11

    test11

    test11

    test31