Jendela OVER adalah jendela standar yang digunakan dalam database tradisional. Agregat Over berbeda dari agregat jendela. Dalam aliran data streaming, setiap elemen sesuai dengan sebuah jendela OVER. Jendela OVER dapat ditentukan berdasarkan baris aktual atau nilai timestamp dari elemen. Elemen-elemen dari aliran didistribusikan ke beberapa jendela.
Dalam aliran yang menerapkan jendela OVER, setiap elemen sesuai dengan sebuah jendela OVER dan memicu komputasi data sekali. Baris yang ditentukan oleh setiap elemen yang memicu komputasi adalah baris terakhir dari jendela tempat elemen tersebut berada. Dalam implementasi dasar Realtime Compute, data jendela OVER dikelola secara terpusat. Hanya satu salinan data yang disimpan. Secara logis, sebuah jendela OVER dibuat untuk setiap elemen. Realtime Compute for Apache Flink menghitung data untuk setiap jendela OVER dan kemudian menghapus data yang tidak lagi digunakan setelah perhitungan selesai. Untuk informasi lebih lanjut, lihat Over Aggregation.
Sintaksis
SELECT
agg1(col1) OVER (definition1) AS colName,
...
aggN(colN) OVER (definition1) AS colNameN
FROM Tab1;agg1(col1): mengagregasi data input berdasarkan kolom col1 yang ditentukan oleh GROUP BY.
OVER (definition1): mendefinisikan sebuah jendela OVER.
AS colName: menentukan alias sebuah kolom.
OVER (definition1) untuk agg1 hingga aggN harus sama.
Alias yang ditentukan oleh AS dapat di-query menggunakan pernyataan SQL luar.
Tipe jendela
Dalam Flink SQL, jendela OVER didefinisikan sesuai dengan sintaksis SQL standar. Jendela OVER tradisional tidak diklasifikasikan menjadi tipe jendela halus. Jendela OVER diklasifikasikan menjadi dua jenis berikut berdasarkan cara menentukan baris yang dihitung:
Jendela ROWS OVER: Setiap baris elemen dianggap sebagai baris baru yang dihitung. Sebuah jendela baru dihasilkan untuk setiap baris.
Jendela RANGE OVER: Semua baris elemen dengan nilai timestamp yang sama dianggap sebagai satu baris yang dihitung dan dialokasikan ke jendela yang sama.
Atribut
Atribut ortogonal | Deskripsi | proctime | eventtime |
Jendela ROWS OVER | Sebuah jendela ditentukan berdasarkan baris aktual dari elemen. | Didukung | Didukung |
Jendela RANGE OVER | Sebuah jendela ditentukan berdasarkan nilai timestamp dari elemen. | Didukung | Didukung |
Jendela ROWS OVER
Deskripsi
Untuk jendela ROWS OVER, sebuah jendela dihasilkan untuk setiap elemen.
Sintaksis
SELECT agg1(col1) OVER( [PARTITION BY (value_expression1,..., value_expressionN)] ORDER BY timeCol ROWS BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS colName, ... FROM Tab1;value_expression: menentukan ekspresi nilai yang digunakan untuk partisi.
timeCol: menentukan bidang waktu yang digunakan untuk mengurutkan elemen.
rowCount: menentukan jumlah baris yang mendahului baris saat ini.
Contoh
Contoh ini menjelaskan jendela ROWS OVER yang dibatasi. Dalam contoh ini, tabel produk on-sale berisi ID item, jenis item, waktu peluncuran, dan harga. Hitung harga tertinggi di antara tiga produk yang mirip dengan produk saat ini sebelum produk saat ini dijual.
Data Uji
itemid(VARCHAR)
itemtype(VARCHAR)
eventtime(VARCHAR)
price(DOUBLE)
ITEM001
Electronic
2024-11-11 10:01:0020
ITEM002
Electronic
2024-11-11 10:02:0050
ITEM003
Electronic
2024-11-11 10:03:0030
ITEM004
Electronic
2024-11-11 10:03:0060
ITEM005
Electronic
2024-11-11 10:05:0040
ITEM006
Electronic
2024-11-11 10:06:0020
ITEM007
Electronic
2024-11-11 10:07:0070
ITEM008
Clothes
2024-11-11 10:08:0020
Pernyataan Uji
CREATE TEMPORARY TABLE tmall_item( itemid VARCHAR, itemtype VARCHAR, eventtime varchar, onselltime AS TO_TIMESTAMP(eventtime), price DOUBLE, WATERMARK FOR onselltime AS onselltime - INTERVAL '2' SECOND --Mendefinisikan Watermark untuk Rowtime. ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<brokers>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); SELECT itemid, itemtype, onselltime, price, MAX(price) OVER ( PARTITION BY itemtype ORDER BY onselltime ROWS BETWEEN 2 preceding AND CURRENT ROW) AS maxprice FROM tmall_item;Hasil Uji
itemid
itemtype
onselltime
price
maxprice
ITEM001
Electronic
2024-11-11 10:01:0020
20
ITEM002
Electronic
2024-11-11 10:02:0050
50
ITEM003
Electronic
2024-11-11 10:03:0030
50
ITEM004
Electronic
2024-11-11 10:03:0060
60
ITEM005
Electronic
2024-11-11 10:05:0040
60
ITEM006
Electronic
2024-11-11 10:06:0020
60
ITEM007
Electronic
2024-11-11 10:07:0070
70
ITEM008
Clothes
2024-11-11 10:08:0020
20
Jendela RANGE OVER
Deskripsi
Untuk jendela RANGE OVER, semua elemen dengan nilai timestamp yang sama dialokasikan ke jendela yang sama.
Sintaksis
SELECT agg1(col1) OVER( [PARTITION BY (value_expression1,..., value_expressionN)] ORDER BY timeCol RANGE BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName, ... FROM Tab1;value_expression: menentukan ekspresi nilai yang digunakan untuk partisi.
timeCol: menentukan bidang waktu yang digunakan untuk mengurutkan elemen.
timeInterval: menentukan interval waktu antara waktu baris saat ini dan waktu elemen baris yang bisa dilacak kembali.
Contoh
Contoh ini menjelaskan jendela RANGE OVER yang dibatasi. Dalam contoh ini, tabel produk on-sale berisi ID item, jenis item, waktu peluncuran, dan harga. Hitung harga tertinggi di antara produk serupa yang dijual dua menit lebih awal dari produk saat ini.
Data Uji
itemid(VARCHAR)
itemtype(VARCHAR)
eventtime(VARCHAR)
price(DOUBLE)
ITEM001
Electronic
2024-11-11 10:01:0020
ITEM002
Electronic
2024-11-11 10:02:0050
ITEM003
Electronic
2024-11-11 10:03:0030
ITEM004
Electronic
2024-11-11 10:03:0060
ITEM005
Electronic
2024-11-11 10:05:0040
ITEM006
Electronic
2024-11-11 10:06:0020
ITEM007
Electronic
2024-11-11 10:07:0070
ITEM008
Clothes
2024-11-11 10:08:0020
Pernyataan Uji
CREATE TEMPORARY TABLE tmall_item( itemid VARCHAR, itemtype VARCHAR, eventtime varchar, onselltime AS TO_TIMESTAMP(eventtime), price DOUBLE, WATERMARK FOR onselltime AS onselltime - INTERVAL '2' SECOND --Mendefinisikan Watermark untuk Rowtime. ) WITH ( 'connector' = 'kafka', 'topic' = '<yourTopic>', 'properties.bootstrap.servers' = '<brokers>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' ); SELECT itemid, itemtype, onselltime, price, MAX(price) OVER ( PARTITION BY itemtype ORDER BY onselltime RANGE BETWEEN INTERVAL '2' MINUTE preceding AND CURRENT ROW) AS maxprice FROM tmall_item;Hasil Uji
itemid
itemtype
onselltime
price
maxprice
ITEM001
Electronic
2024-11-11 10:01:0020
20
ITEM002
Electronic
2024-11-11 10:02:0050
50
ITEM003
Electronic
2024-11-11 10:03:0030
50
ITEM004
Electronic
2024-11-11 10:03:0060
60
ITEM005
Electronic
2024-11-11 10:05:0040
60
ITEM006
Electronic
2024-11-11 10:06:0020
40
ITEM007
Electronic
2024-11-11 10:07:0070
70
ITEM008
Clothes
2024-11-11 10:08:0020
20