Flink SQL mendukung berbagai jenis operasi JOIN pada tabel dinamis dan menyediakan semantik kueri yang fleksibel. Saat menggunakan JOIN dalam Flink SQL, hindari Produk Kartesius karena dapat menyebabkan kegagalan kueri. Secara default, urutan JOIN tidak dioptimalkan. Untuk meningkatkan performa, atur ulang urutan tabel di klausa FROM, tempatkan tabel dengan frekuensi pembaruan terendah di awal dan yang tertinggi di akhir.
Ikhtisar JOIN
Jenis JOIN | Deskripsi | Perbedaan Sintaksis |
JOIN reguler adalah jenis JOIN yang paling umum digunakan. Catatan baru atau pembaruan pada tabel yang akan digabungkan memengaruhi hasil keseluruhan dari operasi JOIN. | Sintaksis JOIN reguler sangat fleksibel dan mendukung operasi INSERT, UPDATE, dan DELETE pada tabel input. Namun, JOIN reguler menyimpan data input secara permanen sebagai status, yang dapat menyebabkan pertumbuhan data tanpa batas. Untuk mencegah hal ini, Anda dapat mengonfigurasi time-to-live (TTL) untuk data status, meskipun ini dapat memengaruhi akurasi hasil. | |
JOIN Interval menghasilkan Produk Kartesius sederhana yang memenuhi kondisi JOIN dan batasan waktu. | Setidaknya satu kondisi join ekuivalen dan satu kondisi join berbasis waktu yang terkandung dalam tabel yang akan digabungkan diperlukan. Anda dapat mendefinisikan rentang waktu sebagai kondisi. Misalnya, Anda dapat menggunakan operator perbandingan, seperti kurang dari (<), kurang dari atau sama dengan (<=), lebih besar dari atau sama dengan (>=), dan lebih besar dari (>), untuk menentukan rentang waktu. Anda juga dapat menggunakan kondisi | |
JOIN Temporal digunakan untuk menggabungkan tabel versi berdasarkan waktu kejadian atau waktu pemrosesan. | Tabel yang akan digabungkan harus memiliki semantik waktu-pemrosesan tipe yang sama. Kondisi JOIN biasanya didasarkan pada timestamp tertentu. | |
JOIN Lookup digunakan untuk menggabungkan tabel dimensi dari sumber data eksternal dengan tabel fakta, sehingga memperkaya aliran data. Operasi JOIN ini mengharuskan satu tabel memiliki atribut waktu-pemrosesan, sedangkan tabel lainnya merupakan tabel dimensi. | Satu tabel harus berisi atribut waktu-pemrosesan, dan data tabel lainnya harus diperoleh melalui konektor lookup. Kondisi JOIN ekuivalen juga diperlukan. | |
JOIN Lateral digunakan untuk menggabungkan setiap baris tabel kiri dengan semua baris hasil dari fungsi bernilai tabel (TVF). | Klausa |
JOIN Reguler
Empat jenis operasi JOIN sering digunakan:
INNER JOIN: Mengembalikan entri data yang memenuhi kondisi JOIN di kedua tabel.
LEFT JOIN: Mengembalikan semua entri data di tabel kiri, termasuk baris yang tidak cocok dengan tabel kanan.
RIGHT JOIN: Mengembalikan semua entri data di tabel kanan, termasuk baris yang tidak cocok dengan tabel kiri.
FULL OUTER JOIN: Mengembalikan gabungan dua tabel, termasuk semua baris di kedua tabel.
Regular joins Diagram

Contoh Join Reguler
Masuk ke Konsol Realtime Compute for Apache Flink.
Temukan ruang kerja yang ingin Anda kelola dan klik Console di kolom Actions.
Di panel navigasi sebelah kiri, pilih .
Klik ikon + dan pilih New Blank Stream Draft. Di dialog New Draft, masukkan nama, pilih versi mesin, dan klik Create.
Berikut adalah contoh kode yang menunjukkan cara menggunakan operasi JOIN untuk menggabungkan baris dari beberapa tabel.
CREATE TEMPORARY TABLE NOC ( agent_id STRING, codename STRING ) WITH ( 'connector' = 'faker', -- Faker connector 'fields.agent_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}', -- Menghasilkan angka dari lima angka. 'fields.codename.expression' = '#{superhero.name}', -- Fungsi bawaan palsu, digunakan untuk menghasilkan nama superhero. 'number-of-rows' = '10' -- Menunjukkan bahwa sepuluh baris data dihasilkan. ); CREATE TEMPORARY TABLE RealNames ( agent_id STRING, name STRING ) WITH ( 'connector' = 'faker', 'fields.agent_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}', 'fields.name.expression' = '#{Name.full_name}', -- Fungsi bawaan palsu, digunakan untuk menghasilkan nama secara acak. 'number-of-rows' = '10' ); SELECT name, codename FROM NOC INNER JOIN RealNames ON NOC.agent_id = RealNames.agent_id; -- Jika nilai agent_id sama untuk kedua tabel, name dan codename dihasilkan. Nilai agent_id berkisar antara 1 hingga 5.Klik Debug di pojok kanan atas, pilih kluster yang akan di-debug, lalu klik OK. Jika tidak ada kluster sesi yang tersedia, buat kluster sesi. Untuk informasi lebih lanjut, lihat Buat Kluster Sesi.

Untuk informasi lebih lanjut tentang penggunaan JOIN reguler, lihat Pernyataan JOIN Reguler.
Joins Interval
JOIN Interval digunakan untuk menggabungkan dua set data berdasarkan interval waktu tertentu. Data dialokasikan ke interval berdasarkan timestamp dan berguna untuk membandingkan dua set data dalam rentang waktu tertentu.
Diagram JOIN Interval
Contoh JOIN Interval
Contoh ini menunjukkan cara menyaring data berdasarkan selisih waktu antara waktu pemesanan (order_time) dan waktu pengiriman (shipment_time) dalam tiga jam.
Buat draf di halaman ETL dengan merujuk pada instruksi di bagian JOIN Reguler.
CREATE TEMPORARY TABLE orders (
id INT,
order_time AS TIMESTAMPADD(HOUR, CAST(FLOOR(RAND()*(1-5+1)+5)*(-1) AS INT), CURRENT_TIMESTAMP) -- Menghasilkan timestamp acak antara 2 hingga 4 jam sebelum waktu lokal.
)
WITH (
'connector' = 'datagen', -- Datagen connector, digunakan untuk secara berkala menghasilkan data acak.
"rows-per-second'='10", -- Laju penghasilan data acak, yaitu 10 baris per detik.
'fields.id.kind'='sequence', -- Generator urutan.
'fields.id.start'='1', -- Nilai urutan dimulai dari 1.
'fields.id.end'='100' -- Nilai urutan berakhir pada 100.
);
CREATE TEMPORARY TABLE shipments (
order_id INT,
shipment_time AS TIMESTAMPADD(HOUR, CAST(FLOOR(RAND()*(1-5+1))+1 AS INT), CURRENT_TIMESTAMP) -- Menghasilkan timestamp acak antara 0 hingga 2 jam sebelum waktu lokal.
)
WITH (
'connector' = 'datagen',
'rows-per-second'='5',
'fields.order_id.kind'='sequence',
'fields.order_id.start'='1',
'fields.order_id.end'='100'
);
SELECT
o.id AS order_id,
o.order_time,
s.shipment_time,
TIMESTAMPDIFF(HOUR,o.order_time,s.shipment_time) AS hour_diff -- Selisih waktu antara waktu pemesanan (order_time) dan waktu pengiriman (shipment_time).
FROM orders o
JOIN shipments s ON o.id = s.order_id
WHERE
o.order_time BETWEEN s.shipment_time - INTERVAL '3' HOUR AND s.shipment_time; -- Filter data di mana selisih waktu antara waktu pemesanan dan waktu pengiriman berada dalam tiga jam.Klik Debug di pojok kanan atas, lalu klik OK. Gambar berikut menampilkan hasil debugging.

Untuk informasi lebih lanjut tentang penggunaan JOIN Interval, lihat JOIN Interval.
JOIN Temporal
JOIN Temporal digunakan untuk menggabungkan tabel temporal, yaitu tabel yang berubah seiring waktu. Misalnya, kurs mata uang atau harga produk dapat berfluktuasi, dan JOIN temporal memetakan waktu transaksi ke nilai kurs atau harga terkait untuk perhitungan yang akurat.
Diagram JOIN Temporal
Contoh JOIN Temporal
Contoh ini menunjukkan skenario bisnis di mana pesanan dihitung berdasarkan kurs yang berlaku saat itu.
Buat draf di halaman ETL dengan merujuk pada instruksi di bagian JOIN Reguler dan baca data simulasi untuk debugging.
CREATE TEMPORARY TABLE currency_rates (
`currency_code` STRING,
`eur_rate` DECIMAL(6,4),
`rate_time` TIMESTAMP(3),
WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECOND,
PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'currency_rates',
'properties.bootstrap.servers' = '${secret_values.kafkahost}',
'properties.auto.offset.reset' = 'earliest',
'properties.group.id' = 'currency_rates',
'key.format' = 'raw',
'value.format' = 'json'
);
CREATE TEMPORARY TABLE transactions (
`id` STRING,
`currency_code` STRING,
`total` DECIMAL(10,2),
`transaction_time` TIMESTAMP(3),
WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'transactions',
'properties.bootstrap.servers' = '${secret_values.kafkahost}',
'properties.auto.offset.reset' = 'earliest',
'properties.group.id' = 'transactions',
'key.format' = 'raw',
'key.fields' = 'id',
'value.format' = 'json'
);
SELECT
t.id,
t.total * c.eur_rate AS total_eur,
c.eur_rate,
t.total,
c.currency_code,
c.rate_time,
t.transaction_time
FROM transactions t
JOIN currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
ON t.currency_code = c.currency_code
;Di pojok kanan atas editor SQL, klik Debug. Di kotak dialog yang muncul, klik OK. Gambar berikut menunjukkan hasil debugging.
Kurs berubah pada 20:16:11 dan 20:35:22. Pesanan transaksi terjadi pada 20:35:14 dan kurs belum berubah pada saat itu. Oleh karena itu, kurs pada 20:16:11 digunakan untuk perhitungan.

Lookup Joins
JOIN Lookup digunakan untuk memperkaya data streaming dengan data statis dari sistem eksternal. Misalnya, data produk dapat disimpan di database relasional dan digabungkan dengan data streaming menggunakan konektor MySQL.
Diagram JOIN Lookup
Tambahkan FOR SYSTEM_TIME AS OF PROCTIME() ke tabel dimensi agar setiap baris snapshot tabel dimensi pada waktu saat ini dikaitkan dengan data sumber.
Kondisi ON harus berisi kondisi ekuivalen untuk bidang yang mendukung pencarian acak di tabel dimensi.
Contoh JOIN Lookup
Contoh ini menunjukkan cara menggunakan konektor eksternal untuk memperkaya data pesanan dengan nama produk dari data statis yang disimpan secara eksternal.
Buat draf di halaman ETL dengan merujuk pada instruksi di bagian JOIN Reguler.
CREATE TEMPORARY TABLE orders (
order_id STRING,
product_id INT,
order_total INT
) WITH (
'connector' = 'faker', -- Faker connector
'fields.order_id.expression' = '#{Internet.uuid}', -- Menghasilkan pengenal unik universal (UUID) acak.
'fields.product_id.expression' = '#{number.numberBetween ''1'',''5''}', -- Menghasilkan angka dari 1 hingga 5.
'fields.order_total.expression' = '#{number.numberBetween ''1000','5000'}', -- Menghasilkan angka dari 1000 hingga 5000.
'number-of-rows' = '10' -- Jumlah baris data yang dihasilkan.
);
-- Asosiasikan data produk statis menggunakan konektor MySQL.
CREATE TEMPORARY TABLE products (
product_id INT,
product_name STRING
)
WITH(
'connector' = 'mysql',
'hostname' = '${secret_values.mysqlhost}',
'port' = '3306',
'username' = '${secret_values.username}',
'password' = '${secret_values.password}',
'database-name' = 'db2024',
'table-name' = 'products'
);
SELECT
o.order_id,
p.product_name,
o.order_total,
CASE
WHEN o.order_total > 3000 THEN 1
ELSE 0
END AS is_importance -- Tambahkan bidang is_importance. Jika nilai bidang order_total lebih besar dari 3000, nilai bidang ini adalah 1, yang menunjukkan bahwa pesanan tersebut adalah pesanan penting.
FROM orders o
JOIN products FOR SYSTEM_TIME AS OF PROCTIME() AS p -- Klausul FOR SYSTEM_TIME AS OF PROCTIME() memastikan bahwa setiap baris di tabel orders dikaitkan dengan baris yang cocok dengan kondisi JOIN di tabel products ketika operasi JOIN dilakukan pada tabel orders.
ON o.product_id = p.product_id;Gambar berikut menunjukkan contoh data produk yang diasosiasikan menggunakan konektor MySQL.

Di pojok kanan atas editor SQL, klik Debug. Di kotak dialog yang muncul, klik OK. Gambar berikut menunjukkan hasil debugging.

Untuk informasi lebih lanjut tentang penggunaan JOIN Lookup, lihat Pernyataan JOIN untuk Tabel Dimensi.
Joins Lateral
JOIN Lateral memungkinkan Anda menjalankan subquery untuk setiap baris kueri luar, meningkatkan fleksibilitas dan efisiensi kueri. Namun, kompleksitas subquery internal atau jumlah data yang besar dapat memengaruhi performa.
Contoh JOIN Lateral
Contoh ini menunjukkan cara mengagregasi catatan pesanan penjualan dan menyaring tiga produk teratas berdasarkan jumlah penjualan.
Buat draf di halaman ETL dengan merujuk pada instruksi di bagian JOIN Reguler.
CREATE TEMPORARY TABLE sale (
sale_id STRING,
product_id INT,
sale_num INT
)
WITH (
'connector' = 'faker', -- Faker connector
'fields.sale_id.expression' = '#{Internet.uuid}', -- Menghasilkan UUID acak.
'fields.product_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}', -- Pilih angka dari lima angka.
'fields.sale_num.expression' = '#{number.numberBetween ''1'',''10''}', -- Menghasilkan bilangan bulat acak dari 1 hingga 10.
'number-of-rows' = '50' -- Hasilkan 50 baris data.
);
CREATE TEMPORARY TABLE products (
product_id INT,
product_name STRING,
PRIMARY KEY(product_id) NOT ENFORCED
)
WITH(
'connector' = 'mysql',
'hostname' = '${secret_values.mysqlhost}',
'port' = '3306',
'username' = '${secret_values.username}',
'password' = '${secret_values.password}',
'database-name' = 'db2024',
'table-name' = 'products'
);
SELECT
p.product_name,
s.total_sales
FROM products p
LEFT JOIN LATERAL
(SELECT SUM(sale_num) AS total_sales FROM sale WHERE sale.product_id = p.product_id) s ON TRUE
ORDER BY total_sales DESC
LIMIT 3;Di pojok kanan atas editor SQL, klik Debug. Di kotak dialog yang muncul, klik OK. Gambar berikut menunjukkan hasil debugging.

Referensi
Jika Anda hanya tertarik pada waktu pemrosesan suatu event, bukan pada waktu kejadian event tersebut, Anda dapat menggunakan pernyataan join temporal waktu pemrosesan. Untuk informasi lebih lanjut, lihat Pernyataan join temporal waktu pemrosesan.
Untuk informasi lebih lanjut tentang JOIN reguler, lihat Pernyataan JOIN Reguler.
Untuk informasi lebih lanjut tentang JOIN Interval, lihat JOIN Interval.
Untuk informasi lebih lanjut tentang JOIN Lookup, lihat Pernyataan JOIN untuk Tabel Dimensi.