All Products
Search
Document Center

Realtime Compute for Apache Flink:Group Window Aggregate

Last Updated:Mar 27, 2026

Saat Anda memodifikasi pernyataan SQL untuk penerapan Realtime Compute for Apache Flink yang sedang berjalan, Flink mengevaluasi apakah data state yang ada dapat digunakan kembali. Memahami perubahan mana yang menjaga kompatibilitas state membantu Anda menghindari kehilangan data tak terduga atau pemrosesan ulang penuh.

Topik ini mencakup hasil kompatibilitas state untuk pernyataan SQL yang menggunakan fungsi agregat jendela dengan GROUP BY.

Hasil dari suatu modifikasi adalah salah satu dari tiga kemungkinan berikut:

  • Fully compatible — penerapan dilanjutkan dengan seluruh data state historis tetap utuh.

  • Partially compatible — agregat yang tidak diubah melanjutkan dari state historis; agregat yang baru ditambahkan dimulai ulang dari nol.

  • Incompatible — penerapan tidak dapat menggunakan kembali data state apa pun dan harus dimulai ulang dari awal.

Cara Flink mengevaluasi kompatibilitas state

Flink mengevaluasi penggunaan kembali state berdasarkan komponen mana dalam pernyataan SQL yang berubah:

KomponenFleksibilitasContoh
Agregat non-distinctFleksibel — kompatibilitas parsial mungkinSUM(b), MAX(c), COUNT(c)
Bidang atribut jendelaFleksibel — perubahan sepenuhnya kompatibeltumble_start, tumble_end
Posisi fungsi jendela dalam GROUP BYFleksibel — mengubah urutan kunci jendela sepenuhnya kompatibelTUMBLE(ts, ...) dalam GROUP BY
Definisi jendelaTetap — perubahan apa pun mengganggu kompatibilitasJenis jendela, ukuran jendela, atribut waktu
Kunci GROUP BY (dimensi statistik)Tetap — perubahan apa pun mengganggu kompatibilitasGROUP BY a, d, ...
Agregat distinctTetap — perubahan apa pun mengganggu kompatibilitasCOUNT(DISTINCT c)
Pengaturan early-fire dan late-fireTetap — perubahan apa pun mengganggu kompatibilitastable.exec.emit.early-fire.enabled

Kerangka kerja ini menjelaskan mengapa beberapa perubahan dapat diakomodasi sebagian (Flink dapat membuang atau menambahkan agregat individual tanpa membangun ulang seluruh state), sementara yang lain memerlukan restart state lengkap (struktur dasar komputasi berubah).

Referensi cepat

Gunakan tabel ini untuk menentukan dampak kompatibilitas dari perubahan tertentu sebelum menerapkannya.

PerubahanKompatibilitas
Menambahkan agregat non-distinctPartial
Menghapus agregat non-distinctFull
Menambahkan dan menghapus agregat non-distinct dalam satu perubahanPartial
Memodifikasi agregat non-distinctPartial
Mengubah urutan agregat non-distinctFull
Mengubah logika perhitungan bidang dalam agregat non-distinctPartial
Menambahkan atau menghapus bidang atribut jendela (tumble_start, tumble_end)Full
Mengubah urutan kunci GROUP BY — hanya posisi kunci fungsi jendela yang berubahFull
Tidak ada agregat sebelum atau setelah perubahanFull
Memodifikasi jenis jendela (misalnya, TUMBLE menjadi HOP)Incompatible
Memodifikasi ukuran jendelaIncompatible
Memodifikasi atribut waktu (rowtime menjadi proctime, atau sebaliknya)Incompatible
Menambahkan, menghapus, atau memodifikasi kunci GROUP BY (dimensi statistik)Incompatible
Menambahkan, menghapus, atau memodifikasi agregat distinctIncompatible
Menghapus semua agregatIncompatible
Tambah atau hapus early-fire atau late-fireIncompatible
Mengubah urutan kunci GROUP BY non-jendelaIncompatible
Menambahkan agregat ke penerapan yang sebelumnya tidak memiliki agregatIncompatible
Hanya ada satu agregat dan logika perhitungannya berubahIncompatible
Kumpulan agregat berubah sepenuhnya (tidak ada tumpang tindih)Incompatible
SQL berisi 'table.exec.emit.early-fire.enabled' = 'true' atau 'table.exec.emit.late-fire.enabled' = 'true' sebelum atau setelah perubahanIncompatible
SQL berisi fungsi agregat yang didefinisikan pengguna (UDAF) Python sebelum atau setelah perubahanUnknown

Modifikasi yang menjaga atau sebagian menjaga state

Menambahkan, menghapus, atau memodifikasi agregat non-distinct

Perubahan berikut menghasilkan kompatibilitas parsial atau penuh:

  • Menambahkan: kompatibel sebagian. Agregat baru mulai menghitung dari nol saat penerapan dijalankan ulang.

  • Menghapus: kompatibel penuh. Data state dari agregat yang dihapus dibuang; semua agregat lain tidak terpengaruh.

  • Menambahkan dan menghapus dalam satu perubahan: kompatibel sebagian. Agregat yang ditambahkan dimulai dari nol; state agregat yang dihapus dibuang.

  • Memodifikasi (mengganti satu agregat dengan agregat lain): kompatibel sebagian. Agregat asli dianggap dihapus dan statenya dibuang; agregat baru dianggap ditambahkan dan dimulai dari nol.

Untuk agregat yang tidak Anda modifikasi, hasil setelah penggunaan kembali state identik dengan hasil yang dihitung dari seluruh data historis.
-- Pernyataan SQL asli
SELECT a, SUM(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Tambahkan COUNT(c): kompatibel sebagian.
-- SUM(b) dan MAX(c) tidak terpengaruh. COUNT(c) dimulai dari 0.
SELECT a, SUM(b), MAX(c), COUNT(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Hapus SUM(b): kompatibel penuh.
-- MAX(c) tidak terpengaruh.
SELECT a, MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Ubah MAX(c) menjadi MIN(c): kompatibel sebagian.
-- SUM(b) tidak terpengaruh. State MAX(c) dibuang; MIN(c) dimulai dari 0.
SELECT a, SUM(b), MIN(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

Mengubah urutan agregat non-distinct

Mengubah urutan agregat dalam daftar SELECT tidak memengaruhi kompatibilitas. Semua hasil tetap benar.

-- Pernyataan SQL asli
SELECT a, SUM(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Tukar SUM(b) dan MAX(c): kompatibel penuh.
SELECT a, MAX(c), SUM(b)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

Mengubah logika perhitungan bidang dalam agregat non-distinct

Mengubah cara bidang input dihitung dianggap sebagai modifikasi agregat. Agregat asli dibuang dan yang baru dimulai dari awal.

-- Pernyataan SQL asli
SELECT a, SUM(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Ubah MAX(c) menjadi MAX(SUBSTRING(c, 1, 5)): kompatibel sebagian.
-- SUM(b) tidak terpengaruh. State MAX(c) dibuang; MAX(SUBSTRING(c, 1, 5)) dimulai dari 0.
SELECT a, SUM(b), MAX(c)
FROM (SELECT a, b, SUBSTRING(c, 1, 5) AS c FROM MyTable)
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

Menambahkan atau menghapus bidang atribut jendela

Menambahkan atau menghapus tumble_start, tumble_end, atau bidang atribut jendela serupa lainnya sepenuhnya kompatibel.

-- Pernyataan SQL asli
SELECT a,
  SUM(b),
  MAX(c),
  tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Tambahkan window_end: kompatibel penuh.
SELECT a,
  SUM(b),
  MAX(c),
  tumble_start(ts, INTERVAL '1' MINUTE) AS window_start,
  tumble_end(ts, INTERVAL '1' MINUTE) AS window_end
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Hapus window_start: kompatibel penuh.
SELECT a,
  SUM(b),
  MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

Mengubah urutan kunci GROUP BY — hanya kunci fungsi jendela

Jika hanya posisi kunci fungsi jendela dalam klausa GROUP BY yang berubah dan semua kunci lain tetap dalam urutan yang sama, kompatibilitas tetap terjaga.

-- Pernyataan SQL asli
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY a, b, TUMBLE(rowtime, INTERVAL '15' MINUTE);

-- Pindahkan kunci fungsi jendela: kompatibel penuh.
-- Posisi a dan b tidak berubah.
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY a, TUMBLE(rowtime, INTERVAL '15' MINUTE), b;

Modifikasi yang mengganggu kompatibilitas state

Perubahan berikut membuat penerapan tidak kompatibel dengan data state yang ada. Penerapan harus membuang seluruh state dan menghitung ulang hasil dari awal inputnya.

Memodifikasi atribut jendela

Mengubah jenis jendela, ukuran jendela, atau atribut waktu mengganggu kompatibilitas.

-- Pernyataan SQL asli
SELECT a,
  SUM(b),
  MAX(c),
  tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Ubah jenis jendela dari TUMBLE menjadi HOP: tidak kompatibel.
SELECT a,
  SUM(b),
  MAX(c),
  hop_start(ts, INTERVAL '1' MINUTE, INTERVAL '2' MINUTE) AS window_start
FROM MyTable
GROUP BY a, HOP(ts, INTERVAL '1' MINUTE, INTERVAL '2' MINUTE);

-- Ubah ukuran jendela dari 1 menit menjadi 2 menit: tidak kompatibel.
SELECT a,
  SUM(b),
  MAX(c),
  tumble_start(ts, INTERVAL '2' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '2' MINUTE);

-- Ubah atribut waktu dari ts (rowtime) menjadi proctime: tidak kompatibel.
SELECT a,
  SUM(b),
  MAX(c),
  tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(proctime, INTERVAL '1' MINUTE);

Menambahkan, menghapus, atau memodifikasi kunci GROUP BY

Kunci GROUP BY mendefinisikan dimensi statistik. Perubahan apa pun pada kunci-kunci ini — termasuk mengubah logika perhitungan bidang yang digunakan sebagai kunci — mengganggu kompatibilitas.

-- Pernyataan SQL asli
SELECT a,
  SUM(b),
  MAX(c),
  tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Tambahkan kunci GROUP BY d: tidak kompatibel.
SELECT a,
  SUM(b),
  MAX(c),
  tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, d, TUMBLE(ts, INTERVAL '1' MINUTE);

Menambahkan, menghapus, atau memodifikasi agregat distinct

Perubahan apa pun pada agregat yang menggunakan DISTINCT mengganggu kompatibilitas, bahkan jika semua agregat lain tetap tidak berubah.

-- Pernyataan SQL asli
SELECT a,
  SUM(b),
  MAX(c),
  COUNT(DISTINCT c),
  tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Tambahkan COUNT(DISTINCT b): tidak kompatibel.
SELECT a,
  SUM(b),
  COUNT(DISTINCT b),
  MAX(c),
  COUNT(DISTINCT c),
  tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

Menghapus semua agregat

Menghapus seluruh agregat dari pernyataan SQL mengganggu kompatibilitas. Seluruh state dibuang.

-- Pernyataan SQL asli
SELECT a,
  SUM(b),
  COUNT(DISTINCT b),
  MAX(c),
  COUNT(DISTINCT c),
  tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Hapus semua agregat: tidak kompatibel.
SELECT a,
  tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

Menambahkan atau menghapus early-fire atau late-fire

Perubahan apa pun pada pengaturan emit early-fire atau late-fire mengganggu kompatibilitas.

Mengubah urutan kunci GROUP BY non-jendela

Mengubah urutan kunci GROUP BY selain kunci fungsi jendela mengganggu kompatibilitas.

-- Pernyataan SQL asli
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY a, b, TUMBLE(rowtime, INTERVAL '15' MINUTE);

-- Tukar a dan b: tidak kompatibel.
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY b, a, TUMBLE(rowtime, INTERVAL '15' MINUTE);

Menambahkan agregat ke penerapan tanpa agregat

Jika SQL asli tidak memiliki agregat dan Anda menambahkan satu, kompatibilitas terganggu.

-- Pernyataan SQL asli (tanpa agregat)
SELECT a, b, c
FROM MyTable
GROUP BY a, b, c, TUMBLE(rowtime, INTERVAL '15' MINUTE);

-- Tambahkan COUNT(c): tidak kompatibel.
SELECT a, b, c, COUNT(c)
FROM MyTable
GROUP BY a, b, c, TUMBLE(rowtime, INTERVAL '15' MINUTE);

Hanya satu agregat yang ada dan logika perhitungannya berubah

Jika penerapan hanya memiliki satu agregat dan Anda memodifikasi cara perhitungannya, kompatibilitas terganggu.

-- Pernyataan SQL asli
SELECT a, SUM(b), MAX(b), MAX(c)
FROM MyTable
GROUP BY b, a, TUMBLE(ts, INTERVAL '15' MINUTE);

-- Modifikasi satu-satunya agregat yang tersisa: tidak kompatibel.
SELECT a, MAX(c)
FROM (SELECT a, b, c + 1 AS c, ts FROM MyTable)
GROUP BY b, a, TUMBLE(ts, INTERVAL '15' MINUTE);

Metrik statistik sebelum dan setelah perubahan sepenuhnya berbeda

Jika tidak ada agregat dari sebelum perubahan yang tersisa setelah perubahan, kompatibilitas terganggu.

-- Pernyataan SQL asli
SELECT a, SUM(b), MAX(b), MAX(c) FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' SECOND);

-- Ganti dengan kumpulan agregat yang sama sekali berbeda: tidak kompatibel.
SELECT a, MIN(b), AVG(b) FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' SECOND);

SQL berisi konfigurasi early-fire atau late-fire

Jika pernyataan SQL mencakup 'table.exec.emit.early-fire.enabled' = 'true' atau 'table.exec.emit.late-fire.enabled' = 'true' baik sebelum maupun setelah modifikasi, kompatibilitas terganggu.

-- Pernyataan SQL asli
SELECT a, MAX(c)
FROM (SELECT a, b, c + 1 AS c, ts FROM MyTable)
GROUP BY b, a, TUMBLE(ts, INTERVAL '15' MINUTE);

-- Tambahkan konfigurasi early-fire atau late-fire: tidak kompatibel.
SET 'table.exec.emit.early-fire.enabled' = 'true';
SET 'table.exec.emit.early-fire.delay' = '500ms';
-- atau
SET 'table.exec.emit.late-fire.enabled' = 'true';
SET 'table.exec.emit.late-fire.delay' = '1s';
SET 'table.exec.emit.allow-lateness' = '5s';

SELECT a, MAX(c)
FROM (SELECT a, b, c + 1 AS c, ts FROM MyTable)
GROUP BY b, a, TUMBLE(ts, INTERVAL '15' MINUTE);

SQL berisi UDAF Python

Jika pernyataan SQL mencakup fungsi agregat yang didefinisikan pengguna (UDAF) Python baik sebelum maupun setelah modifikasi, kompatibilitas antara penerapan dan data statenya unknown.

-- SQL dengan UDAF Python (weighted_avg) sebelum atau setelah modifikasi — kompatibilitas tidak diketahui.
SELECT COUNT(DISTINCT b), a, SUM(DISTINCT b), weighted_avg(a, b)
FROM MyTable
GROUP BY a, c;