All Products
Search
Document Center

Realtime Compute for Apache Flink:Lookup join

Last Updated:Mar 10, 2026

Di Realtime Compute for Apache Flink, setiap aliran data dapat dikaitkan dengan tabel dimensi dari sumber data eksternal, memungkinkan Anda menjalankan kueri terkait di Realtime Compute for Apache Flink.

Informasi latar belakang

Sebagian besar konektor memungkinkan Anda menentukan kebijakan cache untuk operasi JOIN pada tabel dimensi. Konektor yang berbeda mendukung kebijakan cache yang berbeda; untuk informasi selengkapnya, lihat dokumentasi konektor terkait. Kebijakan cache berikut ini didukung:

  • None: Data tidak di-cache. Ini adalah nilai default.

  • LRU: Hanya data tertentu dalam tabel dimensi yang di-cache. Setiap kali sistem menerima catatan data, sistem mencari cache tersebut. Jika tidak ditemukan di cache, sistem akan mengambil catatan tersebut dari tabel dimensi fisik.

  • ALL: Semua data dalam tabel dimensi di-cache. Sebelum penerapan dijalankan, sistem memuat seluruh data dari tabel dimensi ke cache, sehingga semua kueri selanjutnya menggunakan cache tersebut. Jika data yang memenuhi persyaratan tidak ditemukan di cache, berarti kunci tersebut tidak ada. Sistem memuat ulang semua data ke cache setelah entri cache kedaluwarsa. Jika jumlah data dalam tabel remote kecil dan banyak kunci yang tidak ditemukan, kami merekomendasikan mengatur parameter ini ke ALL. Tabel sumber dan tabel dimensi tidak dapat dikaitkan berdasarkan klausa ON.

Catatan
  • Pertimbangkan keseimbangan antara performa real-time dan performa pemrosesan data sesuai kebutuhan. Jika Anda memerlukan pembaruan data secara real-time, izinkan konektor membaca data langsung dari tabel dimensi tanpa menggunakan cache.

  • Jika Anda ingin menggunakan kebijakan cache, atur kebijakan cache ke LRU dan tentukan waktu hidup (TTL) untuk menyimpan data terbaru. Anda dapat mengatur TTL ke nilai kecil, seperti beberapa detik hingga puluhan detik, sehingga data dimuat dari tabel sumber pada interval yang ditentukan.

  • Jika kebijakan cache adalah ALL, pantau penggunaan memori operator untuk mencegah error out of memory (OOM).

  • Jika kebijakan cache adalah ALL, tambahkan memori operator untuk penggabungan tabel karena sistem memuat data dari tabel dimensi secara asinkron. Ukuran memori yang ditambahkan adalah dua kali lipat dari ukuran tabel remote.

Batasan

  • Anda hanya dapat mengaitkan aliran data dengan snapshot tabel dimensi yang diambil pada saat itu.

  • Tabel dimensi mendukung operasi INNER JOIN dan LEFT JOIN, tetapi tidak mendukung RIGHT JOIN atau FULL JOIN.

Peringatan

  • Untuk melakukan penggabungan tabel satu-ke-satu, pastikan kondisi join mencakup equi-join pada bidang unik dalam tabel dimensi.

  • Setiap aliran data hanya dikaitkan dengan data terbaru dalam tabel dimensi pada waktu saat itu. Artinya, operasi JOIN hanya dilakukan berdasarkan waktu pemrosesan. Oleh karena itu, jika data dalam tabel dimensi ditambahkan, diperbarui, atau dihapus setelah operasi JOIN dilakukan, data terkait tetap tidak berubah. Untuk informasi lebih lanjut tentang perilaku tabel dimensi tertentu, lihat Konektor yang didukung.

Sintaksis JOIN Tabel Dimensi

SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON table1.column-name1 = table2.key-name1;
Catatan
  • Tambahkan FOR SYSTEM_TIME AS OF PROCTIME() di akhir nama tabel dimensi agar setiap catatan data dalam tabel dimensi yang terlihat pada waktu saat itu dikaitkan dengan data sumber.

  • Kondisi ON harus mencakup kondisi ekuivalen untuk bidang yang dapat dicari secara acak dalam tabel dimensi.

  • Dalam kondisi join yang ditentukan dalam klausa ON, bidang dalam tabel dimensi tidak boleh menggunakan fungsi konversi tipe, seperti CAST. Jika perlu mengonversi tipe data, lakukan konversi tersebut pada bidang di tabel sumber.

Petunjuk join untuk tabel dimensi

Anda dapat mengonfigurasi strategi join tabel dimensi menggunakan petunjuk tabel dimensi. Untuk informasi selengkapnya tentang fitur hint, lihat Flink SQL Hints. Petunjuk tabel dimensi mencakup LOOKUP hints dan petunjuk join lainnya.

Catatan
  • Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 8.0 atau versi lebih baru yang mendukung LOOKUP hints.

  • Hanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.8 atau versi lebih baru yang memungkinkan Anda mengonfigurasi strategi shuffle menggunakan LOOKUP hints.

  • Di Realtime Compute for Apache Flink yang menggunakan VVR 8.0 atau versi lebih baru, alias dapat ditentukan dalam petunjuk join untuk tabel dimensi. Jika alias ditentukan untuk tabel dimensi, alias tersebut harus digunakan dalam petunjuk join.

  • Hanya Realtime Compute for Apache Flink yang menggunakan VVR 4.0 atau versi lebih baru yang mendukung petunjuk join lainnya.

LOOKUP hints

Fitur LOOKUP hint di Realtime Compute for Apache Flink konsisten dengan fitur LOOKUP hint yang disediakan oleh komunitas open source. Anda dapat mengonfigurasi strategi lookup sinkron, asinkron, dan retry untuk tabel dimensi. Untuk informasi selengkapnya, lihat LOOKUP hints. Di Realtime Compute for Apache Flink yang menggunakan VVR 8.0.8 atau versi lebih baru, fitur LOOKUP hint diperluas untuk memungkinkan Anda mengonfigurasi 'shuffle' = 'true', sehingga Anda dapat menentukan strategi shuffle untuk operasi JOIN pada tabel dimensi. Tabel berikut menjelaskan strategi shuffle dalam berbagai skenario.

Skenario

Kebijakan join

Opsi 'shuffle' = 'true' tidak dikonfigurasi.

Strategi shuffle default mesin digunakan.

Opsi 'shuffle' = 'true' tidak dikonfigurasi, dan konektor tabel dimensi tidak menyediakan kebijakan join khusus.

Opsi 'shuffle' = 'true' dikonfigurasi, dan konektor tabel dimensi tidak menyediakan kebijakan join khusus.

Secara default, strategi SHUFFLE_HASH digunakan. Untuk informasi selengkapnya, lihat SHUFFLE_HASH.

Opsi 'shuffle' = 'true' dikonfigurasi, dan konektor tabel dimensi menyediakan kebijakan join khusus.

Strategi shuffle khusus dari konektor tabel dimensi digunakan.

Catatan

Hanya Streaming data lakehouse Paimon yang menyediakan strategi shuffle khusus. Jika kolom join tabel dimensi mencakup semua bidang bucket, tabel dimensi tersebut di-shuffle berdasarkan bucket.

Kode contoh berikut menunjukkan cara mengonfigurasi strategi shuffle saat melakukan operasi JOIN pada tabel dimensi:

-- Konfigurasikan kebijakan shuffle hanya untuk join tabel dimensi dim1.
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true') */
FROM src AS T 
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b

-- Konfigurasikan kebijakan shuffle untuk join tabel dimensi dim1 dan dim2.
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true'),LOOKUP('table'='dim2', 'shuffle' = 'true') */
FROM src AS T 
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b

-- Anda harus menggunakan alias D1 untuk mengonfigurasi kebijakan shuffle untuk join tabel dimensi dim1.
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true') */
FROM src AS T 
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b

-- Gunakan alias untuk mengonfigurasi kebijakan shuffle untuk join tabel dimensi dim1 dan dim2.
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true'),LOOKUP('table'='D2', 'shuffle' = 'true') */
FROM src AS T 
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b

Petunjuk join lainnya

Petunjuk join lainnya untuk tabel dimensi hanya digunakan untuk mengonfigurasi strategi join, termasuk SHUFFLE_HASH, REPLICATED_SHUFFLE_HASH, dan SKEW. Tabel berikut menjelaskan skenario penggunaan strategi join berdasarkan konfigurasi kebijakan cache untuk tabel dimensi.

Kebijakan cache

SHUFFLE_HASH

REPLICATED_SHUFFLE_HASH

(Ekuivalen dengan SKEW)

None

Kami merekomendasikan agar Anda tidak menggunakan strategi join ini. Jika strategi join ini digunakan, data utama akan menimbulkan overhead jaringan tambahan.

Kami merekomendasikan agar Anda tidak menggunakan strategi join ini. Jika strategi join ini digunakan, data utama akan menimbulkan overhead jaringan tambahan.

LRU

Jika I/O lookup tabel dimensi menjadi bottleneck, kami merekomendasikan agar Anda menggunakan strategi join ini. Jika data utama memiliki temporal locality pada kunci gabungan, strategi join ini dapat meningkatkan rasio hit cache dan mengurangi jumlah permintaan I/O. Hal ini meningkatkan throughput total.

Penting

Data utama menimbulkan overhead jaringan tambahan. Jika data utama mengalami kesenjangan data pada kunci gabungan dan terdapat bottleneck performa, kami merekomendasikan agar Anda menggunakan strategi join REPLICATED_SHUFFLE_HASH.

Jika I/O lookup tabel dimensi menjadi bottleneck dan data utama mengalami kesenjangan data pada kunci gabungan, kami merekomendasikan agar Anda menggunakan strategi join ini. Jika data utama memiliki temporal locality pada kunci gabungan, strategi join ini dapat meningkatkan rasio hit cache dan mengurangi jumlah permintaan I/O. Hal ini meningkatkan throughput total.

ALL

Jika penggunaan memori tabel dimensi menjadi bottleneck, kami merekomendasikan agar Anda menggunakan strategi join ini. Dengan cara ini, penggunaan memori dapat dikurangi menjadi nilai 1/Parallelism.

Penting

Data utama menimbulkan overhead jaringan tambahan. Jika data utama mengalami kesenjangan data pada kunci gabungan dan terdapat bottleneck performa, kami merekomendasikan agar Anda menggunakan strategi join REPLICATED_SHUFFLE_HASH.

Jika penggunaan memori tabel dimensi menjadi bottleneck dan data utama mengalami kesenjangan data pada kunci gabungan, kami merekomendasikan agar Anda menggunakan strategi join ini. Dengan cara ini, penggunaan memori dapat dikurangi menjadi nilai Number of buckets/Parallelism.

SHUFFLE_HASH

  • Efek

    Strategi join SHUFFLE_HASH melakukan shuffle data utama berdasarkan kunci gabungan sebelum operasi JOIN. Jika kebijakan cache adalah LRU, rasio hit cache meningkat dan jumlah permintaan I/O berkurang. Jika kebijakan cache adalah ALL, penggunaan memori berkurang. Anda dapat menentukan beberapa tabel dimensi dalam setiap petunjuk join SHUFFLE_HASH.

  • Batasan

    Meskipun overhead memori berkurang, strategi join SHUFFLE_HASH menimbulkan overhead jaringan tambahan karena data hulu perlu di-shuffle berdasarkan kunci gabungan. Oleh karena itu, strategi ini tidak cocok untuk skenario berikut:

    • Data utama mengalami kesenjangan data parah pada kunci gabungan. Penggunaan strategi join SHUFFLE_HASH dalam kasus ini dapat menyebabkan bottleneck performa pada operator join, yang berpotensi menimbulkan tekanan balik parah dalam penerapan streaming atau long tails dalam penerapan batch. Dalam skenario ini, kami merekomendasikan menggunakan strategi join REPLICATED_SHUFFLE_HASH.

    • Jika tabel dimensi berisi data dalam jumlah kecil dan tidak mengalami bottleneck memori selama pemuatan tabel ketika kebijakan cache adalah ALL, penghematan overhead memori yang diperoleh mungkin tidak sebanding dengan overhead jaringan tambahan yang ditimbulkan.

  • Kode contoh

    -- Aktifkan strategi join SHUFFLE_HASH hanya untuk tabel dimensi dim1.
    SELECT /*+ SHUFFLE_HASH(dim1) */
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
    LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
    
    -- Aktifkan strategi join SHUFFLE_HASH untuk tabel dimensi dim1 dan dim2.
    SELECT /*+ SHUFFLE_HASH(dim1, dim2) */
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
    LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
    
    -- Gunakan alias D1 untuk tabel dimensi dim1 dalam petunjuk untuk mengaktifkan strategi join SHUFFLE_HASH.
    SELECT /*+ SHUFFLE_HASH(D1) */
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
    LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
    
    -- Gunakan alias untuk tabel dimensi dim1 dan dim2 dalam petunjuk untuk mengaktifkan strategi join SHUFFLE_HASH.
    SELECT /*+ SHUFFLE_HASH(D1, D2) */
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
    LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b

REPLICATED_SHUFFLE_HASH

  • Efek

    Efek REPLICATED_SHUFFLE_HASH pada dasarnya sama dengan SHUFFLE_HASH. Namun, REPLICATED_SHUFFLE_HASH menyebarkan data utama yang memiliki kunci yang sama secara acak ke sejumlah thread konkuren tertentu untuk mengatasi bottleneck performa akibat kesenjangan data. Anda dapat menentukan beberapa tabel dimensi dalam setiap petunjuk join REPLICATED_SHUFFLE_HASH.

  • Batasan

    • Anda harus mengonfigurasi parameter table.exec.skew-join.replicate-num untuk menentukan jumlah bucket yang berisi data miring. Nilai default parameter ini adalah 16 dan tidak boleh melebihi jumlah thread konkuren pada operator join tabel dimensi. Untuk informasi selengkapnya tentang cara mengonfigurasi parameter ini, lihat Bagaimana cara mengonfigurasi parameter running kustom untuk pekerjaan?

    • Aliran pembaruan tidak didukung. Jika aliran utama adalah aliran pembaruan dan Anda menggunakan strategi join REPLICATED_SHUFFLE_HASH, sistem akan mengembalikan error.

  • Kode contoh

    -- Aktifkan strategi join REPLICATED_SHUFFLE_HASH untuk tabel dimensi dim1.
    SELECT /*+ REPLICATED_SHUFFLE_HASH(dim1) */ 
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
    
    -- Gunakan alias untuk tabel dimensi dim1 dalam petunjuk untuk mengaktifkan strategi join REPLICATED_SHUFFLE_HASH.
    SELECT /*+ REPLICATED_SHUFFLE_HASH(D1) */ 
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a

SKEW

  • Efek

    Jika tabel yang ditentukan mengalami kesenjangan data, pengoptimal menggunakan strategi join REPLICATED_SHUFFLE_HASH untuk operasi JOIN pada tabel dimensi. SKEW hanyalah syntactic sugar; strategi join REPLICATED_SHUFFLE_HASH yang sebenarnya digunakan di lapisan bawah.

  • Batasan

    • Anda hanya dapat menentukan satu tabel dalam setiap petunjuk SKEW.

    • Nama tabel harus merupakan nama tabel primary yang mengalami kesenjangan data, bukan tabel dimensi.

    • Aliran pembaruan tidak didukung. Jika aliran utama adalah aliran pembaruan dan Anda menggunakan strategi join SKEW, sistem akan mengembalikan error.

  • Kode contoh

    SELECT /*+ SKEW(src) */  
    FROM src AS T 
    LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
Penting
  • Strategi shuffle dari LOOKUP hint menyediakan kemampuan petunjuk SHUFFLE_HASH. Jika Anda menggunakan kedua jenis petunjuk tersebut, strategi shuffle dari LOOKUP hint akan memiliki prioritas dibandingkan petunjuk SHUFFLE_HASH.

  • Strategi shuffle dari LOOKUP hint tidak dapat mengatasi masalah kesenjangan data. Jika Anda menggunakan LOOKUP hint bersama dengan petunjuk REPLICATED_SHUFFLE_HASH atau SKEW, strategi shuffle dari petunjuk REPLICATED_SHUFFLE_HASH atau SKEW akan memiliki prioritas dibandingkan LOOKUP hint.

Contoh

  • Data uji

    • Tabel 1 kafka_input

      id (bigint)

      name (varchar)

      age (bigint)

      1

      lilei

      22

      2

      hanmeimei

      20

      3

      libai

      28

    • Tabel 2 phoneNumber

      name (varchar)

      phoneNumber (bigint)

      dufu

      1390000111

      baijuyi

      1390000222

      libai

      1390000333

      lilei

      1390000444

  • Pernyataan uji

    CREATE TEMPORARY TABLE kafka_input (
      id   BIGINT,
      name VARCHAR,
      age  BIGINT
    ) WITH (
      'connector' = 'kafka',
      'topic' = '<yourTopic>',
      'properties.bootstrap.servers' = '<yourKafkaBrokers>',
      'properties.group.id' = '<yourKafkaConsumerGroupId>',
      'format' = 'csv'
    );
    
    CREATE TEMPORARY TABLE phoneNumber(
      name VARCHAR,
      phoneNumber BIGINT,
      PRIMARY KEY(name) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '<yourHostname>',
      'port' = '3306',
      'username' = '<yourUsername>',
      'password' = '<yourPassword>',
      'database-name' = '<yourDatabaseName>',
      'table-name' = '<yourTableName>'
    );
    
    CREATE TEMPORARY TABLE result_infor(
      id BIGINT,
      phoneNumber BIGINT,
      name VARCHAR
    ) WITH (
      'connector' = 'blackhole'
    );
    
    INSERT INTO result_infor
    SELECT
      t.id,
      w.phoneNumber,
      t.name
    FROM kafka_input as t
    JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w
    ON t.name = w.name;
  • Hasil uji

    id (bigint)

    phoneNumber (bigint)

    name (varchar)

    1

    1390000444

    lilei

    3

    1390000333

    libai