Saat Anda memodifikasi pernyataan SQL untuk penerapan Realtime Compute for Apache Flink yang sedang berjalan, Flink mengevaluasi apakah data state yang ada dapat digunakan kembali. Memahami perubahan mana yang menjaga kompatibilitas state membantu Anda menghindari kehilangan data tak terduga atau pemrosesan ulang penuh.
Topik ini mencakup hasil kompatibilitas state untuk pernyataan SQL yang menggunakan fungsi agregat jendela dengan GROUP BY.
Hasil dari suatu modifikasi adalah salah satu dari tiga kemungkinan berikut:
Fully compatible — penerapan dilanjutkan dengan seluruh data state historis tetap utuh.
Partially compatible — agregat yang tidak diubah melanjutkan dari state historis; agregat yang baru ditambahkan dimulai ulang dari nol.
Incompatible — penerapan tidak dapat menggunakan kembali data state apa pun dan harus dimulai ulang dari awal.
Cara Flink mengevaluasi kompatibilitas state
Flink mengevaluasi penggunaan kembali state berdasarkan komponen mana dalam pernyataan SQL yang berubah:
| Komponen | Fleksibilitas | Contoh |
|---|---|---|
| Agregat non-distinct | Fleksibel — kompatibilitas parsial mungkin | SUM(b), MAX(c), COUNT(c) |
| Bidang atribut jendela | Fleksibel — perubahan sepenuhnya kompatibel | tumble_start, tumble_end |
| Posisi fungsi jendela dalam GROUP BY | Fleksibel — mengubah urutan kunci jendela sepenuhnya kompatibel | TUMBLE(ts, ...) dalam GROUP BY |
| Definisi jendela | Tetap — perubahan apa pun mengganggu kompatibilitas | Jenis jendela, ukuran jendela, atribut waktu |
| Kunci GROUP BY (dimensi statistik) | Tetap — perubahan apa pun mengganggu kompatibilitas | GROUP BY a, d, ... |
| Agregat distinct | Tetap — perubahan apa pun mengganggu kompatibilitas | COUNT(DISTINCT c) |
| Pengaturan early-fire dan late-fire | Tetap — perubahan apa pun mengganggu kompatibilitas | table.exec.emit.early-fire.enabled |
Kerangka kerja ini menjelaskan mengapa beberapa perubahan dapat diakomodasi sebagian (Flink dapat membuang atau menambahkan agregat individual tanpa membangun ulang seluruh state), sementara yang lain memerlukan restart state lengkap (struktur dasar komputasi berubah).
Referensi cepat
Gunakan tabel ini untuk menentukan dampak kompatibilitas dari perubahan tertentu sebelum menerapkannya.
| Perubahan | Kompatibilitas |
|---|---|
| Menambahkan agregat non-distinct | Partial |
| Menghapus agregat non-distinct | Full |
| Menambahkan dan menghapus agregat non-distinct dalam satu perubahan | Partial |
| Memodifikasi agregat non-distinct | Partial |
| Mengubah urutan agregat non-distinct | Full |
| Mengubah logika perhitungan bidang dalam agregat non-distinct | Partial |
Menambahkan atau menghapus bidang atribut jendela (tumble_start, tumble_end) | Full |
| Mengubah urutan kunci GROUP BY — hanya posisi kunci fungsi jendela yang berubah | Full |
| Tidak ada agregat sebelum atau setelah perubahan | Full |
| Memodifikasi jenis jendela (misalnya, TUMBLE menjadi HOP) | Incompatible |
| Memodifikasi ukuran jendela | Incompatible |
Memodifikasi atribut waktu (rowtime menjadi proctime, atau sebaliknya) | Incompatible |
| Menambahkan, menghapus, atau memodifikasi kunci GROUP BY (dimensi statistik) | Incompatible |
| Menambahkan, menghapus, atau memodifikasi agregat distinct | Incompatible |
| Menghapus semua agregat | Incompatible |
| Tambah atau hapus early-fire atau late-fire | Incompatible |
| Mengubah urutan kunci GROUP BY non-jendela | Incompatible |
| Menambahkan agregat ke penerapan yang sebelumnya tidak memiliki agregat | Incompatible |
| Hanya ada satu agregat dan logika perhitungannya berubah | Incompatible |
| Kumpulan agregat berubah sepenuhnya (tidak ada tumpang tindih) | Incompatible |
SQL berisi 'table.exec.emit.early-fire.enabled' = 'true' atau 'table.exec.emit.late-fire.enabled' = 'true' sebelum atau setelah perubahan | Incompatible |
| SQL berisi fungsi agregat yang didefinisikan pengguna (UDAF) Python sebelum atau setelah perubahan | Unknown |
Modifikasi yang menjaga atau sebagian menjaga state
Menambahkan, menghapus, atau memodifikasi agregat non-distinct
Perubahan berikut menghasilkan kompatibilitas parsial atau penuh:
Menambahkan: kompatibel sebagian. Agregat baru mulai menghitung dari nol saat penerapan dijalankan ulang.
Menghapus: kompatibel penuh. Data state dari agregat yang dihapus dibuang; semua agregat lain tidak terpengaruh.
Menambahkan dan menghapus dalam satu perubahan: kompatibel sebagian. Agregat yang ditambahkan dimulai dari nol; state agregat yang dihapus dibuang.
Memodifikasi (mengganti satu agregat dengan agregat lain): kompatibel sebagian. Agregat asli dianggap dihapus dan statenya dibuang; agregat baru dianggap ditambahkan dan dimulai dari nol.
Untuk agregat yang tidak Anda modifikasi, hasil setelah penggunaan kembali state identik dengan hasil yang dihitung dari seluruh data historis.
-- Pernyataan SQL asli
SELECT a, SUM(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- Tambahkan COUNT(c): kompatibel sebagian.
-- SUM(b) dan MAX(c) tidak terpengaruh. COUNT(c) dimulai dari 0.
SELECT a, SUM(b), MAX(c), COUNT(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- Hapus SUM(b): kompatibel penuh.
-- MAX(c) tidak terpengaruh.
SELECT a, MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- Ubah MAX(c) menjadi MIN(c): kompatibel sebagian.
-- SUM(b) tidak terpengaruh. State MAX(c) dibuang; MIN(c) dimulai dari 0.
SELECT a, SUM(b), MIN(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);Mengubah urutan agregat non-distinct
Mengubah urutan agregat dalam daftar SELECT tidak memengaruhi kompatibilitas. Semua hasil tetap benar.
-- Pernyataan SQL asli
SELECT a, SUM(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- Tukar SUM(b) dan MAX(c): kompatibel penuh.
SELECT a, MAX(c), SUM(b)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);Mengubah logika perhitungan bidang dalam agregat non-distinct
Mengubah cara bidang input dihitung dianggap sebagai modifikasi agregat. Agregat asli dibuang dan yang baru dimulai dari awal.
-- Pernyataan SQL asli
SELECT a, SUM(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- Ubah MAX(c) menjadi MAX(SUBSTRING(c, 1, 5)): kompatibel sebagian.
-- SUM(b) tidak terpengaruh. State MAX(c) dibuang; MAX(SUBSTRING(c, 1, 5)) dimulai dari 0.
SELECT a, SUM(b), MAX(c)
FROM (SELECT a, b, SUBSTRING(c, 1, 5) AS c FROM MyTable)
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);Menambahkan atau menghapus bidang atribut jendela
Menambahkan atau menghapus tumble_start, tumble_end, atau bidang atribut jendela serupa lainnya sepenuhnya kompatibel.
-- Pernyataan SQL asli
SELECT a,
SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- Tambahkan window_end: kompatibel penuh.
SELECT a,
SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start,
tumble_end(ts, INTERVAL '1' MINUTE) AS window_end
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- Hapus window_start: kompatibel penuh.
SELECT a,
SUM(b),
MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);Mengubah urutan kunci GROUP BY — hanya kunci fungsi jendela
Jika hanya posisi kunci fungsi jendela dalam klausa GROUP BY yang berubah dan semua kunci lain tetap dalam urutan yang sama, kompatibilitas tetap terjaga.
-- Pernyataan SQL asli
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY a, b, TUMBLE(rowtime, INTERVAL '15' MINUTE);
-- Pindahkan kunci fungsi jendela: kompatibel penuh.
-- Posisi a dan b tidak berubah.
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY a, TUMBLE(rowtime, INTERVAL '15' MINUTE), b;Modifikasi yang mengganggu kompatibilitas state
Perubahan berikut membuat penerapan tidak kompatibel dengan data state yang ada. Penerapan harus membuang seluruh state dan menghitung ulang hasil dari awal inputnya.
Memodifikasi atribut jendela
Mengubah jenis jendela, ukuran jendela, atau atribut waktu mengganggu kompatibilitas.
-- Pernyataan SQL asli
SELECT a,
SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- Ubah jenis jendela dari TUMBLE menjadi HOP: tidak kompatibel.
SELECT a,
SUM(b),
MAX(c),
hop_start(ts, INTERVAL '1' MINUTE, INTERVAL '2' MINUTE) AS window_start
FROM MyTable
GROUP BY a, HOP(ts, INTERVAL '1' MINUTE, INTERVAL '2' MINUTE);
-- Ubah ukuran jendela dari 1 menit menjadi 2 menit: tidak kompatibel.
SELECT a,
SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '2' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '2' MINUTE);
-- Ubah atribut waktu dari ts (rowtime) menjadi proctime: tidak kompatibel.
SELECT a,
SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(proctime, INTERVAL '1' MINUTE);Menambahkan, menghapus, atau memodifikasi kunci GROUP BY
Kunci GROUP BY mendefinisikan dimensi statistik. Perubahan apa pun pada kunci-kunci ini — termasuk mengubah logika perhitungan bidang yang digunakan sebagai kunci — mengganggu kompatibilitas.
-- Pernyataan SQL asli
SELECT a,
SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- Tambahkan kunci GROUP BY d: tidak kompatibel.
SELECT a,
SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, d, TUMBLE(ts, INTERVAL '1' MINUTE);Menambahkan, menghapus, atau memodifikasi agregat distinct
Perubahan apa pun pada agregat yang menggunakan DISTINCT mengganggu kompatibilitas, bahkan jika semua agregat lain tetap tidak berubah.
-- Pernyataan SQL asli
SELECT a,
SUM(b),
MAX(c),
COUNT(DISTINCT c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- Tambahkan COUNT(DISTINCT b): tidak kompatibel.
SELECT a,
SUM(b),
COUNT(DISTINCT b),
MAX(c),
COUNT(DISTINCT c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);Menghapus semua agregat
Menghapus seluruh agregat dari pernyataan SQL mengganggu kompatibilitas. Seluruh state dibuang.
-- Pernyataan SQL asli
SELECT a,
SUM(b),
COUNT(DISTINCT b),
MAX(c),
COUNT(DISTINCT c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- Hapus semua agregat: tidak kompatibel.
SELECT a,
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);Menambahkan atau menghapus early-fire atau late-fire
Perubahan apa pun pada pengaturan emit early-fire atau late-fire mengganggu kompatibilitas.
Mengubah urutan kunci GROUP BY non-jendela
Mengubah urutan kunci GROUP BY selain kunci fungsi jendela mengganggu kompatibilitas.
-- Pernyataan SQL asli
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY a, b, TUMBLE(rowtime, INTERVAL '15' MINUTE);
-- Tukar a dan b: tidak kompatibel.
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY b, a, TUMBLE(rowtime, INTERVAL '15' MINUTE);Menambahkan agregat ke penerapan tanpa agregat
Jika SQL asli tidak memiliki agregat dan Anda menambahkan satu, kompatibilitas terganggu.
-- Pernyataan SQL asli (tanpa agregat)
SELECT a, b, c
FROM MyTable
GROUP BY a, b, c, TUMBLE(rowtime, INTERVAL '15' MINUTE);
-- Tambahkan COUNT(c): tidak kompatibel.
SELECT a, b, c, COUNT(c)
FROM MyTable
GROUP BY a, b, c, TUMBLE(rowtime, INTERVAL '15' MINUTE);Hanya satu agregat yang ada dan logika perhitungannya berubah
Jika penerapan hanya memiliki satu agregat dan Anda memodifikasi cara perhitungannya, kompatibilitas terganggu.
-- Pernyataan SQL asli
SELECT a, SUM(b), MAX(b), MAX(c)
FROM MyTable
GROUP BY b, a, TUMBLE(ts, INTERVAL '15' MINUTE);
-- Modifikasi satu-satunya agregat yang tersisa: tidak kompatibel.
SELECT a, MAX(c)
FROM (SELECT a, b, c + 1 AS c, ts FROM MyTable)
GROUP BY b, a, TUMBLE(ts, INTERVAL '15' MINUTE);Metrik statistik sebelum dan setelah perubahan sepenuhnya berbeda
Jika tidak ada agregat dari sebelum perubahan yang tersisa setelah perubahan, kompatibilitas terganggu.
-- Pernyataan SQL asli
SELECT a, SUM(b), MAX(b), MAX(c) FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' SECOND);
-- Ganti dengan kumpulan agregat yang sama sekali berbeda: tidak kompatibel.
SELECT a, MIN(b), AVG(b) FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' SECOND);SQL berisi konfigurasi early-fire atau late-fire
Jika pernyataan SQL mencakup 'table.exec.emit.early-fire.enabled' = 'true' atau 'table.exec.emit.late-fire.enabled' = 'true' baik sebelum maupun setelah modifikasi, kompatibilitas terganggu.
-- Pernyataan SQL asli
SELECT a, MAX(c)
FROM (SELECT a, b, c + 1 AS c, ts FROM MyTable)
GROUP BY b, a, TUMBLE(ts, INTERVAL '15' MINUTE);
-- Tambahkan konfigurasi early-fire atau late-fire: tidak kompatibel.
SET 'table.exec.emit.early-fire.enabled' = 'true';
SET 'table.exec.emit.early-fire.delay' = '500ms';
-- atau
SET 'table.exec.emit.late-fire.enabled' = 'true';
SET 'table.exec.emit.late-fire.delay' = '1s';
SET 'table.exec.emit.allow-lateness' = '5s';
SELECT a, MAX(c)
FROM (SELECT a, b, c + 1 AS c, ts FROM MyTable)
GROUP BY b, a, TUMBLE(ts, INTERVAL '15' MINUTE);SQL berisi UDAF Python
Jika pernyataan SQL mencakup fungsi agregat yang didefinisikan pengguna (UDAF) Python baik sebelum maupun setelah modifikasi, kompatibilitas antara penerapan dan data statenya unknown.
-- SQL dengan UDAF Python (weighted_avg) sebelum atau setelah modifikasi — kompatibilitas tidak diketahui.
SELECT COUNT(DISTINCT b), a, SUM(DISTINCT b), weighted_avg(a, b)
FROM MyTable
GROUP BY a, c;