Topik ini memberikan jawaban atas beberapa pertanyaan yang sering diajukan terkait kinerja penyebaran.
Bagaimana cara mengoptimalkan penyebaran yang menggunakan fungsi agregat dengan GROUP BY?
Bagaimana cara mengoptimalkan penyebaran dengan menggunakan praktik TopN?
Apa yang perlu saya fokuskan ketika menggunakan fungsi bawaan?
Apa arti warna berbeda untuk legenda di kolom Durasi Status pada tab Metrik Subtugas?
Apa yang harus saya lakukan jika konsumsi data upstream tidak stabil?
Bagaimana cara membagi operator dari sebuah penyebaran?
Pada halaman , klik nama penyebaran yang diinginkan. Pada tab Konfigurasi halaman deployment details, klik Edit di sudut kanan atas bagian Parameters, tambahkan kode berikut ke bidang Other Configuration, lalu klik Simpan untuk membuat konfigurasi berlaku.
pipeline.operator-chaining: 'false'Bagaimana cara mengoptimalkan penyebaran yang menggunakan fungsi agregat dengan GROUP BY?
Aktifkan miniBatch untuk meningkatkan throughput data
Jika miniBatch diaktifkan, Realtime Compute for Apache Flink memproses data ketika cache data memenuhi kondisi pemicu. Ini mengurangi jumlah akses yang dilakukan oleh Realtime Compute for Apache Flink ke data status, meningkatkan throughput data, dan mengurangi output data.
Fitur miniBatch memicu pemrosesan mikro-batch berdasarkan pesan acara. Pesan acara disisipkan di sumber pada interval tertentu.
Skenario
Pemrosesan mikro-batch mencapai throughput lebih tinggi dengan pengorbanan latensi lebih tinggi. Oleh karena itu, pemrosesan mikro-batch tidak berlaku untuk skenario yang memerlukan latensi sangat rendah. Namun, dalam skenario agregasi data, kami merekomendasikan Anda mengaktifkan pemrosesan mikro-batch untuk meningkatkan kinerja sistem.
Cara Mengaktifkan miniBatch
Fitur miniBatch dinonaktifkan secara default. Untuk mengaktifkan fitur ini, Anda harus memasukkan kode berikut di bidang Other Configuration bagian Parameters pada tab Configuration halaman detail penyebaran.
table.exec.mini-batch.enabled: true table.exec.mini-batch.allow-latency: 5sTabel berikut menjelaskan parameter.
Parameter
Deskripsi
table.exec.mini-batch.enabled
Menentukan apakah akan mengaktifkan fitur miniBatch.
table.exec.mini-batch.allow-latency
Interval di mana data diekspor dalam batch.
Aktifkan LocalGlobal untuk menyelesaikan masalah hotspot data umum
Kebijakan LocalGlobal dapat menyaring beberapa data miring dengan menggunakan agregasi lokal. Ini secara efisien mengurangi masalah hotspot data dalam agregasi global dan meningkatkan kinerja penyebaran.
Kebijakan LocalGlobal membagi proses agregasi menjadi dua fase: agregasi lokal dan agregasi global. Fase-fase ini setara dengan fase combine dan reduce dalam MapReduce. Dalam fase agregasi lokal, Realtime Compute for Apache Flink menggabungkan mikro-batch data yang di-cache secara lokal di setiap node hulu, dan menghasilkan nilai akumulator untuk setiap mikro-batch. Dalam fase agregasi global, akumulator digabungkan ke hasil akhir. Kemudian, hasil agregasi global dihasilkan.
Skenario
Kebijakan LocalGlobal cocok untuk skenario di mana Anda ingin meningkatkan kinerja penyebaran dan menyelesaikan masalah hotspot data dengan menggunakan fungsi agregat umum, seperti SUM, COUNT, MAX, MIN, dan AVG.
Batasan
LocalGlobal diaktifkan secara default. Kebijakan ini memiliki batasan berikut:
LocalGlobal hanya dapat berfungsi jika miniBatch diaktifkan.
AggregateFunction harus digunakan untuk menggabungkan data.
Verifikasi
Untuk menentukan apakah LocalGlobal diaktifkan, periksa apakah node GlobalGroupAggregate atau LocalGroupAggregate ada dalam topologi akhir.
Aktifkan PartialFinal untuk menyelesaikan masalah hotspot data saat Anda menggunakan fungsi COUNT DISTINCT
Dalam kasus normal, Anda perlu menambahkan lapisan yang menyebarkan data berdasarkan kunci unik saat menggunakan fungsi COUNT DISTINCT. Dengan cara ini, Anda dapat membagi proses agregasi menjadi dua fase untuk menyelesaikan masalah hotspot data. Realtime Compute for Apache Flink sekarang menyediakan kebijakan PartialFinal untuk secara otomatis menyebarkan data dan membagi proses agregasi.
Kebijakan LocalGlobal meningkatkan kinerja fungsi agregat umum, seperti SUM, COUNT, MAX, MIN, dan AVG. Namun, kebijakan LocalGlobal kurang efektif dalam meningkatkan kinerja fungsi COUNT DISTINCT. Hal ini karena agregasi lokal tidak dapat menghapus kunci unik duplikat. Akibatnya, sejumlah besar data menumpuk di fase agregasi global.
Skenario
Kebijakan PartialFinal cocok untuk skenario di mana kinerja agregasi tidak dapat memenuhi persyaratan Anda saat menggunakan fungsi COUNT DISTINCT.
nullAnda tidak dapat mengaktifkan PartialFinal dalam kode Flink SQL yang berisi fungsi agregat yang didefinisikan pengguna (UDAF).
Untuk mencegah pemborosan sumber daya, kami merekomendasikan Anda mengaktifkan PartialFinal hanya saat jumlah data besar. PartialFinal secara otomatis menyebarkan data dan membagi proses agregasi menjadi dua fase. Ini menyebabkan pengacakan jaringan tambahan.
Cara Mengaktifkan PartialFinal
PartialFinal dinonaktifkan secara default. Untuk mengaktifkan fitur ini, Anda harus memasukkan kode berikut di bidang Other Configuration bagian Parameters pada tab Configuration halaman detail penyebaran.
table.optimizer.distinct-agg.split.enabled: trueVerifikasi
Periksa apakah agregasi satu-lapis berubah menjadi agregasi dua-lapis dalam topologi akhir.
AGG WITH CASE WHEN diubah menjadi AGG WITH FILTER untuk meningkatkan kinerja sistem dalam skenario di mana fungsi COUNT DISTINCT digunakan saat ada sejumlah besar data
Penyebaran statistik mencatat pengunjung unik (UV) dalam dimensi berbeda, seperti UV semua saluran, UV terminal seluler, dan UV PC. Kami merekomendasikan Anda menggunakan sintaksis standar AGG WITH FILTER alih-alih sintaksis AGG WITH CASE WHEN untuk menerapkan analisis statistik multidimensi. Alasannya adalah bahwa pengoptimal SQL dari Realtime Compute for Apache Flink dapat menganalisis parameter filter. Dengan cara ini, Realtime Compute for Apache Flink dapat menjalankan fungsi COUNT DISTINCT pada bidang yang sama dalam kondisi filter berbeda dengan berbagi data status. Ini mengurangi operasi baca dan tulis pada data status. Dalam pengujian kinerja, sintaksis AGG WITH FILTER lebih unggul daripada sintaksis AGG WITH CASE WHEN. Hal ini karena kinerja penyebaran untuk sintaksis AGG WITH FILTER dua kali lipat dibandingkan dengan sintaksis AGG WITH CASE WHEN.
Skenario
Jika Anda menggunakan AGG WITH FILTER alih-alih AGG WITH CASE WHEN saat menggunakan fungsi COUNT DISTINCT untuk menghitung hasil di bawah kondisi berbeda pada bidang yang sama, kinerja penyebaran meningkat secara signifikan.
Pernyataan Asli
COUNT(distinct visitor_id) as UV1 , COUNT(distinct case when is_wireless='y' then visitor_id else null end) as UV2Pernyataan Dioptimalkan
COUNT(distinct visitor_id) as UV1 , COUNT(distinct visitor_id) filter (where is_wireless='y') as UV2
Bagaimana cara mengoptimalkan penyebaran dengan menggunakan praktik TopN?
Algoritma TopN
Jika aliran data input dari TopN adalah aliran data statis, seperti aliran data dari sumber data Layanan Log, TopN hanya mendukung algoritma AppendRank. Jika aliran data input dari TopN adalah aliran data dinamis, seperti aliran data yang diproses oleh fungsi agregat atau join, TopN mendukung algoritma UpdateFastRank dan RetractRank. Kinerja UpdateFastRank lebih baik daripada RetractRank. Nama algoritma yang digunakan termasuk dalam nama node dalam topologi.
AppendRank: Hanya algoritma ini yang didukung untuk aliran data statis.
UpdateFastRank: Algoritma ini optimal untuk aliran data dinamis.
RetractRank: Algoritma ini adalah algoritma dasar untuk aliran data dinamis. Jika kinerja algoritma ini tidak memenuhi persyaratan bisnis Anda, Anda dapat mengubah algoritma ini menjadi UpdateFastRank dalam skenario tertentu.
Berikut ini menjelaskan cara mengubah RetractRank menjadi UpdateFastRank. Jika Anda ingin menggunakan algoritma UpdateFastRank, pastikan kondisi berikut terpenuhi:
Aliran data input dari TopN adalah aliran data dinamis.
Aliran data input berisi informasi kunci utama. Misalnya, klausa GROUP BY digunakan untuk menggabungkan kolom berdasarkan kunci utama di sumber.
Nilai bidang atau fungsi, seperti ORDER BY COUNT, COUNT_DISTINCT, atau SUM (nilai positif) DESC, diperbarui secara monotonik dalam urutan penyortiran yang berlawanan.
Jika Anda menggunakan ORDER BY SUM DESC untuk mendapatkan rencana optimasi UpdateFastRank, Anda harus menentukan kondisi untuk mendapatkan nilai SUM positif. Ini memastikan bahwa nilai total_fee positif.
insert into print_test SELECT cate_id, seller_id, stat_date, pay_ord_amt -- Bidang rownum tidak termasuk dalam data keluaran. Ini mengurangi jumlah data keluaran yang akan ditulis ke tabel hasil. FROM ( SELECT *, ROW_NUMBER () OVER ( PARTITION BY cate_id, stat_date -- Pastikan bidang stat_date termasuk. Jika tidak, data mungkin tidak sesuai urutan saat data status kedaluwarsa. ORDER BY pay_ord_amt DESC ) as rownum -- Data diurutkan berdasarkan jumlah input data. FROM ( SELECT cate_id, seller_id, stat_date, -- Catatan: Hasil fungsi SUM secara monotonik meningkat karena nilai yang dikembalikan oleh fungsi SUM positif. Oleh karena itu, TopN dapat menggunakan algoritma dioptimalkan untuk mendapatkan 100 data teratas. sum (total_fee) filter ( where total_fee >= 0 ) as pay_ord_amt FROM random_test WHERE total_fee >= 0 GROUP BY cate_name, seller_id, stat_date, cate_id ) a ) WHERE rownum <= 100;Metode Optimasi TopN
Lakukan Optimasi Tanpa Peringkat
Jangan sertakan rownum dalam output TopN. Kami merekomendasikan Anda mengurutkan hasil saat ditampilkan di frontend akhirnya. Ini secara signifikan mengurangi jumlah data yang akan ditulis ke tabel hasil. Untuk informasi lebih lanjut tentang metode optimasi tanpa peringkat, lihat Top-N.
Tingkatkan Ukuran Cache TopN
TopN menyediakan cache status untuk meningkatkan efisiensi akses ke data status. Rumus berikut digunakan untuk menghitung rasio hit cache TopN:
cache_hit = cache_size*parallelism/top_n/partition_key_numSebagai contoh, Top100 digunakan, cache berisi 10.000 rekaman, dan paralelisme adalah 50. Jika jumlah kunci untuk fungsi PARTITION BY adalah 100.000, rasio hit cache adalah 5%. Rasio ini dihitung dengan menggunakan rumus: 10000 × 50/100/100,000 = 5%. Rasio hit cache rendah, yang menunjukkan bahwa sejumlah besar permintaan mengakses data status disk dan nilai metrik pencarian status mungkin tidak stabil. Dalam hal ini, kinerja berkurang secara signifikan.
Oleh karena itu, jika jumlah kunci untuk fungsi PARTITION BY besar, Anda dapat meningkatkan ukuran cache dan memori heap TopN. Untuk informasi lebih lanjut, lihat Konfigurasikan Penyebaran.
table.exec.rank.topn-cache-size: 200000Dalam contoh ini, jika Anda meningkatkan ukuran cache TopN dari nilai default 10.000 menjadi 200.000, rasio hit cache mungkin mencapai 100%. Rasio hit cache ini dihitung dengan menggunakan rumus berikut:
200,000 × 50/100/100,000 = 100%.Sertakan Bidang Waktu dalam Fungsi PARTITION BY
Sebagai contoh, tambahkan bidang Hari ke peringkat setiap hari. Jika tidak, hasil TopN menjadi tidak sesuai urutan karena time-to-live (TTL) data status.
Bagaimana cara melakukan deduplikasi secara efisien?
Aliran input Realtime Compute for Apache Flink mungkin berisi data duplikat, yang membuat deduplikasi efisien menjadi kebutuhan sering. Realtime Compute for Apache Flink menawarkan dua kebijakan untuk menghapus duplikat: Deduplicate Keep FirstRow dan Deduplicate Keep LastRow.
Sintaksis
Flink SQL tidak menyediakan sintaksis untuk menghapus duplikat. Untuk menyimpan rekaman di baris pertama atau terakhir dari baris duplikat di bawah kunci utama yang ditentukan dan membuang duplikat lainnya, Anda harus menggunakan fungsi SQL ROW_NUMBER() bersama dengan klausa OVER. Deduplikasi adalah fungsi TopN khusus.
SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY col1[, col2..] ORDER BY timeAttributeCol [asc|desc]) AS rownum FROM table_name) WHERE rownum = 1Parameter
Deskripsi
ROW_NUMBER()
Menghitung nomor baris. Ini adalah fungsi jendela yang digunakan dengan klausa OVER. Nilai dimulai dari 1.
PARTITION BY col1[, col2..]
Opsional. Menentukan kolom partisi, yang menyimpan kunci utama untuk deduplikasi.
ORDER BY timeAttributeCol [asc|desc])
Menentukan kolom berdasarkan mana Anda ingin mengurutkan data. Anda harus menentukan atribut waktu, yang bisa proctime atau rowtime. Anda dapat mengurutkan baris dalam urutan naik atau turun berdasarkan atribut waktu. Untuk urutan naik, rekaman di baris pertama dari baris duplikat disimpan. Untuk urutan turun, rekaman di baris terakhir dari baris duplikat disimpan.
rownum
Menentukan jumlah baris. Anda dapat mengonfigurasi
rownum = 1ataurownum <= 1.Sintaksis sebelumnya menunjukkan bahwa deduplikasi melibatkan kueri dua tingkat:
Gunakan fungsi jendela
ROW_NUMBER()untuk mengurutkan data berdasarkan atribut waktu yang ditentukan dan gunakan peringkat untuk menandai data.Jika atribut waktu adalahproctime, Realtime Compute for Apache Flink menghapus duplikat berdasarkan waktu ketika rekaman diproses. Dalam hal ini, hasil pengurutan mungkin berbeda setiap kali sistem mengurutkan rekaman data.
Jika atribut waktu adalah rowtime, Realtime Compute for Apache Flink menghapus duplikat berdasarkan waktu ketika rekaman ditulis ke Realtime Compute for Apache Flink. Dalam hal ini, hasil pengurutan tetap sama setiap kali sistem mengurutkan rekaman data.
Simpan hanya rekaman di baris pertama di bawah kunci utama yang ditentukan dan hapus duplikat lainnya.
Anda dapat mengurutkan rekaman dalam urutan naik atau turun berdasarkan atribut waktu.
Deduplicate Keep FirstRow: Realtime Compute for Apache Flink mengurutkan rekaman dalam baris dalam urutan naik berdasarkan atribut waktu dan menyimpan rekaman di baris pertama dari baris duplikat di bawah kunci utama yang ditentukan.
Deduplicate Keep LastRow: Realtime Compute for Apache Flink mengurutkan rekaman dalam baris dalam urutan turun berdasarkan atribut waktu dan menyimpan rekaman di baris pertama dari baris duplikat di bawah kunci utama yang ditentukan.
Deduplicate Keep FirstRow
Jika Anda memilih kebijakan Deduplicate Keep FirstRow, Realtime Compute for Apache Flink menyimpan rekaman di baris pertama dari baris duplikat di bawah kunci utama yang ditentukan dan membuang duplikat lainnya. Dalam hal ini, data status hanya menyimpan informasi kunci utama, dan efisiensi akses ke data status meningkat secara signifikan. Contoh berikut digunakan untuk menjelaskan kebijakan tersebut.
SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum FROM T ) WHERE rowNum = 1Kode sebelumnya menghapus duplikat dari tabel T berdasarkan bidang b, dan menyimpan rekaman di baris pertama dari baris duplikat di bawah kunci utama yang ditentukan berdasarkan waktu sistem. Dalam contoh ini, atribut proctime menunjukkan waktu sistem ketika rekaman diproses di Realtime Compute for Apache Flink. Realtime Compute for Apache Flink mengurutkan rekaman di tabel T berdasarkan atribut ini. Untuk menghapus duplikat berdasarkan waktu sistem, Anda juga dapat memanggil fungsi PROCTIME() alih-alih mendeklarasikan atribut proctime.
Deduplicate Keep LastRow
Jika Anda memilih kebijakan Deduplicate Keep LastRow, Realtime Compute for Apache Flink menyimpan rekaman di baris terakhir dari baris duplikat di bawah kunci utama yang ditentukan dan membuang duplikat lainnya. Kebijakan ini sedikit lebih unggul daripada fungsi LAST_VALUE dalam hal kinerja. Contoh berikut digunakan untuk menjelaskan kebijakan tersebut.
SELECT * FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) as rowNum FROM T ) WHERE rowNum = 1Kode sebelumnya menghapus duplikat dari tabel T berdasarkan bidang b dan d, dan menyimpan rekaman di baris terakhir di bawah kunci utama yang ditentukan berdasarkan waktu ketika rekaman ditulis ke Realtime Compute for Apache Flink. Dalam contoh ini, atribut rowtime menunjukkan waktu acara ketika rekaman ditulis ke Realtime Compute for Apache Flink. Realtime Compute for Apache Flink mengurutkan rekaman di tabel T berdasarkan atribut ini.
Apa yang perlu saya fokuskan saat menggunakan fungsi bawaan?
Gunakan Fungsi Bawaan untuk Menggantikan Fungsi yang Didefinisikan Pengguna (UDF)
Fungsi bawaan Realtime Compute for Apache Flink terus dioptimalkan. Kami merekomendasikan Anda mengganti UDF dengan fungsi bawaan jika memungkinkan. Realtime Compute for Apache Flink melakukan operasi berikut untuk mengoptimalkan fungsi bawaan:
Meningkatkan efisiensi serialisasi dan deserialisasi.
Mengizinkan operasi pada data dalam byte.
Gunakan Pemisah Satu Karakter dalam Fungsi KEYVALUE
Tanda tangan fungsi KEYVALUE adalah
KEYVALUE(content, keyValueSplit, keySplit, keyName). Jika keyValueSplit dan keySplit adalah pemisah satu karakter, seperti titik dua (:) atau koma (,), Realtime Compute for Apache Flink menggunakan algoritma dioptimalkan. Realtime Compute for Apache Flink mencari nilai KeyName yang diperlukan dalam data biner dan tidak membagi seluruh konten. Kinerja penyebaran meningkat sekitar 30%.Perhatikan Poin-Poin Berikut Saat Anda Menggunakan Operator LIKE:
Untuk mencocokkan rekaman yang dimulai dengan konten tertentu, gunakan
LIKE 'xxx%'.Untuk mencocokkan rekaman yang diakhiri dengan konten tertentu, gunakan
LIKE '%xxx'.Untuk mencocokkan rekaman yang berisi konten tertentu, gunakan
LIKE '%xxx%'.Untuk mencocokkan rekaman yang sama dengan konten tertentu, gunakan
LIKE 'xxx', yang setara denganstr = 'xxx'.Untuk mencocokkan garis bawah (_), gunakan
LIKE '%seller/_id%' ESCAPE '/'. Garis bawah (_) diloloskan karena merupakan wildcard satu karakter dalam SQL dan dapat mencocokkan semua karakter. Jika Anda menggunakanLIKE '%seller_id%', sejumlah besar hasil dikembalikan, sepertiseller_id,seller#id,sellerxid, danseller1id. Ini dapat menyebabkan hasil yang tidak terduga.
Hindari Menggunakan Ekspresi Reguler
Pemrosesan data menggunakan ekspresi reguler memakan waktu, yang dapat menyebabkan overhead kinerja ratusan kali lebih tinggi daripada operasi penambahan, pengurangan, perkalian, atau pembagian. Selain itu, ekspresi reguler mungkin masuk ke loop tak terbatas dalam beberapa kasus ekstrem. Akibatnya, pekerjaan mungkin terhenti. Untuk informasi lebih lanjut, lihat Eksekusi regex terlalu lambat. Untuk mencegah masalah penghentian penyebaran, kami merekomendasikan Anda menggunakan operator LIKE. Untuk informasi lebih lanjut tentang ekspresi reguler umum, klik tautan berikut:
Apa yang harus saya lakukan jika efisiensi pembacaan data rendah dan tekanan balik ada ketika data penuh dibaca dari tabel?
Jika node hilir memproses data dengan kecepatan rendah, tekanan balik mungkin terjadi. Anda dapat memeriksa apakah sink memiliki tekanan balik. Jika sink memiliki tekanan balik, gunakan salah satu metode berikut untuk menyelesaikan masalah tekanan balik pada sink terlebih dahulu: Untuk menyelesaikan masalah ini, Anda dapat menggunakan salah satu metode berikut:
Tingkatkan derajat paralelisme.
Aktifkan fitur optimasi agregasi, seperti miniBatch.
Apa arti warna berbeda untuk legenda di kolom Durasi Status pada tab Metrik Subtugas?

Nilai di kolom Durasi Status menunjukkan durasi setiap fase dari subtugas verteks. Bagian ini menjelaskan arti warna setiap kotak node.
: DIBUAT
: DIJADWALKAN
: MENDEPLOY
: MENGinisialisasi
: BERJALAN
Apa itu thread Koneksi TCP RMI? Mengapa sumber daya CPU yang digunakan oleh Koneksi TCP RMI jauh lebih tinggi daripada sumber daya CPU yang digunakan oleh thread lainnya?

Thread Koneksi TCP RMI adalah thread dalam kerangka Remote Method Invocation (RMI) di Java. Thread ini digunakan untuk memanggil metode jarak jauh. Sumber daya CPU yang digunakan oleh sebuah thread berubah secara dinamis secara real-time. Fluktuasi metrik jangka pendek mungkin tidak menunjukkan bahwa beban CPU keseluruhan terlalu tinggi. Anda dapat mengamati sumber daya CPU yang digunakan oleh thread Koneksi TCP RMI dalam periode waktu tertentu dan menganalisis grafik api dari thread tersebut untuk evaluasi pemanfaatan CPU. Gambar berikut menunjukkan bahwa thread Koneksi TCP RMI hampir tidak mengonsumsi sumber daya CPU.

Mengapa ada perbedaan waktu antara waktu saat ini dan waktu dalam nilai parameter Low Watermark dan Datetime of Watermark Timestamp di tab Watermarks pada tab Status, serta antara waktu saat ini dan waktu dalam nilai metrik Task InputWatermark di bagian Watermark pada tab Metrik?
Penyebab 1: Bidang tipe data
TIMESTAMP_LTZ (TIMESTAMP(p) WITH LOCAL TIME ZONE)digunakan untuk mendeklarasikan watermark di tabel sumber. Akibatnya, ada perbedaan waktu antara waktu saat ini dan nilai parameter terkait watermark.Contoh berikut menunjukkan perbedaan antara watermark yang dideklarasikan menggunakan bidang tipe data TIMESTAMP_LTZ dan watermark yang dideklarasikan menggunakan bidang tipe data TIMESTAMP.
Kode sampel berikut menunjukkan bahwa bidang yang digunakan untuk mendeklarasikan watermark di tabel sumber adalah tipe data TIMESTAMP_LTZ.
CREATE TEMPORARY TABLE s1 ( a INT, b INT, ts as CURRENT_TIMESTAMP,-- Gunakan fungsi bawaan CURRENT_TIMESTAMP untuk menghasilkan data tipe TIMESTAMP_LTZ. WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10' ); CREATE TEMPORARY TABLE t1 ( k INT, ts_ltz timestamp_ltz(3), cnt BIGINT ) WITH ('connector' = 'print'); -- Dapatkan hasil perhitungan. INSERT INTO t1 SELECT b, window_start, COUNT(*) FROM TABLE( TUMBLE(TABLE s1, DESCRIPTOR(ts), INTERVAL '5' SECOND)) GROUP BY b, window_start, window_end;nullHasil perhitungan yang dihasilkan menggunakan sintaksis jendela warisan sama dengan hasil perhitungan yang dihasilkan menggunakan
fungsi bernilai tabel (TVF) window. Contoh kode sampel berikut memberikan contoh sintaksis jendela warisan.SELECT b, TUMBLE_END(ts, INTERVAL '5' SECOND), COUNT(*) FROM s1 GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), b;Gambar berikut menunjukkan bahwa ada perbedaan waktu 8 jam antara waktu saat ini (UTC+8) dan waktu yang ditentukan oleh nilai parameter Low Watermark dan Datetime of Watermark Timestamp di tab Watermarks pada tab Status, serta antara waktu saat ini (UTC+8) dan waktu yang ditentukan oleh nilai metrik Task InputWatermark di bagian Watermark pada tab Metrik setelah draf diterapkan dan dipublikasikan di konsol pengembangan Realtime Compute for Apache Flink.
Watermark&Low Watermark

Task InputWatermark

Kode sampel berikut menunjukkan bahwa bidang yang digunakan untuk mendeklarasikan watermark di tabel sumber adalah tipe data TIMESTAMP (TIMESTAMP(p) WITHOUT TIME ZONE).
CREATE TEMPORARY TABLE s1 ( a INT, b INT, -- Tidak ada informasi zona waktu dalam timestamp data simulasi. Dalam hal ini, timestamp bertambah satu detik mulai dari 2024-01-31 01:00:00. ts as TIMESTAMPADD(SECOND, a, TIMESTAMP '2024-01-31 01:00:00'), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector'='datagen', 'rows-per-second'='1', 'fields.a.kind'='sequence','fields.a.start'='0','fields.a.end'='100000', 'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10' ); CREATE TEMPORARY TABLE t1 ( k INT, ts_ltz timestamp_ltz(3), cnt BIGINT ) WITH ('connector' = 'print'); -- Dapatkan hasil perhitungan. INSERT INTO t1 SELECT b, window_start, COUNT(*) FROM TABLE( TUMBLE(TABLE s1, DESCRIPTOR(ts), INTERVAL '5' SECOND)) GROUP BY b, window_start, window_end;Setelah Anda menerapkan dan mempublikasikan draf di konsol pengembangan Realtime Compute for Apache Flink, waktu yang ditentukan oleh nilai parameter Low Watermark dan Datetime of Watermark Timestamp di tab Watermarks pada tab Status dan waktu yang ditentukan oleh nilai metrik Task InputWatermark di bagian Watermark pada tab Metrik sama dengan waktu saat ini. Dalam contoh ini, tidak ada perbedaan waktu antara waktu saat ini dan waktu simulasi.
Watermark&Low Watermark

Task InputWatermark

Penyebab 2: Zona waktu waktu tampilan di konsol pengembangan Realtime Compute for Apache Flink berbeda dari zona waktu waktu tampilan di UI Apache Flink.
Waktu tampilan di konsol pengembangan Realtime Compute for Apache Flink adalah UTC+0. Namun, waktu tampilan di UI Apache Flink adalah waktu lokal yang dikonversi berdasarkan zona waktu lokal yang diperoleh UI Apache Flink melalui browser. Contoh berikut menunjukkan perbedaan antara waktu tampilan di konsol pengembangan Realtime Compute for Apache Flink dan waktu tampilan di UI Apache Flink saat menggunakan UTC+8. Waktu tampilan di konsol pengembangan Realtime Compute for Apache Flink adalah 8 jam lebih awal daripada waktu tampilan di UI Apache Flink.
Konsol pengembangan Realtime Compute for Apache Flink

Antarmuka Pengguna Apache Flink

Bagaimana cara menangani masalah tekanan balik?
Pada halaman Deployments, klik nama deployment. Di halaman detail deployment, pilih tab Status.
Periksa parameter Busy dan Backpressured untuk menentukan operator tempat tekanan balik terjadi.
Lampu indikator Busy yang lebih merah menunjukkan beban yang lebih berat. Indikator Backpressured yang lebih gelap menunjukkan masalah tekanan balik yang lebih parah.

Klik operator tempat tekanan balik terjadi.
Di tab BackPressure, periksa status tekanan balik dari subtugas.

Bagaimana cara menangani masalah latensi tinggi?
Di tab Alarm atau Metrics halaman Deployments, lihat metrik currentEmitEventTimeLag dan currentFetchEventTimeLag dan lakukan operasi penanganan terkait. Deskripsi metrik:
Jika nilai metrik
currentEmitEventTimeLagterlalu tinggi, ada latensi saat data ditarik atau diproses untuk penyebaran. Anda harus memeriksa apakah kinerja operator memenuhi persyaratan.Jika nilai metrik
currentFetchEventTimeLagterlalu tinggi, ada latensi saat data ditarik atau diproses di sistem hulu untuk penyebaran. Anda harus menangani masalah I/O jaringan dan masalah di sistem hulu.
Jika masalah latensi tinggi terjadi di sistem hulu, nilai kedua metrik meningkat pada saat yang bersamaan.

Apa yang harus saya lakukan jika tekanan balik terjadi akibat masalah hotspot data dari penyebaran Flink SQL?
Periksa status subtugas dari penyebaran yang mengalami tekanan balik. Anda dapat menentukan bahwa masalah ini disebabkan oleh hotspot data. Dalam hal ini, gunakan salah satu metode berikut untuk optimasi kinerja:
Aktifkan LocalGlobal untuk Menyelesaikan Masalah Hotspot Data Umum
Kebijakan LocalGlobal dapat menyaring beberapa data miring dengan menggunakan agregasi lokal. Ini secara efisien mengurangi masalah hotspot data dalam agregasi global dan meningkatkan kinerja penyebaran.
Kebijakan LocalGlobal membagi proses agregasi menjadi dua fase: agregasi lokal dan agregasi global. Fase-fase ini setara dengan fase combine dan reduce dalam MapReduce. Dalam fase agregasi lokal, Realtime Compute for Apache Flink menggabungkan mikro-batch data yang di-cache secara lokal di setiap node hulu, dan menghasilkan nilai akumulator untuk setiap mikro-batch. Dalam fase agregasi global, akumulator digabungkan ke hasil akhir. Kemudian, hasil agregasi global dihasilkan.
Skenario
Kebijakan LocalGlobal cocok untuk skenario di mana Anda ingin meningkatkan kinerja penyebaran dan menyelesaikan masalah hotspot data dengan menggunakan fungsi agregat umum, seperti SUM, COUNT, MAX, MIN, dan AVG.
Batasan
LocalGlobal diaktifkan secara default. Kebijakan ini memiliki batasan berikut:
Hanya dapat berfungsi jika miniBatch diaktifkan.
AggregateFunction harus digunakan untuk menggabungkan data.
Verifikasi
Untuk menentukan apakah LocalGlobal diaktifkan, periksa apakah node GlobalGroupAggregate atau LocalGroupAggregate ada dalam topologi akhir.
Aktifkan PartialFinal untuk Menyelesaikan Masalah Hotspot Data Saat Anda Menggunakan Fungsi COUNT DISTINCT
Dalam kasus normal, Anda perlu menambahkan lapisan yang menyebarkan data berdasarkan kunci unik saat menggunakan fungsi COUNT DISTINCT. Dengan cara ini, Anda dapat membagi proses agregasi menjadi dua fase untuk menyelesaikan masalah hotspot data. Realtime Compute for Apache Flink sekarang menyediakan kebijakan PartialFinal untuk secara otomatis menyebarkan data dan membagi proses agregasi.
Kebijakan LocalGlobal meningkatkan kinerja fungsi agregat umum, seperti SUM, COUNT, MAX, MIN, dan AVG. Namun, kebijakan LocalGlobal kurang efektif dalam meningkatkan kinerja fungsi COUNT DISTINCT. Hal ini karena agregasi lokal tidak dapat menghapus kunci unik duplikat. Akibatnya, sejumlah besar data menumpuk di fase agregasi global.
Skenario
Kebijakan PartialFinal cocok untuk skenario di mana kinerja agregasi tidak dapat memenuhi persyaratan Anda saat menggunakan fungsi COUNT DISTINCT.
nullAnda tidak dapat mengaktifkan PartialFinal dalam kode Flink SQL yang berisi UDAF.
Untuk mencegah pemborosan sumber daya, kami merekomendasikan Anda mengaktifkan PartialFinal hanya saat jumlah data besar. PartialFinal secara otomatis menyebarkan data dan membagi proses agregasi menjadi dua fase. Ini menyebabkan pengacakan jaringan tambahan.
Cara Mengaktifkan PartialFinal
PartialFinal dinonaktifkan secara default. Untuk mengaktifkan fitur ini, Anda harus memasukkan kode berikut di bidang Other Configuration bagian Parameters pada tab Configuration halaman detail penyebaran.
table.optimizer.distinct-agg.split.enabled: trueVerifikasi
Periksa apakah agregasi satu-lapis berubah menjadi agregasi dua-lapis dalam topologi akhir.
Apa yang harus saya lakukan jika konsumsi data upstream tidak stabil?
Penyebab dan solusi yang mungkin:
Ketidaksesuaian antara Kecepatan Pembuatan Data Upstream dan Kecepatan Pemrosesan Data
Analisis aturan pembuatan data upstream untuk memastikan bahwa kecepatan pembuatan data sesuai dengan kecepatan pemrosesan data.
Tekanan Balik dalam Penyebaran
Periksa apakah ada tekanan balik pada vertex pekerjaan, yang memengaruhi kecepatan konsumsi data upstream. Jika penyebaran hanya memiliki satu node, tambahkan konfigurasi
pipeline.operator-chaining: 'false'dan restart penyebaran. Pecahkan rantai operator penyebaran dan periksa apakah laju konsumsi terpengaruh secara negatif oleh node yang memiliki tekanan balik.Tingkat I/O Abnormal
Periksa metrik input data dan laju konsumsi Realtime Compute for Apache Flink pada saat masalah konsumsi data tidak stabil terjadi untuk memeriksa apakah masalah tersebut disebabkan oleh tingkat I/O abnormal.
Tingkat Konsumsi Data Abnormal
Periksa apakah laju konsumsi data berfluktuasi dengan titik waktu ketika garbage collection (GC) terjadi. Jika laju berfluktuasi dengan titik waktu ketika GC terjadi, periksa penggunaan memori node TaskManager terkait.
