Realtime Compute for Apache Flink mendukung dua jenis agregasi jendela: agregasi jendela grup dan agregasi fungsi bernilai tabel jendela (TVF). Topik ini menjelaskan sintaksis berbagai jenis agregasi jendela, kasus penggunaan di mana window TVFs tidak dapat digunakan dalam kueri agregasi, serta dukungan untuk aliran pembaruan dalam berbagai jenis agregasi jendela.
Informasi latar belakang
Agregasi jendela grup (sintaks lama): sesuai dengan Operator GroupWindowAggregation dan mendukung fungsi jendela TUMBLE, HOP, dan SESSION.
Agregasi Window TVF (sintaks baru): mendukung Window TVFs, optimasi yang dijelaskan dalam Optimasi Kinerja, sintaksis standar
GROUPING SETS, dan aplikasi Window Top-N pada hasil agregasi jendela. Jenis agregasi jendela ini sesuai dengan operator WindowAggregate dan mendukung fungsi jendela TUMBLE, HOP, CUMULATE, dan SESSION.
Agregasi jendela grup sudah tidak direkomendasikan. Kami menyarankan Anda menggunakan agregasi Window TVF, yang lebih efisien dan serbaguna.
Untuk informasi tentang dukungan untuk aliran pembaruan, lihat Perbandingan Dukungan untuk Aliran Pembaruan.
Agregasi jendela grup (sintaks lama)
Agregasi jendela grup didefinisikan dalam klausa GROUP BY dari kueri SQL. Sama seperti kueri yang menggunakan klausa GROUP BY biasa, kueri yang mengandung fungsi jendela dalam klausa GROUP BY mengembalikan satu hasil perhitungan untuk setiap grup.
Untuk informasi tentang sintaksis, contoh, dan fitur agregasi jendela grup, lihat Agregasi Jendela Grup.
Agregasi Window TVF (sintaks baru)
Agregasi Window TVF didefinisikan dalam klausa GROUP BY yang berisi kolom window_start dan window_end yang dihasilkan oleh Window TVFs. Sama seperti kueri yang menggunakan klausa GROUP BY biasa, agregasi Window TVF mengembalikan satu hasil perhitungan untuk setiap grup.
Berbeda dengan agregasi pada tabel kontinu, agregasi Window TVF tidak menghasilkan hasil antara dan hanya menghasilkan hasil akhir di akhir jendela. Data status antara yang tidak diperlukan dibersihkan.
Untuk informasi tentang sintaksis, contoh, dan fitur agregasi Window TVF, lihat Agregasi Window TVF.
Window TVF SESI: VVR 11 vs VVR 8
Sintaks VVR 11.x (Flink 1.20)
SESSION(TABLE data [PARTITION BY(keycols, ...)], DESCRIPTOR(timecol), gap)Parameter:
data: sebuah tabel dengan kolom atribut waktu.keycols: deskriptor kolom yang menentukan kolom mana yang digunakan untuk mempartisi data sebelum pembuatan jendela sesi.timecol: deskriptor kolom yang menentukan kolom atribut waktu mana yang dipetakan ke jendela sesi.gap: interval waktu maksimum antara dua event yang termasuk dalam jendela sesi yang sama.
Sintaks VVR 8.x (Flink 1.17)
Kami sarankan Anda meningkatkan ke VVR 11.1 atau yang lebih baru.
SESSION(TABLE data, DESCRIPTOR(timecol), gap)Parameter:
data: sebuah tabel dengan kolom atribut waktu.timecol: deskriptor kolom yang menentukan kolom atribut waktu mana yang dipetakan ke jendela sesi.gap: interval waktu maksimum antara dua event yang termasuk dalam jendela sesi yang sama.
Tabel berikut membandingkan Window TVF SESI antara VVR 11.x dan VVR 8.x:
Item | VVR 11.x | VVR 8.x | Perbedaan |
Sintaks |
|
| VVR 8.x tidak mendukung |
Cara Menentukan Bidang Partisi | Mendukung penentuan eksplisit bidang partisi melalui | Mendukung penentuan implisit bidang partisi melalui klausa | Dalam VVR 8.x, bidang partisi harus menjadi bidang |
Batasan pada Bidang Partisi | Tidak ada. | Bidang partisi harus disertakan dalam klausa | Dalam VVR 8.x, bidang partisi ditentukan secara implisit dalam logika agregasi, sementara VVR 11.x memberikan fleksibilitas lebih. |
Kelengkapan Parameter | Parameter lengkap: | Parameter disederhanakan: | VVR 8.x bergantung pada logika agregasi untuk menyimpulkan informasi partisi. |
Dukungan Penggunaan Mandiri | Mendukung pemanggilan fungsi |
| VVR 8.x memaksa menggabungkan fungsi jendela dengan pernyataan agregasi, sementara VVR 11.x mendukung skenario penggunaan yang lebih fleksibel. |
Penggabungan Fungsi Jendela dan Agregasi | Mendukung penggabungan fungsi jendela dengan pernyataan agregasi, seperti | Tidak mendukung kasus penggunaan di mana Window TVFs dan pernyataan agregasi tidak dapat digabungkan. Artinya, logika agregasi harus konsisten dengan fungsi jendela. | VVR 8.x memiliki batasan pada penggabungan agregasi dengan jendela. |
Contoh SQL berikut ini setara, keduanya menggunakan item sebagai bidang partisi untuk fungsi jendela SESI:
-- tabel harus memiliki atribut waktu, seperti `bidtime` dalam tabel ini
> desc Bid;
+-------------+------------------------+------+-----+--------+---------------------------------+
| name | type | null | key | extras | watermark |
+-------------+------------------------+------+-----+--------+---------------------------------+
| bidtime | TIMESTAMP(3) *ROWTIME* | true | | | `bidtime` - INTERVAL '1' SECOND |
| price | DECIMAL(10, 2) | true | | | |
| item | STRING | true | | | |
+-------------+------------------------+------+-----+--------+---------------------------------+
-- VVR 11.x
> SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
GROUP BY item, window_start, window_end;
-- VVR 8.x
> SELECT window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
GROUP BY item, window_start, window_end;Perbandingan kode SQL:
Item | VVR 11.x | VVR 8.x | Deskripsi |
Pembagian jendela SESI |
|
| VVR 8.x bergantung pada |
Penggabungan agregasi dan jendela | Mendukung penggabungan langsung (seperti perhitungan | Bidang agregasi harus cocok dengan bidang partisi jendela (seperti | VVR 8.x memiliki kendala implisit pada penggabungan agregasi dengan jendela. |
Batasan Window TVFs dalam kueri agregasi
Fungsi Window TVF SESI digunakan sebagai contoh untuk menggambarkan kasus penggunaan di mana Window TVFs tidak dapat digunakan dalam kueri agregasi.
Jika Anda membuat jendela berdasarkan waktu pemrosesan dalam kueri agregasi yang tidak mendukung Window TVFs, kolom waktu pemrosesan dimaterialisasi dan digunakan sebagai atribut waktu dari jendela yang dibuat. Dalam hal ini, watermark dari tabel sumber dapat memengaruhi hasil agregasi. Misalnya, hasil agregasi untuk jendela mungkin dihasilkan lebih awal dari yang diharapkan. Selain itu, catatan data yang tertunda mungkin dibuang, mirip dengan kasus jendela yang dibuat berdasarkan waktu event. Untuk mencegah masalah ini, pastikan bahwa kueri agregasi yang berisi Window TVFs dalam pernyataan SQL Anda tidak memenuhi kondisi berikut.
Penyaringan atau perhitungan pada bidang window_start, window_end, atau window_time dilakukan. Contoh:
-- Penyaringan berdasarkan window_start > SELECT window_start, window_end, item, SUM(price) AS total_price FROM (SELECT item, price, window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) where window_start >= TIMESTAMP '2020-04-15 08:06:00.000') GROUP BY item, window_start, window_end; -- Perhitungan window_start > SELECT window_start, window_end, item, SUM(price) AS total_price FROM (SELECT item, price, window_start + (INTERVAL '1' SECOND) as window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))) GROUP BY item, window_start, window_end; -- Perhitungan window_start > SELECT window_start, window_end, item, SUM(price) AS total_price FROM (SELECT item, price, CAST(window_start as varchar) as window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))) GROUP BY item, window_start, window_end;Window TVF digunakan bersama dengan fungsi tabel yang didefinisikan pengguna (UDTF). Contoh:
> SELECT window_start, window_end, category, SUM(price) AS total_price FROM (SELECT category, price, window_start, window_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)), LATERAL TABLE(category_udtf(item)) as T(category)) GROUP BY category, window_start, window_end;Klausa GROUP BY tidak berisi kedua bidang window_start dan window_end. Contoh:
SELECT window_start, item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY item, window_start;Fungsi agregat yang didefinisikan pengguna Python (UDAF) digunakan.
GROUPING SETS, CUBE, atau ROLLUP digunakan dalam klausa GROUP BY, yang menentukan bahwa data dikelompokkan secara terpisah berdasarkan bidang window_start dan window_end. Contoh:
> SELECT item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY GROUPING SETS((item), (window_start), (window_end)); > SELECT item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY CUBE (item, window_start, window_end); > SELECT item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY ROLLUP (item, window_start, window_end);Fungsi agregat diterapkan pada bidang window_start, window_end, atau window_time. Contoh:
> SELECT window_start, window_end, item, SUM(price) AS total_price, max(window_end) AS max_end FROM TABLE( SESSION(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY item, window_start, window_end;
Perbandingan Dukungan untuk Aliran Pembaruan
Fungsi jendela | Sintaks lama (GroupWindowAggregation) | Sintaks baru (WindowAggregate) | |
VVR dan Apache Flink | VVR | Apache Flink | |
TUMBLE | Ya | Ya | Tidak |
HOP | Ya | Ya | Tidak |
SESSION | Ya | Ya Catatan Untuk informasi tentang perbedaan antara fungsi jendela SESI dalam VVR dan Apache Flink, lihat Kueri. | Ya (sejak Apache Flink 1.19) |
CUMULATE | Tidak tersedia | Ya Catatan Ya (sejak VVR 8.0.6) | Tidak |
Dalam sintaks lama, dukungan untuk aliran pembaruan sama baik Anda menggunakan VVR atau Apache Flink. Dalam sintaks baru, hanya operator WindowAggregate yang disediakan oleh VVR yang mendukung aliran pembaruan untuk semua fungsi jendela. Hal ini karena VVR mendukung operator GroupWindowAggregation dan WindowAggregate serta dapat secara otomatis memilih operator yang sesuai berdasarkan aliran input.