OVER window adalah fungsi jendela standar yang digunakan dalam database tradisional. Berbeda dengan jendela GROUP BY, setiap elemen dalam OVER window berkorespondensi dengan satu frame jendela unik, yang dapat didefinisikan berdasarkan baris elemen atau nilai timestamp. Akibatnya, elemen aliran dapat termasuk dalam beberapa jendela sekaligus.
Dalam aliran yang menggunakan OVER window, setiap elemen berkorespondensi dengan satu jendela unik dan memicu komputasi satu kali, dengan baris pemicu tersebut menjadi baris terakhir dari jendela elemennya. Dalam implementasi dasar Realtime Compute, data untuk OVER window dikelola secara terpusat dan hanya disimpan dalam satu salinan. Secara logis, satu OVER window dibuat untuk setiap elemen. Realtime Compute menghitung data untuk setiap OVER window, lalu menghapus data yang tidak lagi diperlukan setelah perhitungan selesai. Untuk informasi lebih lanjut, lihat Over Aggregation.
Sintaksis
SELECT
agg1(col1) OVER (definition1) AS colName,
...
aggN(colN) OVER (definition1) AS colNameN
FROM Tab1;agg1(col1): Fungsi agregasi yang menghitung kolom col1.
OVER (definition1): Klausul OVER yang mendefinisikan jendela.
AS colName: Alias untuk kolom hasil.
Definisi OVER harus sama untuk semua agregasi dari agg1 hingga aggN.
Kueri SQL luar dapat menggunakan alias untuk mengambil data.
Jenis
Dalam Flink SQL, definisi OVER window mengikuti sintaksis SQL standar. OVER window tradisional tidak diklasifikasikan ke dalam tipe yang lebih rinci, tetapi dapat dibagi menjadi dua jenis berikut berdasarkan cara baris untuk perhitungan didefinisikan:
ROWS OVER window: Setiap baris dianggap sebagai batas perhitungan baru, sehingga setiap baris mendefinisikan jendela tersendiri.
RANGE OVER window: Semua baris dengan nilai waktu yang sama dianggap sebagai satu batas perhitungan, sehingga termasuk dalam jendela yang sama.
Properti
Properti ortogonal | Deskripsi | proctime | eventtime |
ROWS OVER Window | Mendefinisikan jendela berdasarkan baris aktual elemen. | Support | Support |
RANGE OVER Window | Mendefinisikan jendela berdasarkan nilai aktual (timestamp) elemen. | Support | Support |
Semantik ROWS OVER window
Data jendela
Setiap elemen pada ROWS OVER window mendefinisikan satu jendela.
Sintaksis jendela
SELECT agg1(col1) OVER( [PARTITION BY (value_expression1,..., value_expressionN)] ORDER BY timeCol ROWS BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS colName, ... FROM Tab1;value_expression: Ekspresi untuk partisi.
timeCol: Bidang atribut waktu untuk pengurutan elemen.
rowCount: Jumlah baris sebelumnya yang dimasukkan ke dalam frame jendela, relatif terhadap baris saat ini.
Contoh
Sebagai contoh, pertimbangkan skenario Bounded ROWS OVER Window. Misalkan Anda memiliki tabel daftar produk yang berisi ID produk, tipe produk, waktu penayangan produk, dan harga produk. Tujuannya adalah menemukan harga tertinggi di antara tiga produk sebelumnya dengan tipe yang sama.
Data sampel untuk tabel tmall_item
itemid (VARCHAR)
itemtype (VARCHAR)
eventtime (VARCHAR)
price (DOUBLE)
ITEM001
Electronic
2024-11-11 10:01:0020
ITEM002
Electronic
2024-11-11 10:02:0050
ITEM003
Electronic
2024-11-11 10:03:0030
ITEM004
Electronic
2024-11-11 10:03:0060
ITEM005
Electronic
2024-11-11 10:05:0040
ITEM006
Electronic
2024-11-11 10:06:0020
ITEM007
Electronic
2024-11-11 10:07:0070
ITEM008
Clothes
2024-11-11 10:08:0020
Kode sampel
CREATE TEMPORARY TABLE tmall_item( itemid VARCHAR, itemtype VARCHAR, eventtime varchar, onselltime AS TO_TIMESTAMP(eventtime), price DOUBLE, WATERMARK FOR onselltime AS onselltime - INTERVAL '2' SECOND -- Define a watermark for the rowtime. ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<brokers>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); SELECT itemid, itemtype, onselltime, price, MAX(price) OVER ( PARTITION BY itemtype ORDER BY onselltime ROWS BETWEEN 2 preceding AND CURRENT ROW) AS maxprice FROM tmall_item;Hasil
itemid
itemtype
onselltime
price
maxprice
ITEM001
Electronic
2024-11-11 10:01:0020
20
ITEM002
Electronic
2024-11-11 10:02:0050
50
ITEM003
Electronic
2024-11-11 10:03:0030
50
ITEM004
Electronic
2024-11-11 10:03:0060
60
ITEM005
Electronic
2024-11-11 10:05:0040
60
ITEM006
Electronic
2024-11-11 10:06:0020
60
ITEM007
Electronic
2024-11-11 10:07:0070
70
ITEM008
Clothes
2024-11-11 10:08:0020
20
Semantik RANGE OVER window
Data jendela
Pada RANGE OVER window, semua baris yang memiliki nilai yang sama pada kolom pengurutan—misalnya timestamp—mendefinisikan satu frame jendela.
Sintaksis jendela
SELECT agg1(col1) OVER( [PARTITION BY (value_expression1,..., value_expressionN)] ORDER BY timeCol RANGE BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName, ... FROM Tab1;value_expression: Ekspresi untuk partisi.
timeCol: Bidang atribut waktu untuk pengurutan elemen.
timeInterval: Interval waktu dari baris sebelumnya yang dimasukkan ke dalam frame jendela, relatif terhadap baris saat ini.
Contoh
Contoh ini menunjukkan RANGE OVER window terbatas. Asumsikan Anda memiliki tabel daftar produk dengan ID produk, tipe, waktu penayangan, dan harga. Tujuannya adalah menemukan harga tertinggi di antara produk dengan tipe yang sama yang ditayangkan dalam rentang dua menit sebelum waktu penayangan produk saat ini, termasuk produk tersebut.
Data sampel untuk tabel tmall_item
itemid (VARCHAR)
itemtype (VARCHAR)
eventtime (VARCHAR)
price (DOUBLE)
ITEM001
Electronic
2024-11-11 10:01:0020
ITEM002
Electronic
2024-11-11 10:02:0050
ITEM003
Electronic
2024-11-11 10:03:0030
ITEM004
Electronic
2024-11-11 10:03:0060
ITEM005
Electronic
2024-11-11 10:05:0040
ITEM006
Electronic
2024-11-11 10:06:0020
ITEM007
Electronic
2024-11-11 10:07:0070
ITEM008
Clothes
2024-11-11 10:08:0020
Kode contoh
CREATE TEMPORARY TABLE tmall_item( itemid VARCHAR, itemtype VARCHAR, eventtime varchar, onselltime AS TO_TIMESTAMP(eventtime), price DOUBLE, WATERMARK FOR onselltime AS onselltime - INTERVAL '2' SECOND -- Define a watermark for the rowtime. ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<brokers>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); SELECT itemid, itemtype, onselltime, price, MAX(price) OVER ( PARTITION BY itemtype ORDER BY onselltime RANGE BETWEEN INTERVAL '2' MINUTE preceding AND CURRENT ROW) AS maxprice FROM tmall_item;Hasil
itemid
itemtype
onselltime
price
maxprice
ITEM001
Electronic
2024-11-11 10:01:0020
20
ITEM002
Electronic
2024-11-11 10:02:0050
50
ITEM003
Electronic
2024-11-11 10:03:0030
50
ITEM004
Electronic
2024-11-11 10:03:0060
60
ITEM005
Electronic
2024-11-11 10:05:0040
60
ITEM006
Electronic
2024-11-11 10:06:0020
40
ITEM007
Electronic
2024-11-11 10:07:0070
70
ITEM008
Clothes
2024-11-11 10:08:0020
20