Topik ini menjelaskan cara menggunakan interval join.
Informasi latar belakang
Interval join memungkinkan dua aliran data digabungkan berdasarkan kunci umum. Fungsinya menghubungkan elemen dari setiap aliran ketika timestamp mereka berada dalam interval waktu relatif yang ditentukan. Setelah kedua aliran digabungkan, kolom timestamp dalam aliran input dipertahankan, sehingga memungkinkan pemrosesan lebih lanjut berdasarkan waktu kejadian.
Sintaksis
SELECT nama-kolom
FROM tabel1 [AS <alias1>]
[INNER | LEFT | RIGHT |FULL ] JOIN tabel2
ON tabel1.nama-kolom1 = tabel2.nama-kunci1 AND EKSPRESI_WAKTU_TERBATASINNER JOIN, LEFT JOIN, RIGHT JOIN, dan FULL JOIN didukung. Jika Anda menggunakan JOIN tanpa spesifikasi, INNER JOIN akan dijalankan secara default.
SEMI JOIN dan ANTI JOIN tidak didukung.
EKSPRESI_WAKTU_TERBATAS adalah ekspresi kondisional interval pada kolom atribut waktu dari dua aliran data. Ekspresi kondisional berikut didukung:
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
Contoh
Contoh ini menunjukkan barang yang dikirim dalam waktu 4 jam setelah penempatan pesanan.
Data Uji
Tabel Pesanan (orders)
id
productName
orderTime
1
telepon
2024-04-01 10:00:00.0
2
laptop
2024-04-01 10:02:00.0
3
jam tangan
2024-04-01 10:03:00.0
4
tablet
2024-04-01 10:05:00.0
Tabel Logistik (Shipments)
shipId
orderId
status
shiptime
0
1
dikirim
2024-04-01 11:00:00.0
1
2
dikirim
2024-04-01 17:00:00.0
2
3
dikirim
2024-04-01 12:00:00.0
3
4
dikirim
2024-04-01 11:30:00.0
Pernyataan Uji
CREATE TEMPORARY TABLE Orders( id BIGINT, productName VARCHAR, ordertime TIMESTAMP(3), WATERMARK wk FOR ordertime as withOffset(ordertime, 2000) -- Tentukan kolom ordertime sebagai atribut waktu kejadian tabel dan gunakan strategi watermark tertunda 2 detik. ) WITH ( 'connector' = 'kafka', 'topic' = '<topikAnda>', 'properties.bootstrap.servers' = '<brokerAnda>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); CREATE TEMPORARY TABLE Shipments( shipId BIGINT, orderId BIGINT, status VARCHAR, shiptime TIMESTAMP(3), WATERMARK wk FOR shiptime as withOffset(shiptime, 2000) -- Tentukan kolom ordertime sebagai atribut waktu kejadian tabel dan gunakan strategi watermark tertunda 2 detik. ) WITH ( 'connector' = 'kafka', 'topic' = '<topikAnda>', 'properties.bootstrap.servers' = '<brokerAnda>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); -- Buat sink MySQL. CREATE TEMPORARY TABLE rds_output( id BIGINT, productName VARCHAR, status VARCHAR ) WITH ( 'connector' = 'mysql', 'hostname' = '<hostAnda>', 'port' = '3306', 'username' = '<namaPenggunaAnda>', 'password' = '<kataSandiAnda>', 'database-name' = '<namaBasisDataAnda>', 'table-name' = '<namaTabelAnda>' ); INSERT INTO rds_output SELECT id, productName, status FROM Orders AS o JOIN Shipments AS s on o.id = s.orderId AND o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime;Hasil Uji
id(bigint)
productName(varchar)
status(varchar)
1
telepon
dikirim
3
jam tangan
dikirim
4
tablet
dikirim