Topik ini menjelaskan fungsi jendela, atribut waktu, dan jenis jendela yang didukung dalam Flink SQL.
Fungsi jendela
Flink SQL mendukung agregasi pada jendela tak terbatas tanpa perlu mendefinisikan jendela secara eksplisit dalam pernyataan SQL. Selain itu, Flink SQL juga mendukung agregasi pada jendela tertentu. Sebagai contoh, untuk menghitung jumlah pengguna yang mengklik URL dalam satu menit terakhir, Anda dapat mendefinisikan jendela untuk mengumpulkan data klik pengguna tersebut, lalu menghitung hasilnya dari data dalam jendela.
Flink SQL mendukung agregat jendela dan agregat over. Topik ini berfokus pada agregat jendela, yang mendukung jendela berdasarkan atribut waktu seperti waktu kejadian dan waktu pemrosesan. Fungsi jendela TUMBLE, HOP, dan SESSION tersedia untuk setiap atribut waktu.
Penggabungan agregasi jendela (TUMBLE, HOP, dan SESSION) dengan fungsi LAST_VALUE, FIRST_VALUE, atau TopN dapat menghasilkan hasil tidak akurat karena mekanisme pemicuan jendela atau latensi.
Atribut waktu
Flink SQL mendukung dua atribut waktu: waktu kejadian dan waktu pemrosesan. Flink menggunakan atribut waktu ini untuk menggabungkan data dalam jendela. Metode pembagian jendela bervariasi tergantung pada atribut waktu yang digunakan.
Waktu kejadian: waktu kejadian yang disediakan, biasanya timestamp yang tertanam dalam rekaman.
Sistem menutup jendela berdasarkan watermark yang dihasilkan dari waktu kejadian data. Jendela hanya berakhir ketika nilai watermark lebih besar dari waktu penutupan jendela. Data keluaran dihasilkan saat jendela berakhir. Jendela menghasilkan data keluaran hanya jika data yang memicu penutupan jendela mengalir ke Flink. Nilai watermark untuk satu subtask bertambah. Jika beberapa subtask atau tabel sumber ada, nilai watermark minimum yang digunakan.
PentingJika ada rekaman data out-of-order dengan waktu lebih lambat dari waktu saat ini dalam tabel sumber, atau tidak ada data dalam subtask atau partisi tabel sumber, jendela mungkin tidak berakhir dan data keluaran bisa abnormal. Untuk menghindari masalah ini, tentukan offset berdasarkan data out-of-order dan pastikan data ada di semua subtask dan partisi tabel sumber. Jika tidak ada data dalam subtask atau partisi tabel sumber, watermark tidak dapat maju dan jendela tidak akan berakhir tepat waktu. Dalam hal ini, tambahkan
table.exec.source.idle-timeout: 10ske bidang Konfigurasi Lainnya di tab Konfigurasi halaman Penyebaran untuk memicu pengakhiran jendela. Untuk informasi lebih lanjut tentang parameter ini, lihat Konfigurasi.Setelah data diproses menggunakan GROUP BY, operasi JOIN pada dua aliran data, atau node OVER window, properti watermark hilang dan waktu kejadian tidak dapat lagi digunakan untuk pembagian jendela.
Waktu pemrosesan: waktu sistem lokal saat sistem memproses suatu kejadian.
Waktu pemrosesan dihasilkan oleh Flink dan tidak ada dalam data mentah Anda. Oleh karena itu, Anda harus secara eksplisit mendefinisikan kolom Waktu Pemrosesan.
CatatanWaktu pemrosesan dipengaruhi oleh kecepatan kejadian mencapai Flink dan urutan data diproses dalam Flink. Akibatnya, hasil setiap backtrack mungkin berbeda.
Jendela kaskade
Atribut waktu kejadian dari kolom rowtime tidak lagi berlaku setelah operasi jendela selesai. Anda dapat menggunakan fungsi pembantu seperti TUMBLE_ROWTIME, HOP_ROWTIME, atau SESSION_ROWTIME untuk mendapatkan max(rowtime) dari kolom rowtime dalam jendela. Nilai ini dapat digunakan sebagai rowtime dari jendela waktu. Nilainya adalah window_end - 1 dan bertipe TIMESTAMP dengan atribut rowtime. Sebagai contoh, jika rentang waktu jendela adalah [00:00, 00:15), maka 00:14:59.999 dikembalikan.
Dalam contoh berikut, jendela tumbling 1 jam digunakan untuk menggabungkan data berdasarkan hasil agregasi dari jendela tumbling 1 menit. Ini membantu memenuhi berbagai persyaratan pembagian jendela.
CREATE TEMPORARY TABLE user_clicks(
username varchar,
click_url varchar,
eventtime varchar,
ts AS TO_TIMESTAMP(eventtime),
WATERMARK FOR ts AS ts - INTERVAL '2' SECOND -- Tentukan watermark untuk rowtime.
) with (
'connector'='sls',
...
);
CREATE TEMPORARY TABLE tumble_output(
window_start TIMESTAMP,
window_end TIMESTAMP,
username VARCHAR,
clicks BIGINT
) with (
'connector'='datahub' -- Layanan Log Sederhana hanya memungkinkan Anda mengekspor pernyataan DDL tipe VARCHAR. Oleh karena itu, DataHub digunakan untuk menyimpan data.
...
);
CREATE TEMPORARY VIEW one_minute_window_output AS
SELECT
TUMBLE_ROWTIME(ts, INTERVAL '1' MINUTE) as rowtime, -- Gunakan TUMBLE_ROWTIME sebagai waktu agregasi untuk jendela level-dua.
username,
COUNT(click_url) as cnt
FROM user_clicks
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE),username;
BEGIN statement set;
INSERT INTO tumble_output
SELECT
TUMBLE_START(rowtime, INTERVAL '1' HOUR),
TUMBLE_END(rowtime, INTERVAL '1' HOUR),
username,
SUM(cnt)
FROM one_minute_window_output
GROUP BY TUMBLE(rowtime, INTERVAL '1' HOUR), username;
END;Data antara
Data antara dari jendela dibagi menjadi dua jenis: keyed state dan timer. Kedua jenis data ini dapat disimpan di media penyimpanan yang berbeda. Anda dapat memilih kombinasi penyimpanan berdasarkan karakteristik pekerjaan Anda. Tabel berikut menjelaskan kombinasi penyimpanan yang didukung:
Penyimpanan untuk keyed state | Penyimpanan untuk timer |
Memori | |
Memori | |
Memori | |
File |
Timer utamanya digunakan untuk memicu jendela yang kedaluwarsa. Jika memori cukup, Anda dapat menyimpan timer di memori untuk performa yang lebih baik. Jika ada banyak timer atau sumber daya memori tidak mencukupi, gunakan RocksDBStateBackend untuk menyimpan timer dalam file RocksDB.