Flink SQL mendukung operasi join yang kompleks dan fleksibel pada tabel dinamis. Topik ini menjelaskan cara menggunakan pernyataan join reguler.
Informasi latar belakang
Secara semantik, pernyataan JOIN untuk komputasi real-time setara dengan pernyataan untuk pemrosesan batch. Keduanya digunakan untuk menggabungkan dua tabel. Perbedaannya adalah hasil join dalam komputasi real-time terus diperbarui karena sifat tabel yang dinamis, memastikan konsistensi hasil akhir dengan pemrosesan batch.
Sintaksis
tableReference [, tableReference ]*
| tableExpression [ NATURAL | INNER ] [ { LEFT | RIGHT | FULL } [ OUTER ] ] JOIN tableExpression [ joinCondition ]
| tableExpression CROSS JOIN tableExpression
| tableExpression [ CROSS | OUTER ] APPLY tableExpression
joinCondition:
ON booleanExpression
| USING '(' column [, column ]* ')'tableReference: menentukan nama tabel.
tableExpression: menentukan ekspresi.
joinCondition: menentukan kondisi join.
Tips
Mulai Ververica Runtime (VVR) versi 8.0.1 atau lebih baru, Anda dapat menggunakan tips untuk menentukan nilai time-to-live (TTL) berbeda untuk status aliran kiri dan kanan dalam join reguler. Hal ini membantu mengurangi ukuran data status yang harus dipertahankan.
Sintaksis
-- VVR 8.0.1 dan versi lebih baru menggunakan sintaksis berikut: SELECT /*+ JOIN_STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ... -- VVR 8.0.7 dan versi lebih baru juga mendukung sintaksis berikut yang digunakan oleh Apache Flink: SELECT /*+ STATE_TTL('tableReference1' = 'ttl1' [, 'tableReference2' = 'ttl2']*) */ ...Perhatian
Gunakan tip JOIN_STATE_TTL hanya untuk join reguler. Tip ini tidak mendukung lookup, interval, dan window joins.
Jika Anda menggunakan tip JOIN_STATE_TTL untuk menentukan TTL status hanya untuk satu aliran dalam join reguler, aliran lainnya akan menggunakan TTL status tingkat penerapan yang ditentukan oleh parameter table.exec.state.ttl. Nilai defaultnya adalah 1,5 hari. Untuk informasi lebih lanjut tentang parameter ini, lihat Parameter Dasar.
Anda dapat menyetel parameter tableReference ke nama tabel, nama tampilan, atau alias. Jika Anda menentukan alias untuk tabel, Anda harus menggunakan alias tersebut.
Tips untuk join reguler merupakan fitur eksperimental. Sintaksis mungkin berubah di masa depan.
Contoh
-- Gunakan alias dalam tip. SELECT /*+ JOIN_STATE_TTL('o' = '3d', 'p' = '1d') */ o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice FROM Orders AS o JOIN Products AS p ON o.productid = p.productid; -- VVR 8.0.7 dan versi lebih baru juga mendukung sintaksis berikut yang digunakan oleh Apache Flink: SELECT /*+ STATE_TTL('o' = '3d', 'p' = '1d') */ o.rowtime, o.productid, o.orderid, o.units, p.name, p.unitprice FROM Orders AS o JOIN Products AS p ON o.productid = p.productid; -- Gunakan nama tabel dalam tip. SELECT /*+ JOIN_STATE_TTL('Orders' = '3d', 'Products' = '1d') */ * FROM Orders JOIN Products ON Orders.productid = Products.productid; -- VVR 8.0.7 dan versi lebih baru juga mendukung sintaksis berikut yang digunakan oleh Apache Flink: SELECT /*+ STATE_TTL('Orders' = '3d', 'Products' = '1d') */ * FROM Orders JOIN Products ON Orders.productid = Products.productid; -- Gunakan nama tampilan dalam tip. CREATE TEMPORARY VIEW v AS SELECT id, ... FROM ( SELECT id, ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ..) AS rn FROM src1 WHERE ... ) tmp WHERE rn = 1; SELECT /*+ JOIN_STATE_TTL('v' = '1d', 'b' = '3d') */ v.* , b.* FROM v LEFT JOIN src2 AS b ON v.id = b.id; -- VVR 8.0.7 dan versi lebih baru juga mendukung sintaksis berikut yang digunakan oleh Apache Flink: SELECT /*+ STATE_TTL('v' = '1d', 'b' = '3d') */ v.* , b.* FROM v LEFT JOIN src2 AS b ON v.id = b.id;
Contoh 1: Menggabungkan tabel Orders dan Shipments
Data Uji
Tabel 1. Orders
id
productName
ordertime
1
phone_a
2025-05-01 10:00:00.0
2
notebook_x
2025-05-01 10:02:00.0
3
phone_b
2025-05-01 10:03:00.0
4
pad_m
2025-05-01 10:05:00.0
Tabel 2. Shipments
shipId
orderId
status
shipTime
101
1
shipped
2025-05-01 11:00:00.0
102
2
delivered
2025-05-01 17:00:00.0
103
3
shipped
2025-05-01 12:00:00.0
104
4
shipped
2025-05-01 11:30:00.0
Pernyataan Uji
SELECT id, productName, status FROM orders o JOIN shipments s ON o.id = s.orderId;Hasil Uji
id
productName
status
1
phone_a
shipped
2
notebook_x
delivered
3
phone_b
shipped
4
pad_m
shipped
Contoh 2: Menggabungkan tabel datahub_stream1 dan datahub_stream2
Data Uji
Tabel 3. datahub_stream1
a (BIGINT)
b (BIGINT)
c (VARCHAR)
0
10
test11
1
10
test21
Tabel 4. datahub_stream2
a (BIGINT)
b (BIGINT)
c (VARCHAR)
0
10
test11
1
10
test21
0
10
test31
1
10
test41
Pernyataan Uji
SELECT s1.c,s2.c FROM datahub_stream1 AS s1 JOIN datahub_stream2 AS s2 ON s1.a = s2.a WHERE s1.a = 0;Hasil Uji
s1.c (VARCHAR)
s2.c (VARCHAR)
test11
test11
test11
test31