Flink SQL は、動的テーブルに対する複雑で柔軟な結合操作をサポートしています。このトピックでは、標準結合ステートメントの使用方法について説明します。
背景情報
リアルタイムコンピューティングの JOIN ステートメントは、バッチ処理の JOIN ステートメントと意味的に同等です。どちらのステートメントも 2 つのテーブルを結合するために使用されます。違いは、リアルタイムコンピューティングでの結合結果は、テーブルが動的であるため継続的に更新されることです。これにより、最終結果がバッチ処理の結果と一致することが保証されます。
構文
tableReference [, tableReference ]*
| tableExpression [ NATURAL | INNER ] [ { LEFT | RIGHT | FULL } [ OUTER ] ] JOIN tableExpression [ joinCondition ]
| tableExpression CROSS JOIN tableExpression
| tableExpression [ CROSS | OUTER ] APPLY tableExpression
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'tableReference: テーブル名を指定します。
tableExpression: 式を指定します。
joinCondition: 結合条件を指定します。
ヒント
Ververica Runtime (VVR) 8.0.1 以降では、ヒントを使用して、標準結合における左右のストリームの状態に対して異なる生存時間 (TTL) 値を指定できます。これにより、維持される状態データのサイズを削減できます。
構文
-- VVR 8.0.1 以降のバージョンは、次の構文を使用します。 SELECT /*+ JOIN_STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ... -- VVR 8.0.7 以降のバージョンでは、Apache Flink で使用される次の構文もサポートされています。 SELECT /*+ STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...注意事項
JOIN_STATE_TTL ヒントは、標準結合のみに使用してください。このヒントは、ルックアップ結合、インターバル結合、およびウィンドウ結合をサポートしていません。
JOIN_STATE_TTL ヒントを使用して標準結合の 1 つのストリームの状態 TTL のみを指定した場合、もう一方のストリームは table.exec.state.ttl パラメーターで指定されたデプロイメントレベルの状態 TTL を使用します。デフォルト値は 1.5 日です。このパラメーターの詳細については、「基本パラメーター」をご参照ください。
tableReference パラメーターには、テーブル名、ビュー名、またはエイリアスを設定できます。テーブルにエイリアスを指定した場合は、エイリアスを使用する必要があります。
標準結合のヒントは実験的な機能です。構文は将来変更される可能性があります。
例
-- ヒントでエイリアスを使用します。 SELECT /*+ JOIN_STATE_TTL('o' = '3d', 'p' = '1d') */ o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice FROM Orders AS o JOIN Products AS p ON o.productid = p.productid; -- VVR 8.0.7 以降のバージョンでは、Apache Flink で使用される次の構文もサポートされています。 SELECT /*+ STATE_TTL('o' = '3d', 'p' = '1d') */ o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice FROM Orders AS o JOIN Products AS p ON o.productid = p.productid; -- ヒントでテーブル名を使用します。 SELECT /*+ JOIN_STATE_TTL('Orders' = '3d', 'Products' = '1d') */ * FROM Orders JOIN Products ON Orders.productid = Products.productid; -- VVR 8.0.7 以降のバージョンでは、Apache Flink で使用される次の構文もサポートされています。 SELECT /*+ STATE_TTL('Orders' = '3d', 'Products' = '1d') */ * FROM Orders JOIN Products ON Orders.productid = Products.productid; -- ヒントでビュー名を使用します。 CREATE TEMPORARY VIEW v AS SELECT id, ... FROM ( SELECT id, ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ..) AS rn FROM src1 WHERE ... ) tmp WHERE rn = 1; SELECT /*+ JOIN_STATE_TTL('v' = '1d', 'b' = '3d') */ v.* , b.* FROM v LEFT JOIN src2 AS b ON v.id = b.id; -- VVR 8.0.7 以降のバージョンでは、Apache Flink で使用される次の構文もサポートされています。 SELECT /*+ STATE_TTL('v' = '1d', 'b' = '3d') */ v.* , b.* FROM v LEFT JOIN src2 AS b ON v.id = b.id;
例 1: Orders テーブルと Shipments テーブルを結合する
テストデータ
表 1. Orders
ID
製品名
注文日時
1
phone_a
2025-05-01 10:00:00.0
2
notebook_x
2025-05-01 10:02:00.0
3
phone_b
2025-05-01 10:03:00.0
4
pad_m
2025-05-01 10:05:00.0
表 2. 出荷
出荷ID
注文ID
ステータス
出荷日時
101
1
出荷済み
2025-05-01 11:00:00.0
102
2
配達済み
2025-05-01 17:00:00.0
103
3
出荷済み
2025-05-01 12:00:00.0
104
4
出荷済み
2025-05-01 11:30:00.0
テストステートメント
SELECT id, productName, status FROM orders o JOIN shipments s ON o.id = s.orderId;テスト結果
ID
製品名
ステータス
1
phone_a
出荷済み
2
notebook_x
配達済み
3
phone_b
出荷済み
4
pad_m
出荷済み
例 2: datahub_stream1 テーブルと datahub_stream2 テーブルを結合する
テストデータ
表 3. datahub_stream1
a (BIGINT)
b (BIGINT)
c (VARCHAR)
0
10
test11
1
10
test21
表 4. datahub_stream2
a (BIGINT)
b (BIGINT)
c (VARCHAR)
0
10
test11
1
10
test21
0
10
test31
1
10
test41
テストステートメント
SELECT s1.c,s2.c FROM datahub_stream1 AS s1 JOIN datahub_stream2 AS s2 ON s1.a = s2.a WHERE s1.a = 0;テスト結果
s1.c (VARCHAR)
s2.c (VARCHAR)
test11
test11
test11
test31