Topik ini menjelaskan cara meningkatkan kinerja penyebaran SQL Realtime Compute for Apache Flink dengan mengoptimalkan konfigurasi penyebaran dan logika SQL Flink.
Optimasi konfigurasi penyebaran
Optimalkan Konfigurasi Sumber Daya
Ververica Platform (VVP) memberlakukan batasan pada jumlah core CPU yang dapat digunakan di JobManager dan TaskManager. Jumlah maksimum core CPU yang dapat digunakan sama dengan jumlah core CPU yang dikonfigurasikan. Saat mengoptimalkan konfigurasi sumber daya, kami menyarankan Anda melakukan operasi berikut:
Jika sejumlah besar penyebaran berjalan secara paralel, Anda dapat mengonfigurasi parameter untuk meningkatkan jumlah core CPU dan ukuran memori yang digunakan oleh JobManager di bagian Resources tab Configuration. Contoh:
Setel Job Manager CPU ke 4.
Setel Job Manager Memory ke 8 GiB.
Jika topologi penyebaran kompleks, Anda dapat mengonfigurasi parameter untuk meningkatkan jumlah core CPU dan ukuran memori yang digunakan oleh TaskManager di bagian Resources tab Configuration. Contoh:
Setel Task Manager CPU ke 2.
Setel Task Manager Memory ke 4 GiB.
Kami menyarankan Anda mempertahankan nilai default taskmanager.numberOfTaskSlots. Nilai defaultnya adalah 1.
Tingkatkan Throughput dan Atasi Masalah Hotspot Data
Tambahkan kode berikut ke bidang Other Configuration di bagian Parameters tab Configuration. Untuk informasi lebih lanjut, lihat Bagaimana cara mengonfigurasi parameter kustom untuk penyebaran yang sedang berjalan? dan Optimalkan Agregat Grup.
execution.checkpointing.interval: 180s table.exec.state.ttl: 129600000 table.exec.mini-batch.enabled: true table.exec.mini-batch.allow-latency: 5s table.optimizer.distinct-agg.split.enabled: trueTabel berikut menjelaskan parameter tersebut.
Parameter
Deskripsi
execution.checkpointing.interval
Interval checkpoint, dalam milidetik.
state.backend
Konfigurasi backend status.
table.exec.state.ttl
Siklus hidup data status, dalam milidetik.
table.exec.mini-batch.enabled
Menentukan apakah akan mengaktifkan fitur miniBatch.
table.exec.mini-batch.allow-latency
Interval waktu ekspor data sekali.
table.exec.mini-batch.size
Jumlah maksimum catatan data yang dapat disimpan untuk operasi mikro-batch.
CatatanFitur miniBatch dioptimalkan di Realtime Compute for Apache Flink yang menggunakan Ververica Runtime (VVR). Kami menyarankan Anda tidak mengonfigurasi parameter ini. Untuk informasi lebih lanjut, lihat Parameter utama.
table.optimizer.distinct-agg.split.enabled
Menentukan apakah akan mengaktifkan optimasi PartialFinal untuk menyelesaikan masalah hotspot data saat menggunakan fungsi COUNT DISTINCT.
Tingkatkan Kinerja Penyebaran di Mana Operasi JOIN untuk Dua Aliran Data Dilakukan
Operator JOIN yang digunakan untuk menggabungkan dua aliran data dalam penyebaran streaming SQL memungkinkan mesin Realtime Compute for Apache Flink secara otomatis menyimpulkan apakah akan mengaktifkan fitur pemisahan key-value. Di Realtime Compute for Apache Flink yang menggunakan VVR 6.0.1 atau lebih baru, mesin Realtime Compute for Apache Flink dapat secara otomatis menyimpulkan apakah akan mengaktifkan fitur pemisahan key-value berdasarkan karakteristik penyebaran streaming SQL. Tidak diperlukan konfigurasi tambahan. Setelah fitur pemisahan key-value diaktifkan, kinerja penyebaran di mana operasi JOIN untuk dua aliran data dilakukan meningkat secara signifikan. Hasil pengujian kinerja dalam skenario tipikal menunjukkan bahwa kinerja meningkat lebih dari 40%.
Anda dapat mengonfigurasi parameter table.exec.join.kv-separate untuk menentukan apakah akan mengaktifkan fitur pemisahan key-value. Nilai valid:
AUTO: Mesin Realtime Compute for Apache Flink secara otomatis mengaktifkan fitur pemisahan key-value berdasarkan status operator JOIN yang digunakan untuk menggabungkan dua aliran data. Ini adalah nilai default.
FORCE: Mesin Realtime Compute for Apache Flink secara paksa mengaktifkan fitur pemisahan key-value.
NONE: Mesin Realtime Compute for Apache Flink secara paksa menonaktifkan fitur pemisahan key-value.
CatatanFitur pemisahan key-value hanya berlaku pada GeminiStateBackend.
Praktik terbaik Flink SQL yang direkomendasikan
Optimalkan agregat grup
Aktifkan miniBatch untuk Meningkatkan Throughput
Jika miniBatch diaktifkan, Realtime Compute for Apache Flink memproses data ketika cache data memenuhi kondisi pemicu. Ini mengurangi jumlah akses Realtime Compute for Apache Flink ke data status. Ini meningkatkan throughput data dan mengurangi output data.
Fitur miniBatch memicu pemrosesan mikro-batch berdasarkan pesan acara. Pesan acara disisipkan di sumber pada interval tertentu.
Skenario
Pemrosesan mikro-batch mencapai throughput lebih tinggi dengan mengorbankan latensi lebih tinggi. Oleh karena itu, pemrosesan mikro-batch tidak berlaku untuk skenario yang memerlukan latensi sangat rendah. Namun, dalam skenario agregasi data, kami menyarankan Anda mengaktifkan pemrosesan mikro-batch untuk meningkatkan kinerja sistem.
Cara Mengaktifkan miniBatch
Fitur miniBatch dinonaktifkan secara default. Untuk mengaktifkan fitur ini, Anda harus menambahkan kode berikut ke bidang Other Configuration di bagian Parameters tab Configuration. Untuk informasi lebih lanjut, lihat Bagaimana cara mengonfigurasi parameter kustom untuk penyebaran yang sedang berjalan?
table.exec.mini-batch.enabled: true table.exec.mini-batch.allow-latency: 5sTabel berikut menjelaskan parameter tersebut.
Parameter
Deskripsi
table.exec.mini-batch.enabled
Menentukan apakah akan mengaktifkan fitur miniBatch.
table.exec.mini-batch.allow-latency
Interval waktu ekspor data sekali.
table.exec.mini-batch.size
Jumlah maksimum catatan data yang dapat disimpan untuk operasi mikro-batch.
CatatanParameter ini hanya berlaku jika digunakan bersama dengan dua parameter sebelumnya. Fitur miniBatch dioptimalkan di Realtime Compute for Apache Flink yang menggunakan VVR. Kami menyarankan Anda tidak mengonfigurasi parameter ini. Untuk informasi lebih lanjut, lihat Parameter utama.
Aktifkan LocalGlobal untuk Menyelesaikan Masalah Hotspot Data Umum
Kebijakan LocalGlobal dapat menyaring beberapa data yang miring dengan menggunakan agregasi lokal dan menyelesaikan masalah hotspot data dalam agregasi global. Ini meningkatkan kinerja penyebaran.
Kebijakan LocalGlobal membagi proses agregasi menjadi dua fase: agregasi lokal dan agregasi global. Fase-fase ini setara dengan fase combine dan reduce dalam MapReduce. Dalam fase agregasi lokal, Realtime Compute for Apache Flink menggabungkan mikro-batch data yang disimpan secara lokal di setiap node hulu, dan menghasilkan nilai akumulator untuk setiap operasi mikro-batch. Dalam fase agregasi global, akumulator digabungkan ke hasil akhir. Kemudian, hasil agregasi global dihasilkan.
Skenario
Kebijakan LocalGlobal cocok untuk skenario di mana Anda ingin meningkatkan kinerja penyebaran dan menyelesaikan masalah hotspot data dengan menggunakan fungsi agregat umum, seperti SUM, COUNT, MAX, MIN, dan AVG.
Batasan
LocalGlobal diaktifkan secara default. Kebijakan ini memiliki batasan berikut:
Hanya dapat berlaku saat miniBatch diaktifkan.
AggregateFunction harus digunakan untuk menggabungkan data.
Verifikasi
Untuk menentukan apakah LocalGlobal diaktifkan, periksa apakah node GlobalGroupAggregate atau LocalGroupAggregate ada dalam topologi akhir.
Aktifkan PartialFinal untuk Menyelesaikan Masalah Hotspot Data Saat Menggunakan Fungsi COUNT DISTINCT
Dalam kondisi normal, Anda perlu menambahkan lapisan yang menyebarkan data berdasarkan kunci distinct saat menggunakan fungsi COUNT DISTINCT. Dengan cara ini, Anda dapat membagi proses agregasi menjadi dua fase untuk menyelesaikan masalah hotspot data. Realtime Compute for Apache Flink sekarang menyediakan kebijakan PartialFinal untuk secara otomatis menyebarkan data dan membagi proses agregasi.
Kebijakan LocalGlobal meningkatkan kinerja fungsi agregat umum, seperti SUM, COUNT, MAX, MIN, dan AVG. Namun, kebijakan LocalGlobal kurang efektif untuk meningkatkan kinerja fungsi COUNT DISTINCT. Hal ini karena agregasi lokal tidak dapat menghapus kunci distinct duplikat. Akibatnya, sejumlah besar data menumpuk di fase agregasi global.
Skenario
Kebijakan PartialFinal cocok untuk skenario di mana kinerja agregasi tidak dapat memenuhi persyaratan Anda saat menggunakan fungsi COUNT DISTINCT.
CatatanAnda tidak dapat mengaktifkan PartialFinal dalam kode Flink SQL yang berisi fungsi agregat yang ditentukan pengguna (UDAF).
Untuk mencegah pemborosan sumber daya, kami menyarankan Anda hanya mengaktifkan PartialFinal saat jumlah data besar. PartialFinal secara otomatis menyebarkan data dan membagi proses agregasi menjadi dua fase. Ini menyebabkan shuffling jaringan tambahan.
Cara Mengaktifkan PartialFinal
PartialFinal dinonaktifkan secara default. Untuk mengaktifkan fitur ini, Anda harus menambahkan kode berikut ke bidang Other Configuration di bagian Parameters tab Configuration. Untuk informasi lebih lanjut, lihat Bagaimana cara mengonfigurasi parameter kustom untuk penyebaran yang sedang berjalan?
table.optimizer.distinct-agg.split.enabled: trueVerifikasi
Periksa apakah agregasi satu lapisan berubah menjadi agregasi dua lapisan dalam topologi akhir.
AGG WITH CASE WHEN diubah menjadi AGG WITH FILTER untuk Meningkatkan Kinerja Sistem dalam Skenario di Mana Fungsi COUNT DISTINCT Digunakan Saat Sejumlah Besar Data Ada
Penyebaran statistik mencatat pengunjung unik (UV) dalam dimensi berbeda, seperti UV semua saluran, UV terminal seluler, dan UV PC. Kami menyarankan Anda menggunakan sintaksis standar AGG WITH FILTER daripada sintaksis AGG WITH CASE WHEN untuk menerapkan analisis statistik multidimensi. Alasannya adalah optimizer SQL Realtime Compute for Apache Flink dapat menganalisis parameter filter. Dengan cara ini, Realtime Compute for Apache Flink dapat menjalankan fungsi COUNT DISTINCT pada bidang yang sama dalam kondisi filter berbeda dengan berbagi data status. Ini mengurangi operasi baca dan tulis pada data status. Dalam pengujian kinerja, sintaksis AGG WITH FILTER lebih unggul daripada sintaksis AGG WITH CASE WHEN. Hal ini karena kinerja penyebaran untuk sintaksis AGG WITH FILTER dua kali lipat dibandingkan dengan sintaksis AGG WITH CASE WHEN.
Skenario
Jika Anda menggunakan AGG WITH FILTER daripada AGG WITH CASE WHEN saat menggunakan fungsi COUNT DISTINCT untuk menghitung hasil di bawah kondisi berbeda pada bidang yang sama, kinerja penyebaran meningkat secara signifikan.
Pernyataan Asli
COUNT(distinct visitor_id) as UV1 , COUNT(distinct case when is_wireless='y' then visitor_id else null end) as UV2Pernyataan Dioptimalkan
COUNT(distinct visitor_id) as UV1 , COUNT(distinct visitor_id) filter (where is_wireless='y') as UV2
Teknik optimasi Join
Mengaktifkan MiniBatch untuk meningkatkan throughput
MiniBatch menyimpan data dalam cache sebelum memicu pemrosesan. Fitur ini berusaha menggabungkan pesan dalam satu batch untuk mengurangi jumlah data yang diproses dan jumlah akses ke status. Hal ini meningkatkan throughput, mengurangi keluaran antara, dan meringankan beban pada operator hilir. MiniBatch memicu pemrosesan mikro-batch ketika salah satu kondisi berikut terpenuhi: latensi maksimum yang dikonfigurasi tercapai, ukuran batch cache tercapai, atau event lain seperti checkpoint terjadi.
Kasus penggunaan ideal
MiniBatch menukar sebagian latensi demi throughput yang lebih tinggi. Jangan aktifkan fitur ini jika Anda memiliki persyaratan latensi ultra-rendah. Dalam skenario join umum, terutama yang melibatkan outer join bertingkat seperti pada contoh SQL berikut, mengaktifkan MiniBatch dapat secara signifikan meningkatkan kinerja sistem. Hal ini karena operator hulu menggunakan mekanisme collapse atau merge pesan untuk secara efektif menekan efek amplifikasi pesan pada operator hilir.
SELECT a.id as a_id, a.a_content, B.id as b_id, B.b_content FROM a LEFT JOIN (SELECT * FROM b LEFT JOIN c on b.prd_id = c.id) B ON a.id = B.id
Cara mengaktifkan MiniBatch
Optimasi ini didukung di VVR 8.0.4 dan versi lebih baru. MiniBatch dinonaktifkan secara default. Untuk mengaktifkan MiniBatch, tambahkan kode berikut ke bagian Parameters pada halaman detail penerapan Anda di tab Configuration. Untuk informasi selengkapnya, lihat Bagaimana cara mengonfigurasi parameter kustom untuk penerapan yang sedang berjalan?.
table.exec.mini-batch.enabled: true table.exec.mini-batch.allow-latency: 5s table.exec.stream.join.mini-batch-enabled: true
Praktik TopN
Algoritma TopN
Jika aliran data input TopN adalah aliran data statis, seperti aliran data dari sumber data Layanan Log, TopN hanya mendukung algoritma AppendRank. Jika aliran data input TopN adalah aliran data dinamis, seperti aliran data yang diproses oleh fungsi agregat atau join, TopN mendukung algoritma UpdateFastRank dan RetractRank. Kinerja UpdateFastRank lebih baik daripada RetractRank. Nama algoritma yang digunakan termasuk dalam nama node dalam topologi.
AppendRank: Hanya algoritma ini yang didukung untuk aliran data statis.
UpdateFastRank: Algoritma ini optimal untuk aliran data dinamis.
RetractRank: Algoritma ini adalah algoritma dasar untuk aliran data dinamis. Jika kinerja algoritma ini tidak memenuhi persyaratan bisnis Anda, Anda dapat mengubah algoritma ini menjadi UpdateFastRank dalam skenario tertentu.
Berikut ini menjelaskan cara mengubah RetractRank menjadi UpdateFastRank. Jika Anda ingin menggunakan algoritma UpdateFastRank, pastikan kondisi berikut terpenuhi:
Aliran data masukan bersifat dinamis tetapi tidak mengandung pesan DELETE atau UPDATE_BEFORE. Jika aliran data masukan mengandung pesan DELETE atau UPDATE_BEFORE, monotonisitas bidang pengurutan terganggu. Anda dapat mengeksekusi pernyataan
EXPLAIN CHANGELOG_MODE <query_statement_or_insert_statement_or_statement_set>untuk mengetahui jenis pesan dalam aliran data masukan node terkait. Untuk informasi selengkapnya tentang sintaks pernyataan EXPLAIN, lihat EXPLAIN.Aliran data input berisi informasi kunci utama. Misalnya, klausa GROUP BY digunakan untuk menggabungkan kolom berdasarkan kunci utama di sumber.
Nilai bidang atau fungsi, seperti ORDER BY COUNT, COUNT_DISTINCT, atau SUM (nilai positif) DESC dalam klausa ORDER BY, diperbarui secara monotonik dalam urutan pengurutan yang berlawanan.
Jika Anda menggunakan ORDER BY SUM DESC untuk mendapatkan rencana optimasi UpdateFastRank, Anda harus menentukan kondisi untuk mendapatkan nilai SUM positif. Ini memastikan bahwa nilai bidang total_fee positif.
CatatanDalam kode sampel berikut, tabel random_test berisi aliran data statis. Hasil agregasi grup terkait tidak berisi pesan DELETE atau UPDATE_BEFORE. Oleh karena itu, monotonisitas dipertahankan untuk bidang hasil agregasi terkait.
Kode sampel mengubah RetractRank menjadi UpdateFastRank:
insert into print_test SELECT cate_id, seller_id, stat_date, pay_ord_amt-- Keluaran perintah tidak berisi bidang rownum. Ini dapat mengurangi keluaran data dalam tabel hasil. FROM ( SELECT *, ROW_NUMBER () OVER ( -- Catatan: Kolom PARTITION BY harus disertakan dalam klausa GROUP BY dalam subquery. Bidang waktu juga harus disertakan. Jika tidak, data menjadi tidak teratur saat status kedaluwarsa. PARTITION BY cate_id, stat_date ORDER BY pay_ord_amt DESC ) as rownum -- Urutkan data berdasarkan jumlah input data. FROM ( SELECT cate_id, seller_id, stat_date, -- Catatan: Hasil fungsi SUM meningkat secara monotonik karena nilai dalam fungsi SUM positif. Oleh karena itu, Anda dapat menggunakan algoritma UpdateFast dari TopN untuk mendapatkan 100 catatan data teratas. sum (total_fee) filter ( where total_fee >= 0 ) as pay_ord_amt FROM random_test WHERE total_fee >= 0 GROUP BY seller_id, stat_date, cate_id ) a ) WHERE rownum <= 100;Metode Optimasi TopN
Lakukan Optimasi Tanpa Peringkat
Jangan sertakan rownum dalam keluaran TopN. Kami menyarankan Anda mengurutkan hasil saat ditampilkan di frontend. Ini secara signifikan mengurangi jumlah data yang akan ditulis ke tabel hasil. Untuk informasi lebih lanjut tentang metode optimasi tanpa peringkat, lihat Top-N.
Tingkatkan Ukuran Cache TopN
TopN menyediakan cache status untuk meningkatkan efisiensi akses ke data status. Rumus berikut digunakan untuk menghitung rasio hit cache TopN:
cache_hit = cache_size*parallelism/top_n/partition_key_numSebagai contoh, Top100 digunakan, cache berisi 10.000 catatan, dan parallelism adalah 50. Jika jumlah kunci untuk fungsi PARTITION BY adalah 100.000, rasio hit cache adalah 5%. Rasio ini dihitung menggunakan rumus: 10000 × 50/100/100,000 = 5%. Rasio hit cache rendah, yang menunjukkan bahwa sejumlah besar permintaan mengakses data status disk dan nilai metrik state seek mungkin tidak stabil. Dalam hal ini, kinerja berkurang secara signifikan.
Oleh karena itu, jika jumlah kunci untuk fungsi PARTITION BY besar, Anda dapat meningkatkan ukuran cache dan memori heap TopN. Untuk informasi lebih lanjut, lihat Konfigurasikan Penyebaran.
table.exec.rank.topn-cache-size: 200000Dalamcontoh ini, jika Anda meningkatkan ukuran cache TopN dari nilai default 10.000 menjadi 200.000, rasio hit cache mungkin mencapai 100%. Rasio hit cache ini dihitung menggunakan rumus berikut: 200,000 × 50/100/100,000 = 100%.
Sertakan Bidang Waktu dalam Fungsi PARTITION BY
Sebagai contoh, tambahkan bidang Hari ke peringkat setiap hari. Jika tidak, hasil TopN menjadi tidak teratur karena time-to-live (TTL) data status.
Deduplikasi efisien
Aliran input Realtime Compute for Apache Flink mungkin berisi data duplikat, yang membuat deduplikasi efisien menjadi kebutuhan sering. Realtime Compute for Apache Flink menawarkan dua kebijakan untuk menghapus duplikat: Deduplicate Keep FirstRow dan Deduplicate Keep LastRow.
Sintaksis
Flink SQL tidak menyediakan sintaksis untuk menghapus duplikat. Untuk menyimpan catatan di baris pertama atau terakhir dari baris duplikat di bawah kunci utama yang ditentukan dan membuang duplikat lainnya, Anda harus menggunakan fungsi SQL ROW_NUMBER() bersama dengan klausa OVER. Deduplikasi adalah fungsi TopN khusus.
SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY col1[, col2..] ORDER BY timeAttributeCol [asc|desc]) AS rownum FROM table_name) WHERE rownum = 1Parameter
Deskripsi
ROW_NUMBER()
Menghitung nomor baris. Ini adalah fungsi jendela yang digunakan dengan klausa OVER. Nilai dimulai dari 1.
PARTITION BY col1[, col2..]
Opsional. Menentukan kolom partisi untuk menyimpan kunci utama duplikat.
ORDER BY timeAttributeCol [asc|desc])
Menentukan kolom berdasarkan mana Anda ingin mengurutkan data. Anda harus menentukan atributwaktu, yang bisa berupa proctime atau rowtime. Anda dapat mengurutkan baris dalam urutan menaik atau menurun berdasarkan atribut waktu. Untuk urutan menaik, catatan di baris pertama dari baris duplikat disimpan. Untuk urutan menurun, catatan di baris terakhir dari baris duplikat disimpan.
rownum
Menentukan jumlah baris. Anda dapat mengonfigurasi
rownum = 1ataurownum <= 1.Sintaksis sebelumnya menunjukkan bahwa deduplikasi melibatkan dua tingkat kueri:
Gunakan fungsi jendela
ROW_NUMBER()untuk mengurutkan data berdasarkan atribut waktu tertentu dan memberi peringkat pada data tersebut.Jika atribut waktu adalah proctime, Realtime Compute for Apache Flink menghapus duplikat berdasarkan waktu pemrosesan catatan. Dalam hal ini, hasil peringkat mungkin bervariasi setiap kali.
Jika atribut waktu adalah rowtime, Realtime Compute for Apache Flink menghapus duplikat berdasarkan waktu penulisan catatan ke Realtime Compute for Apache Flink. Dalam hal ini, hasil peringkat tetap sama setiap kali.
Simpan hanya catatan di baris pertama di bawah kunci utama yang ditentukan dan hapus duplikat lainnya.
Anda dapat mengurutkan catatan dalam urutan menaik atau menurun berdasarkan atribut waktu.
Deduplicate Keep FirstRow: Realtime Compute for Apache Flink mengurutkan catatan dalam baris secara menaik berdasarkan atribut waktu dan menyimpan catatan di baris pertama dari baris duplikat di bawah kunci utama yang ditentukan.
Deduplicate Keep LastRow: Realtime Compute for Apache Flink mengurutkan catatan dalam baris secara menurun berdasarkan atribut waktu dan menyimpan catatan di baris pertama dari baris duplikat di bawah kunci utama yang ditentukan.
Deduplicate Keep FirstRow
Jika Anda memilih kebijakan Deduplicate Keep FirstRow, Realtime Compute for Apache Flink menyimpan catatan di baris pertama dari baris duplikat di bawah kunci utama yang ditentukan dan membuang duplikat lainnya. Dalam hal ini, data status hanya menyimpan informasi kunci utama, dan efisiensi akses ke data status meningkat secara signifikan. Contoh berikut digunakan untuk menjelaskan kebijakan tersebut.
SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum FROM T ) WHERE rowNum = 1Kode sebelumnya menghapus duplikat dari Tabel T berdasarkan bidang b, dan menyimpan catatan di baris pertama dari baris duplikat di bawah kunci utama yang ditentukan berdasarkan waktu sistem. Dalam contoh ini, atribut proctime menunjukkan waktu sistem saat catatan diproses di Realtime Compute for Apache Flink. Realtime Compute for Apache Flink mengurutkan catatan dalam tabel T berdasarkan atribut ini. Untuk menghapus duplikat berdasarkan waktu sistem, Anda juga dapat memanggil fungsi PROCTIME() alih-alih mendeklarasikan atribut proctime.
Deduplicate Keep LastRow
Jika Anda memilih kebijakan Deduplicate Keep LastRow, Realtime Compute for Apache Flink menyimpan catatan di baris terakhir dari baris duplikat di bawah kunci utama yang ditentukan dan membuang duplikat lainnya. Kebijakan ini sedikit lebih unggul daripada fungsi LAST_VALUE dalam hal kinerja. Contoh berikut digunakan untuk menjelaskan kebijakan tersebut.
SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) as rowNum FROM T ) WHERE rowNum = 1Kode sebelumnya menghapus duplikat dari tabel T berdasarkan bidang b dan d, dan menyimpan catatan di baris terakhir di bawah kunci utama yang ditentukan berdasarkan waktu penulisan catatan ke Realtime Compute for Apache Flink. Dalam contoh ini, atribut rowtime menunjukkan waktu acara saat catatan ditulis ke Realtime Compute for Apache Flink. Realtime Compute for Apache Flink mengurutkan catatan dalam tabel T berdasarkan atribut ini.
Fungsi bawaan yang efisien
Jika Anda menggunakan fungsi bawaan, perhatikan poin-poin berikut:
Gunakan Fungsi Bawaan untuk Menggantikan Fungsi yang Ditentukan Pengguna (UDF)
Fungsi bawaan Realtime Compute for Apache Flink terus dioptimalkan. Kami menyarankan Anda mengganti UDF dengan fungsi bawaan jika memungkinkan. Realtime Compute for Apache Flink melakukan operasi berikut untuk mengoptimalkan fungsi bawaan:
Meningkatkan efisiensi serialisasi dan deserialisasi.
Mengizinkan operasi pada data dalam byte.
Gunakan Pemisah Karakter Tunggal dalam Fungsi KEYVALUE
Tanda tangan fungsi KEYVALUE adalah
KEYVALUE(content, keyValueSplit, keySplit, keyName). Jika keyValueSplit dan keySplit adalah pemisah karakter tunggal, seperti titik dua (:) atau koma (,), Realtime Compute for Apache Flink menggunakan algoritma yang dioptimalkan. Realtime Compute for Apache Flink mencari nilai keyName yang diperlukan dalam data biner tanpa membagi seluruh konten, sehingga meningkatkan kinerja penerapan sekitar 30%.Perhatikan Poin-Poin Berikut Saat Menggunakan Operator LIKE:
Untuk mencocokkan catatan yang dimulai dengan konten tertentu, gunakan
LIKE 'xxx%'.Untuk mencocokkan catatan yang diakhiri dengan konten tertentu, gunakan
LIKE '%xxx'.Untuk mencocokkan catatan yang berisi konten tertentu, gunakan
LIKE '%xxx%'.Untuk mencocokkan catatan yang sama persis dengan konten tertentu, gunakan
LIKE 'xxx', yang setara denganstr = 'xxx'.Untuk mencocokkan garis bawah (_), gunakan
LIKE '%seller/_id%' ESCAPE '/'. Garis bawah (_) di-escape karena merupakan wildcard karakter tunggal dalam SQL dan dapat mencocokkan semua karakter. Jika Anda menggunakanLIKE '%seller_id%', sejumlah besar hasil dikembalikan, sepertiseller_id,iseller#id,sellerxid, danseller1id, yang dapat menyebabkan hasil yang tidak diinginkan.
Hindari Menggunakan Ekspresi Reguler
Pemrosesan data menggunakan ekspresi reguler memakan waktu, yang dapat menyebabkan overhead kinerja ratusan kali lebih tinggi daripada operasi penambahan, pengurangan, perkalian, atau pembagian. Selain itu, ekspresi reguler mungkin masuk ke loop tak terbatas dalam beberapa kasus ekstrem. Akibatnya, penyebaran mungkin terblokir. Untuk mencegah masalah pemblokiran penyebaran, kami menyarankan Anda menggunakan operator LIKE. Untuk informasi lebih lanjut tentang ekspresi reguler umum, klik tautan berikut:
Petunjuk SQL
Flink mendukung Petunjuk SQL untuk meningkatkan kemampuan optimasi mesin. Petunjuk SQL cocok untuk skenario berikut:
Modifikasi rencana eksekusi: Anda dapat menggunakan petunjuk SQL untuk lebih baik mengelola rencana eksekusi.
Penambahan metadata atau data statistik: Data statistik tertentu, seperti indeks tabel yang dipindai dan informasi skew tentang kunci shuffle tertentu, bersifat dinamis untuk kueri. Metadata perencanaan yang diperoleh dari perencana mungkin tidak akurat. Anda dapat menggunakan petunjuk SQL untuk mengonfigurasi metadata atau data statistik.
Konfigurasi Opsi Tabel Dinamis: Opsi tabel dinamis memungkinkan Anda secara dinamis menentukan atau menimpa opsi tabel. Opsi ini dapat ditentukan dalam cakupan per-tabel dalam setiap kueri.
Petunjuk kueri adalah jenis petunjuk SQL yang digunakan untuk menyarankan optimizer untuk memodifikasi rencana eksekusi. Modifikasi hanya berlaku dalam blok query block tempat petunjuk kueri saat ini berada. Petunjuk kueri Flink hanya mendukung petunjuk join.
Sintaksis
Sintaksis petunjuk kueri Flink sama dengan sintaksis SQL Apache Calcite.
# Petunjuk kueri: SELECT /*+ hint [, hint ] */ ... hint: hintName '(' hintOption [, hintOption ]* ')' hintOption: simpleIdentifier | numericLiteral | stringLiteralPetunjuk Join
Petunjuk join adalah jenis petunjuk kueri yang memungkinkan Anda mengoptimalkan join secara dinamis. Petunjuk join untuk tabel dimensi dan petunjuk untuk join reguler didukung.