全部产品
Search
文档中心

Realtime Compute for Apache Flink:Mulai dengan operasi JOIN Flink SQL

更新时间:Oct 29, 2025

Flink SQL mendukung berbagai jenis operasi JOIN pada tabel dinamis dan menyediakan semantik kueri yang fleksibel. Saat menggunakan JOIN dalam Flink SQL, hindari Produk Kartesius karena dapat menyebabkan kegagalan kueri. Secara default, urutan JOIN tidak dioptimalkan. Untuk meningkatkan performa, atur ulang urutan tabel di klausa FROM, tempatkan tabel dengan frekuensi pembaruan terendah di awal dan yang tertinggi di akhir.

Ikhtisar JOIN

Jenis JOIN

Deskripsi

Perbedaan Sintaksis

JOIN Reguler

JOIN reguler adalah jenis JOIN yang paling umum digunakan. Catatan baru atau pembaruan pada tabel yang akan digabungkan memengaruhi hasil keseluruhan dari operasi JOIN.

Sintaksis JOIN reguler sangat fleksibel dan mendukung operasi INSERT, UPDATE, dan DELETE pada tabel input. Namun, JOIN reguler menyimpan data input secara permanen sebagai status, yang dapat menyebabkan pertumbuhan data tanpa batas. Untuk mencegah hal ini, Anda dapat mengonfigurasi time-to-live (TTL) untuk data status, meskipun ini dapat memengaruhi akurasi hasil.

JOIN Interval

JOIN Interval menghasilkan Produk Kartesius sederhana yang memenuhi kondisi JOIN dan batasan waktu.

Setidaknya satu kondisi join ekuivalen dan satu kondisi join berbasis waktu yang terkandung dalam tabel yang akan digabungkan diperlukan. Anda dapat mendefinisikan rentang waktu sebagai kondisi. Misalnya, Anda dapat menggunakan operator perbandingan, seperti kurang dari (<), kurang dari atau sama dengan (<=), lebih besar dari atau sama dengan (>=), dan lebih besar dari (>), untuk menentukan rentang waktu. Anda juga dapat menggunakan kondisi BETWEEN, atau melakukan pengecekan persamaan pada atribut waktu tipe yang sama (waktu pemrosesan atau waktu event) dalam tabel yang akan digabungkan.

Gabungan Temporal

JOIN Temporal digunakan untuk menggabungkan tabel versi berdasarkan waktu kejadian atau waktu pemrosesan.

Tabel yang akan digabungkan harus memiliki semantik waktu-pemrosesan tipe yang sama. Kondisi JOIN biasanya didasarkan pada timestamp tertentu.

JOIN Lookup

JOIN Lookup digunakan untuk menggabungkan tabel dimensi dari sumber data eksternal dengan tabel fakta, sehingga memperkaya aliran data. Operasi JOIN ini mengharuskan satu tabel memiliki atribut waktu-pemrosesan, sedangkan tabel lainnya merupakan tabel dimensi.

Satu tabel harus berisi atribut waktu-pemrosesan, dan data tabel lainnya harus diperoleh melalui konektor lookup. Kondisi JOIN ekuivalen juga diperlukan.

Lateral joins

JOIN Lateral digunakan untuk menggabungkan setiap baris tabel kiri dengan semua baris hasil dari fungsi bernilai tabel (TVF).

Klausa ON harus selalu berisi kondisi join TRUE tetap. Contoh: JOIN LATERAL TABLE(table_func(order_id)) t(res) ON TRUE.

JOIN Reguler

Empat jenis operasi JOIN sering digunakan:

  • INNER JOIN: Mengembalikan entri data yang memenuhi kondisi JOIN di kedua tabel.

  • LEFT JOIN: Mengembalikan semua entri data di tabel kiri, termasuk baris yang tidak cocok dengan tabel kanan.

  • RIGHT JOIN: Mengembalikan semua entri data di tabel kanan, termasuk baris yang tidak cocok dengan tabel kiri.

  • FULL OUTER JOIN: Mengembalikan gabungan dua tabel, termasuk semua baris di kedua tabel.

Regular joins Diagram

image

Contoh Join Reguler

  1. Masuk ke Konsol Realtime Compute for Apache Flink.

  2. Temukan ruang kerja yang ingin Anda kelola dan klik Console di kolom Actions.

  3. Di panel navigasi sebelah kiri, pilih Development > ETL.

  4. Klik ikon + dan pilih New Blank Stream Draft. Di dialog New Draft, masukkan nama, pilih versi mesin, dan klik Create.

    Berikut adalah contoh kode yang menunjukkan cara menggunakan operasi JOIN untuk menggabungkan baris dari beberapa tabel.
    CREATE TEMPORARY TABLE NOC (
      agent_id STRING,
      codename STRING
    )
    WITH (
      'connector' = 'faker',   -- Faker connector
      'fields.agent_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}',  -- Menghasilkan angka dari lima angka.
      'fields.codename.expression' = '#{superhero.name}',   -- Fungsi bawaan palsu, digunakan untuk menghasilkan nama superhero.
      'number-of-rows' = '10'   -- Menunjukkan bahwa sepuluh baris data dihasilkan.  
    );
    
    CREATE TEMPORARY TABLE RealNames (
      agent_id STRING,
      name     STRING
    )
    WITH (
      'connector' = 'faker',
      'fields.agent_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}',
      'fields.name.expression' = '#{Name.full_name}',  -- Fungsi bawaan palsu, digunakan untuk menghasilkan nama secara acak.
      'number-of-rows' = '10'
    );
    
    SELECT
        name,
        codename
    FROM NOC
    INNER JOIN RealNames ON NOC.agent_id = RealNames.agent_id;  -- Jika nilai agent_id sama untuk kedua tabel, name dan codename dihasilkan. Nilai agent_id berkisar antara 1 hingga 5.

  5. Klik Debug di pojok kanan atas, pilih kluster yang akan di-debug, lalu klik OK. Jika tidak ada kluster sesi yang tersedia, buat kluster sesi. Untuk informasi lebih lanjut, lihat Buat Kluster Sesi.

    image

Untuk informasi lebih lanjut tentang penggunaan JOIN reguler, lihat Pernyataan JOIN Reguler.

Joins Interval

JOIN Interval digunakan untuk menggabungkan dua set data berdasarkan interval waktu tertentu. Data dialokasikan ke interval berdasarkan timestamp dan berguna untuk membandingkan dua set data dalam rentang waktu tertentu.

Diagram JOIN Interval

Contoh JOIN Interval

Contoh ini menunjukkan cara menyaring data berdasarkan selisih waktu antara waktu pemesanan (order_time) dan waktu pengiriman (shipment_time) dalam tiga jam.

Buat draf di halaman ETL dengan merujuk pada instruksi di bagian JOIN Reguler.

CREATE TEMPORARY TABLE orders (
  id INT,
  order_time AS TIMESTAMPADD(HOUR, CAST(FLOOR(RAND()*(1-5+1)+5)*(-1) AS INT), CURRENT_TIMESTAMP)    -- Menghasilkan timestamp acak antara 2 hingga 4 jam sebelum waktu lokal.
)
WITH (
  'connector' = 'datagen', -- Datagen connector, digunakan untuk secara berkala menghasilkan data acak.
  "rows-per-second'='10", -- Laju penghasilan data acak, yaitu 10 baris per detik.
  'fields.id.kind'='sequence', -- Generator urutan.
  'fields.id.start'='1',       -- Nilai urutan dimulai dari 1.
  'fields.id.end'='100'        -- Nilai urutan berakhir pada 100.
);

CREATE TEMPORARY TABLE shipments (
  order_id INT,
  shipment_time AS TIMESTAMPADD(HOUR, CAST(FLOOR(RAND()*(1-5+1))+1 AS INT), CURRENT_TIMESTAMP)   -- Menghasilkan timestamp acak antara 0 hingga 2 jam sebelum waktu lokal.
)
WITH (
  'connector' = 'datagen',
  'rows-per-second'='5',
  'fields.order_id.kind'='sequence',
  'fields.order_id.start'='1',
  'fields.order_id.end'='100'
);

SELECT
  o.id AS order_id,
  o.order_time,
  s.shipment_time,
  TIMESTAMPDIFF(HOUR,o.order_time,s.shipment_time) AS hour_diff    -- Selisih waktu antara waktu pemesanan (order_time) dan waktu pengiriman (shipment_time).
FROM orders o
JOIN shipments s ON o.id = s.order_id   
WHERE 
    o.order_time BETWEEN s.shipment_time - INTERVAL '3' HOUR AND s.shipment_time;  -- Filter data di mana selisih waktu antara waktu pemesanan dan waktu pengiriman berada dalam tiga jam.

Klik Debug di pojok kanan atas, lalu klik OK. Gambar berikut menampilkan hasil debugging.

image

Untuk informasi lebih lanjut tentang penggunaan JOIN Interval, lihat JOIN Interval.

JOIN Temporal

JOIN Temporal digunakan untuk menggabungkan tabel temporal, yaitu tabel yang berubah seiring waktu. Misalnya, kurs mata uang atau harga produk dapat berfluktuasi, dan JOIN temporal memetakan waktu transaksi ke nilai kurs atau harga terkait untuk perhitungan yang akurat.

Diagram JOIN Temporal

Contoh JOIN Temporal

Contoh ini menunjukkan skenario bisnis di mana pesanan dihitung berdasarkan kurs yang berlaku saat itu.

(Opsional) Hasilkan Data Simulasi

  1. Buat draf di halaman ETL dengan merujuk pada instruksi di bagian JOIN Reguler.

  1. Gunakan konektor Faker untuk menghasilkan data simulasi dan tulis ke tabel Kafka kurs mata uang simulasi menggunakan konektor Upsert Kafka.

CREATE TEMPORARY TABLE currency_rates (
  `currency_code` STRING,
  `eur_rate` DECIMAL(6,4),
  `rate_time` TIMESTAMP(3),
  WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECOND,
  PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'currency_rates',
  'properties.bootstrap.servers' = '${secret_values.kafkahost}',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE transactions (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3),
  WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = '${secret_values.kafkahost}',
  'key.format' = 'raw',
  'key.fields' = 'id',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE currency_rates_faker (
  `currency_code` STRING,
  `eur_rate` DECIMAL(6,4),
  `rate_time` TIMESTAMP(3)
)
WITH (
  'connector' = 'faker',
  'fields.currency_code.expression' = '#{Currency.code}',
  'fields.eur_rate.expression' = '#{Number.randomDouble ''4'',''0'',''10''}',
  'fields.rate_time.expression' = '#{date.past ''15'',''SECONDS''}',
  'rows-per-second' = '2'
);

CREATE TEMPORARY TABLE transactions_faker (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3)
)
WITH (
  'connector' = 'faker',
  'fields.id.expression' = '#{Internet.UUID}',
  'fields.currency_code.expression' = '#{Currency.code}',
  'fields.total.expression' = '#{Number.randomDouble ''2'',''10'',''1000''}',
  'fields.transaction_time.expression' = '#{date.past ''30'',''SECONDS''}',
  'rows-per-second' = '2'
);

BEGIN STATEMENT SET;

INSERT INTO currency_rates
SELECT * FROM currency_rates_faker;

INSERT INTO transactions
SELECT * FROM transactions_faker;

END;
  1. Di pojok kanan atas editor SQL, klik Deploy.

  2. Di panel navigasi di sebelah kiri, pilih O&M > Deployments. Di halaman Deployments, temukan penerapan target Anda, dan klik Start di kolom Actions. Di panel Start Job, pilih Initial Mode, lalu klik Start.

Buat draf di halaman ETL dengan merujuk pada instruksi di bagian JOIN Reguler dan baca data simulasi untuk debugging.

CREATE TEMPORARY TABLE currency_rates (
  `currency_code` STRING,
  `eur_rate` DECIMAL(6,4),
  `rate_time` TIMESTAMP(3),
  WATERMARK FOR `rate_time` AS rate_time - INTERVAL '15' SECOND,
  PRIMARY KEY (currency_code) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'currency_rates',
  'properties.bootstrap.servers' = '${secret_values.kafkahost}',
  'properties.auto.offset.reset' = 'earliest',
  'properties.group.id' = 'currency_rates',
  'key.format' = 'raw',
  'value.format' = 'json'
);

CREATE TEMPORARY TABLE transactions (
  `id` STRING,
  `currency_code` STRING,
  `total` DECIMAL(10,2),
  `transaction_time` TIMESTAMP(3),
  WATERMARK FOR `transaction_time` AS transaction_time - INTERVAL '30' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'transactions',
  'properties.bootstrap.servers' = '${secret_values.kafkahost}',
  'properties.auto.offset.reset' = 'earliest',
  'properties.group.id' = 'transactions',
  'key.format' = 'raw',
  'key.fields' = 'id',
  'value.format' = 'json'
);

SELECT
  t.id,
  t.total * c.eur_rate AS total_eur,
  c.eur_rate,
  t.total,
  c.currency_code,
  c.rate_time,
  t.transaction_time
FROM transactions t
JOIN currency_rates FOR SYSTEM_TIME AS OF t.transaction_time AS c
ON t.currency_code = c.currency_code 
;

Di pojok kanan atas editor SQL, klik Debug. Di kotak dialog yang muncul, klik OK. Gambar berikut menunjukkan hasil debugging.

Kurs berubah pada 20:16:11 dan 20:35:22. Pesanan transaksi terjadi pada 20:35:14 dan kurs belum berubah pada saat itu. Oleh karena itu, kurs pada 20:16:11 digunakan untuk perhitungan.

image

Lookup Joins

JOIN Lookup digunakan untuk memperkaya data streaming dengan data statis dari sistem eksternal. Misalnya, data produk dapat disimpan di database relasional dan digabungkan dengan data streaming menggunakan konektor MySQL.

Diagram JOIN Lookup

Catatan
  • Tambahkan FOR SYSTEM_TIME AS OF PROCTIME() ke tabel dimensi agar setiap baris snapshot tabel dimensi pada waktu saat ini dikaitkan dengan data sumber.

  • Kondisi ON harus berisi kondisi ekuivalen untuk bidang yang mendukung pencarian acak di tabel dimensi.

Contoh JOIN Lookup

Contoh ini menunjukkan cara menggunakan konektor eksternal untuk memperkaya data pesanan dengan nama produk dari data statis yang disimpan secara eksternal.

Buat draf di halaman ETL dengan merujuk pada instruksi di bagian JOIN Reguler.

CREATE TEMPORARY TABLE orders ( 
    order_id STRING,
    product_id INT,
    order_total INT
) WITH (
  'connector' = 'faker',   -- Faker connector
  'fields.order_id.expression' = '#{Internet.uuid}',   -- Menghasilkan pengenal unik universal (UUID) acak.
  'fields.product_id.expression' = '#{number.numberBetween ''1'',''5''}',   -- Menghasilkan angka dari 1 hingga 5.
  'fields.order_total.expression' = '#{number.numberBetween ''1000','5000'}', -- Menghasilkan angka dari 1000 hingga 5000.
  'number-of-rows' = '10'  -- Jumlah baris data yang dihasilkan.
);

-- Asosiasikan data produk statis menggunakan konektor MySQL.
CREATE TEMPORARY TABLE products (
 product_id INT,
 product_name STRING
)
WITH(
  'connector' = 'mysql',
  'hostname' = '${secret_values.mysqlhost}',
  'port' = '3306',
  'username' = '${secret_values.username}',
  'password' = '${secret_values.password}',
  'database-name' = 'db2024',
  'table-name' = 'products'
);

SELECT 
    o.order_id,
    p.product_name,
    o.order_total,
  CASE 
    WHEN o.order_total > 3000 THEN 1    
    ELSE 0
  END AS is_importance          -- Tambahkan bidang is_importance. Jika nilai bidang order_total lebih besar dari 3000, nilai bidang ini adalah 1, yang menunjukkan bahwa pesanan tersebut adalah pesanan penting. 
FROM orders o
JOIN products FOR SYSTEM_TIME AS OF PROCTIME() AS p -- Klausul FOR SYSTEM_TIME AS OF PROCTIME() memastikan bahwa setiap baris di tabel orders dikaitkan dengan baris yang cocok dengan kondisi JOIN di tabel products ketika operasi JOIN dilakukan pada tabel orders. 
ON o.product_id = p.product_id;

Gambar berikut menunjukkan contoh data produk yang diasosiasikan menggunakan konektor MySQL.

image

Di pojok kanan atas editor SQL, klik Debug. Di kotak dialog yang muncul, klik OK. Gambar berikut menunjukkan hasil debugging.

image

Untuk informasi lebih lanjut tentang penggunaan JOIN Lookup, lihat Pernyataan JOIN untuk Tabel Dimensi.

Joins Lateral

JOIN Lateral memungkinkan Anda menjalankan subquery untuk setiap baris kueri luar, meningkatkan fleksibilitas dan efisiensi kueri. Namun, kompleksitas subquery internal atau jumlah data yang besar dapat memengaruhi performa.

Contoh JOIN Lateral

Contoh ini menunjukkan cara mengagregasi catatan pesanan penjualan dan menyaring tiga produk teratas berdasarkan jumlah penjualan.

Buat draf di halaman ETL dengan merujuk pada instruksi di bagian JOIN Reguler.

CREATE TEMPORARY TABLE sale (
  sale_id STRING,
  product_id INT,
  sale_num INT
)
WITH (
  'connector' = 'faker',   -- Faker connector
  'fields.sale_id.expression' = '#{Internet.uuid}',   -- Menghasilkan UUID acak.
  'fields.product_id.expression' = '#{regexify ''(1|2|3|4|5){1}''}',  -- Pilih angka dari lima angka.
  'fields.sale_num.expression' = '#{number.numberBetween ''1'',''10''}',  -- Menghasilkan bilangan bulat acak dari 1 hingga 10.
  'number-of-rows' = '50'  -- Hasilkan 50 baris data.
);

CREATE TEMPORARY TABLE products (
 product_id INT,
 product_name STRING,
 PRIMARY KEY(product_id) NOT ENFORCED
)
WITH(
  'connector' = 'mysql',
  'hostname' = '${secret_values.mysqlhost}',
  'port' = '3306',
  'username' = '${secret_values.username}',
  'password' = '${secret_values.password}',
  'database-name' = 'db2024',
  'table-name' = 'products'
);

SELECT 
    p.product_name,
    s.total_sales
FROM  products p
LEFT JOIN LATERAL
    (SELECT SUM(sale_num) AS total_sales FROM sale WHERE sale.product_id = p.product_id) s ON TRUE
    ORDER BY total_sales DESC
    LIMIT 3;

Di pojok kanan atas editor SQL, klik Debug. Di kotak dialog yang muncul, klik OK. Gambar berikut menunjukkan hasil debugging.

image

Referensi