全部产品
Search
文档中心

Realtime Compute for Apache Flink:Jendela OVER

更新时间:Jun 19, 2025

Jendela OVER adalah jendela standar yang digunakan dalam database tradisional. Agregat Over berbeda dari agregat jendela. Dalam aliran data streaming, setiap elemen sesuai dengan sebuah jendela OVER. Jendela OVER dapat ditentukan berdasarkan baris aktual atau nilai timestamp dari elemen. Elemen-elemen dari aliran didistribusikan ke beberapa jendela.

Dalam aliran yang menerapkan jendela OVER, setiap elemen sesuai dengan sebuah jendela OVER dan memicu komputasi data sekali. Baris yang ditentukan oleh setiap elemen yang memicu komputasi adalah baris terakhir dari jendela tempat elemen tersebut berada. Dalam implementasi dasar Realtime Compute, data jendela OVER dikelola secara terpusat. Hanya satu salinan data yang disimpan. Secara logis, sebuah jendela OVER dibuat untuk setiap elemen. Realtime Compute for Apache Flink menghitung data untuk setiap jendela OVER dan kemudian menghapus data yang tidak lagi digunakan setelah perhitungan selesai. Untuk informasi lebih lanjut, lihat Over Aggregation.

Sintaksis

SELECT
    agg1(col1) OVER (definition1) AS colName,
    ...
    aggN(colN) OVER (definition1) AS colNameN
FROM Tab1;
  • agg1(col1): mengagregasi data input berdasarkan kolom col1 yang ditentukan oleh GROUP BY.

  • OVER (definition1): mendefinisikan sebuah jendela OVER.

  • AS colName: menentukan alias sebuah kolom.

null
  • OVER (definition1) untuk agg1 hingga aggN harus sama.

  • Alias yang ditentukan oleh AS dapat di-query menggunakan pernyataan SQL luar.

Tipe jendela

Dalam Flink SQL, jendela OVER didefinisikan sesuai dengan sintaksis SQL standar. Jendela OVER tradisional tidak diklasifikasikan menjadi tipe jendela halus. Jendela OVER diklasifikasikan menjadi dua jenis berikut berdasarkan cara menentukan baris yang dihitung:

  • Jendela ROWS OVER: Setiap baris elemen dianggap sebagai baris baru yang dihitung. Sebuah jendela baru dihasilkan untuk setiap baris.

  • Jendela RANGE OVER: Semua baris elemen dengan nilai timestamp yang sama dianggap sebagai satu baris yang dihitung dan dialokasikan ke jendela yang sama.

Atribut

Atribut ortogonal

Deskripsi

proctime

eventtime

Jendela ROWS OVER

Sebuah jendela ditentukan berdasarkan baris aktual dari elemen.

Didukung

Didukung

Jendela RANGE OVER

Sebuah jendela ditentukan berdasarkan nilai timestamp dari elemen.

Didukung

Didukung

Jendela ROWS OVER

  • Deskripsi

    Untuk jendela ROWS OVER, sebuah jendela dihasilkan untuk setiap elemen.

  • Sintaksis

    SELECT
        agg1(col1) OVER(
         [PARTITION BY (value_expression1,..., value_expressionN)]
         ORDER BY timeCol
         ROWS 
         BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS colName, ...
    FROM Tab1;       
    • value_expression: menentukan ekspresi nilai yang digunakan untuk partisi.

    • timeCol: menentukan bidang waktu yang digunakan untuk mengurutkan elemen.

    • rowCount: menentukan jumlah baris yang mendahului baris saat ini.

  • Contoh

    Contoh ini menjelaskan jendela ROWS OVER yang dibatasi. Dalam contoh ini, tabel produk on-sale berisi ID item, jenis item, waktu peluncuran, dan harga. Hitung harga tertinggi di antara tiga produk yang mirip dengan produk saat ini sebelum produk saat ini dijual.

    • Data Uji

      itemid(VARCHAR)

      itemtype(VARCHAR)

      eventtime(VARCHAR)

      price(DOUBLE)

      ITEM001

      Electronic

      2024-11-11 10:01:00

      20

      ITEM002

      Electronic

      2024-11-11 10:02:00

      50

      ITEM003

      Electronic

      2024-11-11 10:03:00

      30

      ITEM004

      Electronic

      2024-11-11 10:03:00

      60

      ITEM005

      Electronic

      2024-11-11 10:05:00

      40

      ITEM006

      Electronic

      2024-11-11 10:06:00

      20

      ITEM007

      Electronic

      2024-11-11 10:07:00

      70

      ITEM008

      Clothes

      2024-11-11 10:08:00

      20

    • Pernyataan Uji

      CREATE TEMPORARY TABLE tmall_item(
        itemid VARCHAR,
        itemtype VARCHAR,
        eventtime varchar,                            
        onselltime AS TO_TIMESTAMP(eventtime),
        price DOUBLE,
        WATERMARK FOR onselltime AS onselltime - INTERVAL '2' SECOND  --Mendefinisikan Watermark untuk Rowtime.
      ) WITH (
        'connector' = 'kafka',
        'topic' = '<yourTopic>',
        'properties.bootstrap.servers' = '<brokers>',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'csv'
      );
      
      SELECT
          itemid,
          itemtype,
          onselltime,
          price,  
          MAX(price) OVER (
              PARTITION BY itemtype 
              ORDER BY onselltime 
              ROWS BETWEEN 2 preceding AND CURRENT ROW) AS maxprice
      FROM tmall_item;
    • Hasil Uji

      itemid

      itemtype

      onselltime

      price

      maxprice

      ITEM001

      Electronic

      2024-11-11 10:01:00

      20

      20

      ITEM002

      Electronic

      2024-11-11 10:02:00

      50

      50

      ITEM003

      Electronic

      2024-11-11 10:03:00

      30

      50

      ITEM004

      Electronic

      2024-11-11 10:03:00

      60

      60

      ITEM005

      Electronic

      2024-11-11 10:05:00

      40

      60

      ITEM006

      Electronic

      2024-11-11 10:06:00

      20

      60

      ITEM007

      Electronic

      2024-11-11 10:07:00

      70

      70

      ITEM008

      Clothes

      2024-11-11 10:08:00

      20

      20

Jendela RANGE OVER

  • Deskripsi

    Untuk jendela RANGE OVER, semua elemen dengan nilai timestamp yang sama dialokasikan ke jendela yang sama.

  • Sintaksis

    SELECT
        agg1(col1) OVER(
         [PARTITION BY (value_expression1,..., value_expressionN)]
         ORDER BY timeCol
         RANGE 
         BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName,
    ...
    FROM Tab1;
    • value_expression: menentukan ekspresi nilai yang digunakan untuk partisi.

    • timeCol: menentukan bidang waktu yang digunakan untuk mengurutkan elemen.

    • timeInterval: menentukan interval waktu antara waktu baris saat ini dan waktu elemen baris yang bisa dilacak kembali.

  • Contoh

    Contoh ini menjelaskan jendela RANGE OVER yang dibatasi. Dalam contoh ini, tabel produk on-sale berisi ID item, jenis item, waktu peluncuran, dan harga. Hitung harga tertinggi di antara produk serupa yang dijual dua menit lebih awal dari produk saat ini.

    • Data Uji

      itemid(VARCHAR)

      itemtype(VARCHAR)

      eventtime(VARCHAR)

      price(DOUBLE)

      ITEM001

      Electronic

      2024-11-11 10:01:00

      20

      ITEM002

      Electronic

      2024-11-11 10:02:00

      50

      ITEM003

      Electronic

      2024-11-11 10:03:00

      30

      ITEM004

      Electronic

      2024-11-11 10:03:00

      60

      ITEM005

      Electronic

      2024-11-11 10:05:00

      40

      ITEM006

      Electronic

      2024-11-11 10:06:00

      20

      ITEM007

      Electronic

      2024-11-11 10:07:00

      70

      ITEM008

      Clothes

      2024-11-11 10:08:00

      20

    • Pernyataan Uji

      CREATE TEMPORARY TABLE tmall_item(
        itemid VARCHAR,
        itemtype VARCHAR,
        eventtime varchar,                            
        onselltime AS TO_TIMESTAMP(eventtime),
        price DOUBLE,
        WATERMARK FOR onselltime AS onselltime - INTERVAL '2' SECOND  --Mendefinisikan Watermark untuk Rowtime.
      ) WITH (
        'connector' = 'kafka',
        'topic' = '<yourTopic>',
        'properties.bootstrap.servers' = '<brokers>',
        'scan.startup.mode' = 'earliest-offset',
        'format' = 'csv'
      );
      
      SELECT  
          itemid,
          itemtype, 
          onselltime, 
          price,  
          MAX(price) OVER (
              PARTITION BY itemtype 
              ORDER BY onselltime 
              RANGE BETWEEN INTERVAL '2' MINUTE preceding AND CURRENT ROW) AS maxprice
      FROM tmall_item;        
    • Hasil Uji

      itemid

      itemtype

      onselltime

      price

      maxprice

      ITEM001

      Electronic

      2024-11-11 10:01:00

      20

      20

      ITEM002

      Electronic

      2024-11-11 10:02:00

      50

      50

      ITEM003

      Electronic

      2024-11-11 10:03:00

      30

      50

      ITEM004

      Electronic

      2024-11-11 10:03:00

      60

      60

      ITEM005

      Electronic

      2024-11-11 10:05:00

      40

      60

      ITEM006

      Electronic

      2024-11-11 10:06:00

      20

      40

      ITEM007

      Electronic

      2024-11-11 10:07:00

      70

      70

      ITEM008

      Clothes

      2024-11-11 10:08:00

      20

      20