Dalam Realtime Compute for Apache Flink, setiap aliran data dapat dihubungkan dengan tabel dimensi dari sumber data eksternal. Hal ini memungkinkan Anda melakukan kueri terkait dalam Realtime Compute for Apache Flink.
Informasi latar belakang
Sebagian besar konektor memungkinkan Anda menentukan kebijakan cache untuk operasi JOIN pada tabel dimensi. Konektor yang berbeda mendukung kebijakan cache yang berbeda. Untuk informasi lebih lanjut, lihat dokumentasi konektor terkait. Berikut adalah kebijakan cache yang didukung:
Tidak ada: Data tidak di-cache. Ini adalah nilai default.
LRU: Hanya data tertentu dalam tabel dimensi yang di-cache. Setiap kali sistem menerima catatan data, sistem mencari di cache. Jika sistem tidak menemukan catatan di cache, sistem akan mencari catatan data di tabel dimensi fisik.
SEMUA: Semua data dalam tabel dimensi di-cache. Sebelum penyebaran berjalan, sistem memuat semua data dalam tabel dimensi ke cache. Dengan cara ini, cache dicari untuk semua kueri berikutnya dalam tabel dimensi. Jika data yang memenuhi persyaratan tidak dapat ditemukan di cache, kunci tersebut tidak ada. Sistem memuat ulang semua data di cache setelah entri cache kedaluwarsa. Jika jumlah data dalam tabel jarak jauh kecil dan banyak kunci yang hilang, kami sarankan Anda mengatur parameter ini ke SEMUA. Tabel sumber dan tabel dimensi tidak dapat dihubungkan berdasarkan klausa ON.
Anda perlu mempertimbangkan keseimbangan antara performa real-time dan performa pemrosesan data berdasarkan kebutuhan bisnis Anda. Jika Anda ingin data diperbarui secara real-time, Anda dapat mengizinkan konektor membaca data langsung dari tabel dimensi tanpa menggunakan data yang di-cache.
Jika Anda ingin menggunakan kebijakan cache, Anda dapat mengatur kebijakan cache ke LRU dan menentukan waktu hidup (TTL) untuk menyimpan data terbaru. Anda dapat mengatur TTL ke nilai kecil, seperti beberapa detik hingga puluhan detik. Dengan cara ini, data dapat dimuat dari tabel sumber pada interval yang ditentukan.
Jika kebijakan cache adalah SEMUA, Anda harus memantau penggunaan memori operator untuk mencegah kesalahan kehabisan memori (OOM).
Jika kebijakan cache adalah SEMUA, Anda harus meningkatkan memori operator untuk bergabung dengan tabel karena sistem secara asinkron memuat data dari tabel dimensi. Ukuran memori tambahan adalah dua kali lipat dari tabel jarak jauh.
Batasan
Anda hanya dapat menghubungkan aliran data dengan snapshot tabel dimensi yang diambil pada saat ini.
Tabel dimensi mendukung operasi INNER JOIN dan LEFT JOIN, tetapi tidak mendukung operasi RIGHT JOIN atau FULL JOIN.
Peringatan
Jika Anda ingin melakukan penggabungan tabel satu lawan satu, pastikan kondisi penggabungan berisi ekuivalen join yang mencakup bidang unik dalam tabel dimensi.
Setiap aliran data hanya terhubung dengan data terbaru dalam tabel dimensi pada waktu saat ini. Ini berarti bahwa operasi JOIN hanya dilakukan pada waktu pemrosesan. Oleh karena itu, jika data dalam tabel dimensi ditambahkan, diperbarui, atau dihapus setelah operasi JOIN dilakukan, data terkait tetap tidak berubah. Untuk informasi lebih lanjut tentang perilaku tabel dimensi tertentu, lihat Konektor yang Didukung.
Sintaksis
SELECT nama-kolom
FROM tabel1 [AS <alias1>]
[LEFT] JOIN tabel2 FOR SYSTEM_TIME AS OF PROCTIME() [AS <alias2>]
ON tabel1.nama-kolom1 = tabel2.nama-kunci1;Anda harus menambahkan FOR SYSTEM_TIME AS OF PROCTIME() di akhir tabel dimensi. Dengan cara ini, setiap catatan data dalam tabel dimensi yang dapat dilihat pada waktu saat ini dihubungkan dengan data sumber.
Kondisi ON harus berisi kondisi ekuivalen untuk bidang yang dapat dicari secara acak dalam tabel dimensi.
Dalam kondisi penggabungan yang ditentukan dalam klausa ON, bidang dalam tabel dimensi tidak dapat menggunakan fungsi konversi tipe, seperti CAST. Jika Anda ingin mengonversi tipe data, lakukan konversi pada bidang dalam tabel sumber.
Petunjuk gabungan untuk tabel dimensi
Anda dapat menggunakan petunjuk gabungan untuk tabel dimensi untuk menentukan strategi gabungan. Untuk informasi lebih lanjut tentang fitur petunjuk, lihat Petunjuk SQL untuk Flink. Petunjuk gabungan untuk tabel dimensi termasuk petunjuk LOOKUP dan petunjuk gabungan lainnya.
Hanya Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR) 8.0 atau lebih baru yang mendukung petunjuk LOOKUP.
Hanya Realtime Compute for Apache Flink yang menggunakan VVR 8.0.8 atau lebih baru yang memungkinkan Anda mengonfigurasi strategi shuffle menggunakan petunjuk LOOKUP.
Dalam Realtime Compute for Apache Flink yang menggunakan VVR 8.0 atau lebih baru, alias dapat ditentukan dalam petunjuk gabungan untuk tabel dimensi. Jika alias ditentukan untuk tabel dimensi, alias tabel dimensi harus digunakan dalam petunjuk gabungan.
Hanya Realtime Compute for Apache Flink yang menggunakan VVR 4.0 atau lebih baru yang mendukung petunjuk gabungan lainnya.
Petunjuk LOOKUP
Fitur petunjuk LOOKUP dari Realtime Compute for Apache Flink konsisten dengan fitur petunjuk LOOKUP yang disediakan oleh komunitas open source. Anda dapat mengonfigurasi strategi pencarian sinkron, asinkron, dan ulang untuk tabel dimensi. Untuk informasi lebih lanjut, lihat Petunjuk LOOKUP. Dalam Realtime Compute for Apache Flink yang menggunakan VVR 8.0.8 atau lebih baru, fitur petunjuk LOOKUP diperluas untuk memungkinkan Anda mengonfigurasi 'shuffle' = 'true'. Dengan cara ini, Anda dapat menentukan strategi shuffle untuk operasi JOIN pada tabel dimensi. Tabel berikut menjelaskan strategi shuffle dalam skenario yang berbeda.
Skenario | Strategi shuffle untuk operasi JOIN |
'shuffle' = 'true' tidak dikonfigurasi. | Strategi shuffle default mesin digunakan. |
'shuffle' = 'true' tidak dikonfigurasi dan konektor tabel dimensi tidak menyediakan strategi shuffle kustom untuk operasi JOIN. | |
'shuffle' = 'true' dikonfigurasi dan konektor tabel dimensi tidak menyediakan strategi shuffle kustom untuk operasi JOIN. | Secara default, strategi SHUFFLE_HASH digunakan. Untuk informasi lebih lanjut, lihat SHUFFLE_HASH. |
'shuffle' = 'true' dikonfigurasi dan konektor tabel dimensi menyediakan strategi shuffle kustom untuk operasi JOIN. | Strategi shuffle kustom konektor tabel dimensi digunakan. |
Hanya Konektor Paimon yang menyediakan strategi shuffle kustom. Jika kolom penggabungan tabel dimensi mencakup semua bidang bucket, tabel dimensi di-shuffle berdasarkan bucket.
Kode sampel berikut memberikan contoh tentang cara mengonfigurasi strategi shuffle saat Anda melakukan operasi JOIN pada tabel dimensi:
-- Konfigurasikan strategi shuffle hanya untuk tabel dimensi dim1 tempat Anda melakukan operasi JOIN.
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
-- Konfigurasikan strategi shuffle untuk tabel dimensi dim1 dan dim2 tempat Anda melakukan operasi JOIN.
SELECT /*+ LOOKUP('table'='dim1', 'shuffle' = 'true'),LOOKUP('table'='dim2', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b
-- Gunakan alias D1 untuk tabel dimensi dim1 dalam petunjuk untuk mengonfigurasi strategi shuffle untuk operasi JOIN.
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
-- Gunakan alias untuk tabel dimensi dim1 dan dim2 dalam petunjuk untuk mengonfigurasi strategi shuffle untuk operasi JOIN.
SELECT /*+ LOOKUP('table'='D1', 'shuffle' = 'true'),LOOKUP('table'='D2', 'shuffle' = 'true') */
FROM src AS T
LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.bPetunjuk gabungan lainnya
Petunjuk gabungan lainnya untuk tabel dimensi hanya digunakan untuk mengonfigurasi strategi gabungan untuk tabel dimensi, termasuk strategi SHUFFLE_HASH, REPLICATED_SHUFFLE_HASH, dan SKEW. Tabel berikut menjelaskan skenario penggunaan strategi gabungan berdasarkan konfigurasi kebijakan cache untuk tabel dimensi.
Kebijakan Cache | SHUFFLE_HASH | REPLICATED_SHUFFLE_HASH (Setara dengan SKEW) |
Tidak ada | Kami sarankan Anda tidak menggunakan strategi gabungan ini. Jika strategi gabungan ini digunakan, data utama memperkenalkan overhead jaringan tambahan. | Kami sarankan Anda tidak menggunakan strategi gabungan ini. Jika strategi gabungan ini digunakan, data utama memperkenalkan overhead jaringan tambahan. |
LRU | Jika I/O pencarian tabel dimensi menjadi hambatan, kami sarankan Anda menggunakan strategi gabungan ini. Jika data utama memiliki lokalitas temporal pada kunci gabungan, strategi gabungan ini dapat meningkatkan rasio hit cache dan mengurangi jumlah permintaan I/O. Ini meningkatkan throughput total. Penting Data utama memperkenalkan overhead jaringan tambahan. Jika data utama condong pada kunci gabungan dan ada hambatan kinerja, kami sarankan Anda menggunakan strategi gabungan REPLICATED_SHUFFLE_HASH. | Jika I/O pencarian tabel dimensi menjadi hambatan dan data utama condong pada kunci gabungan, kami sarankan Anda menggunakan strategi gabungan ini. Jika data utama memiliki lokalitas temporal pada kunci gabungan, strategi gabungan ini dapat meningkatkan rasio hit cache dan mengurangi jumlah permintaan I/O. Ini meningkatkan throughput total. |
SEMUA | Jika penggunaan memori tabel dimensi menjadi hambatan, kami sarankan Anda menggunakan strategi gabungan ini. Dengan cara ini, penggunaan memori dapat dikurangi menjadi nilai 1/Paralelisme. Penting Data utama memperkenalkan overhead jaringan tambahan. Jika data utama condong pada kunci gabungan dan ada hambatan kinerja, kami sarankan Anda menggunakan strategi gabungan REPLICATED_SHUFFLE_HASH. | Jika penggunaan memori tabel dimensi menjadi hambatan dan data utama condong pada kunci gabungan, kami sarankan Anda menggunakan strategi gabungan ini. Dengan cara ini, penggunaan memori dapat dikurangi menjadi nilai Jumlah bucket/Paralelisme. |
SHUFFLE_HASH
Efek
Strategi gabungan SHUFFLE_HASH memungkinkan data utama di-shuffle berdasarkan kunci gabungan sebelum operasi JOIN dilakukan. Jika kebijakan cache adalah LRU, rasio hit cache meningkat dan jumlah permintaan I/O berkurang. Jika kebijakan cache adalah SEMUA, penggunaan memori berkurang. Anda dapat menentukan beberapa tabel dimensi dalam setiap petunjuk gabungan SHUFFLE_HASH.
Batasan
Jika Anda menggunakan strategi gabungan SHUFFLE_HASH, overhead memori berkurang. Namun, overhead jaringan tambahan diperkenalkan karena data upstream perlu di-shuffle berdasarkan kunci gabungan. Oleh karena itu, strategi gabungan SHUFFLE_HASH tidak cocok untuk skenario berikut:
Data utama memiliki kemiringan data parah pada kunci gabungan. Jika Anda menggunakan strategi gabungan SHUFFLE_HASH untuk menggabungkan data, operator gabungan dapat menyebabkan hambatan kinerja karena kemiringan data. Ini dapat menyebabkan tekanan balik parah dalam penyebaran streaming atau ekor panjang parah dalam penyebaran batch. Dalam skenario ini, kami sarankan Anda menggunakan strategi gabungan REPLICATED_SHUFFLE_HASH.
Jika tabel dimensi berisi sejumlah kecil data dan tidak memiliki hambatan memori selama pemuatan tabel ketika kebijakan cache adalah SEMUA, penghematan overhead memori dengan menggunakan strategi gabungan SHUFFLE_HASH mungkin tidak sebanding dengan overhead jaringan tambahan yang diperkenalkan oleh menggunakan strategi gabungan SHUFFLE_HASH.
Kode Sampel
-- Aktifkan strategi gabungan SHUFFLE_HASH hanya untuk tabel dimensi dim1. SELECT /*+ SHUFFLE_HASH(dim1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b -- Aktifkan strategi gabungan SHUFFLE_HASH untuk tabel dimensi dim1 dan dim2. SELECT /*+ SHUFFLE_HASH(dim1, dim2) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() ON T.a = dim1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() ON T.b = dim2.b -- Gunakan alias D1 untuk tabel dimensi dim1 dalam petunjuk untuk mengaktifkan strategi gabungan SHUFFLE_HASH. SELECT /*+ SHUFFLE_HASH(D1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b -- Gunakan alias untuk tabel dimensi dim1 dan dim2 dalam petunjuk untuk mengaktifkan strategi gabungan SHUFFLE_HASH. SELECT /*+ SHUFFLE_HASH(D1, D2) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a LEFT JOIN dim2 FOR SYSTEM_TIME AS OF PROCTIME() AS D2 ON T.b = D2.b
REPLICATED_SHUFFLE_HASH
Efek
Efek REPLICATED_SHUFFLE_HASH pada dasarnya sama dengan efek SHUFFLE_HASH. Namun, REPLICATED_SHUFFLE_HASH secara acak menyebarkan data utama yang memiliki kunci yang sama ke sejumlah thread konkuren yang ditentukan untuk menyelesaikan hambatan kinerja yang disebabkan oleh kemiringan data. Anda dapat menentukan beberapa tabel dimensi dalam setiap petunjuk gabungan REPLICATED_SHUFFLE_HASH.
Batasan
Anda harus mengonfigurasi parameter
table.exec.skew-join.replicate-numuntuk menentukan jumlah bucket yang berisi data condong. Nilai default parameter ini adalah 16. Nilai parameter ini tidak boleh lebih besar dari jumlah thread konkuren pada operator gabungan tabel dimensi. Untuk informasi lebih lanjut tentang cara mengonfigurasi parameter ini, lihat Bagaimana cara mengonfigurasi parameter kustom untuk penyebaran yang sedang berjalan?Stream pembaruan tidak didukung. Jika stream utama adalah stream pembaruan dan Anda menggunakan strategi gabungan REPLICATED_SHUFFLE_HASH, kesalahan akan dikembalikan.
Kode Sampel
-- Aktifkan strategi gabungan REPLICATED_SHUFFLE_HASH untuk tabel dimensi dim1. SELECT /*+ REPLICATED_SHUFFLE_HASH(dim1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a -- Gunakan alias untuk tabel dimensi dim1 dalam petunjuk untuk mengaktifkan strategi gabungan REPLICATED_SHUFFLE_HASH. SELECT /*+ REPLICATED_SHUFFLE_HASH(D1) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
SKEW
Efek
Jika tabel yang ditentukan memiliki kemiringan data, pengoptimal menggunakan strategi gabungan REPLICATED_SHUFFLE_HASH untuk operasi JOIN pada tabel dimensi. SKEW hanya merupakan gula sintaksis dan strategi gabungan REPLICATED_SHUFFLE_HASH sebenarnya digunakan di lapisan bawah.
Batasan
Anda hanya dapat menentukan satu tabel dalam setiap petunjuk SKEW.
Nama tabel harus merupakan nama tabel utama yang memiliki kemiringan data, bukan tabel dimensi.
Stream pembaruan tidak didukung. Jika stream utama adalah stream pembaruan dan Anda menggunakan strategi gabungan SKEW, kesalahan akan dikembalikan.
Kode Sampel
SELECT /*+ SKEW(src) */ FROM src AS T LEFT JOIN dim1 FOR SYSTEM_TIME AS OF PROCTIME() AS D1 ON T.a = D1.a
Strategi shuffle petunjuk LOOKUP menyediakan kemampuan petunjuk SHUFFLE_HASH. Jika Anda menggunakan dua jenis petunjuk, strategi shuffle petunjuk LOOKUP memiliki prioritas lebih tinggi daripada petunjuk SHUFFLE_HASH.
Strategi shuffle petunjuk LOOKUP tidak dapat menyelesaikan masalah kemiringan data. Jika Anda menggunakan petunjuk LOOKUP bersama dengan petunjuk REPLICATED_SHUFFLE_HASH atau SKEW, strategi shuffle petunjuk REPLICATED_SHUFFLE_HASH atau SKEW memiliki prioritas lebih tinggi daripada petunjuk LOOKUP.
Contoh
Data Uji
Tabel 1 kafka_input
id (bigint)
nama (varchar)
umur (bigint)
1
lilei
22
2
hanmeimei
20
3
libai
28
Tabel 2: nomorTelepon
nama (varchar)
nomorTelepon (bigint)
dufu
1390000111
baijuyi
1390000222
libai
1390000333
lilei
1390000444
Pernyataan Uji
CREATE TEMPORARY TABLE kafka_input ( id BIGINT, nama VARCHAR, umur BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = '<topikAnda>', 'properties.bootstrap.servers' = '<brokerKafkaAnda>', 'properties.group.id' = '<groupIdKonsumenKafkaAnda>', 'format' = 'csv' ); CREATE TEMPORARY TABLE phoneNumber( nama VARCHAR, nomorTelepon BIGINT, PRIMARY KEY(nama) NOT ENFORCED ) WITH ( 'connector' = 'mysql', 'hostname' = '<hostnameAnda>', 'port' = '3306', 'username' = '<usernameAnda>', 'password' = '<kataSandiAnda>', 'database-name' = '<namaDatabaseAnda>', 'table-name' = '<namaTabelAnda>' ); CREATE TEMPORARY TABLE result_infor( id BIGINT, nomorTelepon BIGINT, nama VARCHAR ) WITH ( 'connector' = 'blackhole' ); INSERT INTO result_infor SELECT t.id, w.nomorTelepon, t.nama FROM kafka_input as t JOIN phoneNumber FOR SYSTEM_TIME AS OF PROCTIME() as w ON t.nama = w.nama;Hasil Uji
id (bigint)
nomorTelepon (bigint)
nama (varchar)
1
1390000444
lilei
3
1390000333
libai