When you modify the SQL statement for a running Realtime Compute for Apache Flink deployment, Flink evaluates whether the existing state data can be reused. Understanding which changes preserve state compatibility helps you avoid unexpected data loss or full reprocessing.
This topic covers state compatibility outcomes for SQL statements that use window aggregate functions with GROUP BY.
The result of a modification is one of three outcomes:
Fully compatible: the deployment continues with all historical state data intact.
Partially compatible: unmodified aggregates continue from historical state; newly added or modified aggregates start with empty state.
Incompatible: the deployment cannot reuse any state data and must restart from scratch.
How Flink evaluates state compatibility
Flink evaluates state reuse based on which components of the SQL statement change:
| Component | Flexibility | Examples |
|---|---|---|
| Non-distinct aggregates | Flexible — partial compatibility possible | SUM(b), MAX(c), COUNT(c) |
| Window attribute fields | Flexible — changes are fully compatible | tumble_start, tumble_end |
| Window function position in GROUP BY | Flexible — reordering the window key is fully compatible | TUMBLE(ts, ...) position in GROUP BY |
| Window definition | Fixed — any change breaks compatibility | Window type, window size, time attribute |
| GROUP BY keys (statistical dimensions) | Fixed — any change breaks compatibility | GROUP BY a, d, ... |
| Distinct aggregates | Fixed — any change breaks compatibility | COUNT(DISTINCT c) |
| Early-fire and late-fire settings | Fixed — any change breaks compatibility | table.exec.emit.early-fire.enabled |
This framework explains why some changes can be partially accommodated (Flink can discard or add individual aggregates without rebuilding the full state), while others require a complete state restart (the fundamental structure of the computation changes).
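As a hypothetical illustration, the following query uses the same MyTable columns as the examples in this topic. Each part is annotated with the component it belongs to in the table above. Changing a part marked fixed breaks compatibility. Changing a part marked flexible preserves all or part of the state, subject to the detailed rules later in this topic. This query enables neither early-fire nor late-fire, which would be a fixed component.
-- Illustrative annotation only; columns a, b, c, ts are the same assumed MyTable columns used throughout this topic.
SELECT a,
SUM(b),                                                -- non-distinct aggregate: flexible
MAX(c),                                                -- non-distinct aggregate: flexible
COUNT(DISTINCT c),                                     -- distinct aggregate: fixed
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start  -- window attribute field: flexible
FROM MyTable
GROUP BY a,                                            -- GROUP BY key (statistical dimension): fixed
TUMBLE(ts, INTERVAL '1' MINUTE);                       -- window definition (type, size, time attribute): fixed; its position in GROUP BY can move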
Quick reference
Use this table to determine the compatibility impact of a specific change before applying it.
| Change | Compatibility |
|---|---|
| Add a non-distinct aggregate | Partial |
| Delete a non-distinct aggregate | Full |
| Add and delete non-distinct aggregates in one change | Partial |
| Modify a non-distinct aggregate | Partial |
| Reorder non-distinct aggregates | Full |
| Change the calculation logic of a field in a non-distinct aggregate | Partial |
| Add or delete a window attribute field (tumble_start, tumble_end) | Full |
| Reorder GROUP BY keys — only the window function key position changes | Full |
| No aggregates before or after the change | Full |
| Modify the window type (for example, TUMBLE to HOP) | Incompatible |
| Modify the window size | Incompatible |
| Modify the time attribute (rowtime to proctime, or vice versa) | Incompatible |
| Add, delete, or modify a GROUP BY key (statistical dimension) | Incompatible |
| Add, delete, or modify a distinct aggregate | Incompatible |
| Delete all aggregates | Incompatible |
| Add or delete early-fire or late-fire | Incompatible |
| Reorder non-window GROUP BY keys | Incompatible |
| Add an aggregate to a deployment that had none | Incompatible |
| Only one aggregate exists and its calculation logic changes | Incompatible |
| The set of aggregates changes entirely (no overlap) | Incompatible |
| SQL contains 'table.exec.emit.early-fire.enabled' = 'true' or 'table.exec.emit.late-fire.enabled' = 'true' before or after the change | Incompatible |
| SQL contains a Python user-defined aggregate function (UDAF) before or after the change | Unknown |
Modifications that preserve or partially preserve state
Add, delete, or modify a non-distinct aggregate
The following changes produce partial or full compatibility:
Add: partially compatible. The new aggregate starts with empty state when the deployment restarts, so its results reflect only data processed after the restart.
Delete: fully compatible. The state data of the deleted aggregate is discarded; all other aggregates are unaffected.
Add and delete in one change: partially compatible. The added aggregate starts with empty state; the deleted aggregate's state is discarded.
Modify (changing one aggregate to another): partially compatible. The original aggregate is treated as deleted and its state is discarded; the new aggregate is treated as added and starts with empty state.
For aggregates you do not modify, the results after state reuse are identical to the results computed from full historical data.
-- Original SQL statement
SELECT a, SUM(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- Add COUNT(c): partially compatible.
-- SUM(b) and MAX(c) are unaffected. COUNT(c) starts from 0.
SELECT a, SUM(b), MAX(c), COUNT(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- Delete SUM(b): fully compatible.
-- MAX(c) is unaffected.
SELECT a, MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
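-- Illustrative example of the combined case described above (not one of the original examples):
-- Add COUNT(c) and delete SUM(b) in one change: partially compatible.
-- MAX(c) is unaffected. SUM(b) state is discarded; COUNT(c) starts from 0.
SELECT a, MAX(c), COUNT(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);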
-- Change MAX(c) to MIN(c): partially compatible.
-- SUM(b) is unaffected. MAX(c) state is discarded; MIN(c) starts with empty state.
SELECT a, SUM(b), MIN(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
Reorder non-distinct aggregates
Reordering aggregates in the SELECT list does not affect compatibility. All results remain correct.
-- Original SQL statement
SELECT a, SUM(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- Swap SUM(b) and MAX(c): fully compatible.
SELECT a, MAX(c), SUM(b)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
Change the calculation logic of a field in a non-distinct aggregate
Changing how an input field is computed counts as modifying the aggregate. The original aggregate is discarded and the new one starts fresh.
-- Original SQL statement
SELECT a, SUM(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- Change MAX(c) to MAX(SUBSTRING(c, 1, 5)): partially compatible.
-- SUM(b) is unaffected. MAX(c) state is discarded; MAX(SUBSTRING(c, 1, 5)) starts with empty state.
SELECT a, SUM(b), MAX(c)
FROM (SELECT a, b, SUBSTRING(c, 1, 5) AS c, ts FROM MyTable)
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
Add or delete a window attribute field
Adding or removing tumble_start, tumble_end, or similar window attribute fields is fully compatible.
-- Original SQL statement
SELECT a,
SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- Add window_end: fully compatible.
SELECT a,
SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start,
tumble_end(ts, INTERVAL '1' MINUTE) AS window_end
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- Delete window_start: fully compatible.
SELECT a,
SUM(b),
MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
Reorder GROUP BY keys (window function key only)
If only the position of the window function key in the GROUP BY clause changes and all other keys remain in the same order, compatibility is preserved.
-- Original SQL statement
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY a, b, TUMBLE(rowtime, INTERVAL '15' MINUTE);
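-- Illustrative variant based on the rule above (not one of the original examples):
-- Move the window function key to the first position: fully compatible.
-- The relative order of a and b is unchanged.
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE), a, b;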
-- Move the window function key: fully compatible.
-- The positions of a and b are unchanged.
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY a, TUMBLE(rowtime, INTERVAL '15' MINUTE), b;
Modifications that break state compatibility
The following changes make a deployment incompatible with its existing state data. The deployment cannot reuse any of that state and must restart from scratch.
Modify window attributes
Changing the window type, window size, or time attribute breaks compatibility.
-- Original SQL statement
SELECT a,
SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- Change window type from TUMBLE to HOP: incompatible.
SELECT a,
SUM(b),
MAX(c),
hop_start(ts, INTERVAL '1' MINUTE, INTERVAL '2' MINUTE) AS window_start
FROM MyTable
GROUP BY a, HOP(ts, INTERVAL '1' MINUTE, INTERVAL '2' MINUTE);
-- Change window size from 1 minute to 2 minutes: incompatible.
SELECT a,
SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '2' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '2' MINUTE);
-- Change time attribute from ts (rowtime) to proctime: incompatible.
SELECT a,
SUM(b),
MAX(c),
tumble_start(proctime, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(proctime, INTERVAL '1' MINUTE);
Add, delete, or modify a GROUP BY key
GROUP BY keys define the statistical dimensions. Any change to these keys — including changing the calculation logic of a field used as a key — breaks compatibility.
-- Original SQL statement
SELECT a,
SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
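-- Illustrative variants of the same rule (not among the original examples):
-- Delete GROUP BY key a: incompatible.
SELECT SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE);
-- Change the calculation logic of key a: incompatible.
-- Hypothetical: assumes a is a string column, so UPPER(a) is valid.
SELECT a,
SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM (SELECT UPPER(a) AS a, b, c, ts FROM MyTable)
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);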
-- Add GROUP BY key d: incompatible.
SELECT a,
SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, d, TUMBLE(ts, INTERVAL '1' MINUTE);
Add, delete, or modify a distinct aggregate
Any change to aggregates that use DISTINCT breaks compatibility, even if all other aggregates remain unchanged.
-- Original SQL statement
SELECT a,
SUM(b),
MAX(c),
COUNT(DISTINCT c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
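-- Illustrative variant of the same rule (not one of the original examples):
-- Delete COUNT(DISTINCT c): incompatible, even though SUM(b) and MAX(c) are unchanged.
SELECT a,
SUM(b),
MAX(c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);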
-- Add COUNT(DISTINCT b): incompatible.
SELECT a,
SUM(b),
COUNT(DISTINCT b),
MAX(c),
COUNT(DISTINCT c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
Delete all aggregates
Removing every aggregate from the SQL statement breaks compatibility. All state is discarded.
-- Original SQL statement
SELECT a,
SUM(b),
COUNT(DISTINCT b),
MAX(c),
COUNT(DISTINCT c),
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- Delete all aggregates: incompatible.
SELECT a,
tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
Add or delete early-fire or late-fire
Any change to early-fire or late-fire emit settings breaks compatibility.
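For example, removing early-fire settings from a statement that previously enabled them is incompatible. The following is an illustrative sketch that reuses the configuration keys shown elsewhere in this topic. The query itself is a hypothetical example.
-- Original SQL statement (early-fire enabled)
SET 'table.exec.emit.early-fire.enabled' = 'true';
SET 'table.exec.emit.early-fire.delay' = '500ms';
SELECT a, SUM(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
-- Delete the early-fire settings: incompatible.
SELECT a, SUM(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);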
Reorder non-window GROUP BY keys
Changing the order of GROUP BY keys other than the window function key breaks compatibility.
-- Original SQL statement
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY a, b, TUMBLE(rowtime, INTERVAL '15' MINUTE);
-- Swap a and b: incompatible.
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY b, a, TUMBLE(rowtime, INTERVAL '15' MINUTE);
Add an aggregate to a deployment with no aggregates
If the original SQL has no aggregates and you add one, compatibility is broken.
-- Original SQL statement (no aggregates)
SELECT a, b, c
FROM MyTable
GROUP BY a, b, c, TUMBLE(rowtime, INTERVAL '15' MINUTE);
-- Add COUNT(c): incompatible.
SELECT a, b, c, COUNT(c)
FROM MyTable
GROUP BY a, b, c, TUMBLE(rowtime, INTERVAL '15' MINUTE);
Only one aggregate exists and its calculation logic changes
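If only one aggregate exists after the change and its calculation logic differs from the original, no aggregate state can be reused and compatibility is broken. In the following example, SUM(b) and MAX(b) are deleted, and MAX(c) now aggregates c + 1 instead of c.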
-- Original SQL statement
SELECT a, SUM(b), MAX(b), MAX(c)
FROM MyTable
GROUP BY b, a, TUMBLE(ts, INTERVAL '15' MINUTE);
-- Modify the only remaining aggregate: incompatible.
SELECT a, MAX(c)
FROM (SELECT a, b, c + 1 AS c, ts FROM MyTable)
GROUP BY b, a, TUMBLE(ts, INTERVAL '15' MINUTE);
Statistical metrics before and after the change are entirely different
If no aggregate from before the change remains after the change, compatibility is broken.
-- Original SQL statement
SELECT a, SUM(b), MAX(b), MAX(c) FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' SECOND);
-- Replace with a completely different set of aggregates: incompatible.
SELECT a, MIN(b), AVG(b) FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' SECOND);
SQL contains early-fire or late-fire configuration
If the SQL statement includes 'table.exec.emit.early-fire.enabled' = 'true' or 'table.exec.emit.late-fire.enabled' = 'true' either before or after the modification, compatibility is broken.
-- Original SQL statement
SELECT a, MAX(c)
FROM (SELECT a, b, c + 1 AS c, ts FROM MyTable)
GROUP BY b, a, TUMBLE(ts, INTERVAL '15' MINUTE);
-- Add early-fire or late-fire configuration: incompatible.
SET 'table.exec.emit.early-fire.enabled' = 'true';
SET 'table.exec.emit.early-fire.delay' = '500ms';
-- or
SET 'table.exec.emit.late-fire.enabled' = 'true';
SET 'table.exec.emit.late-fire.delay' = '1s';
SET 'table.exec.emit.allow-lateness' = '5s';
SELECT a, MAX(c)
FROM (SELECT a, b, c + 1 AS c, ts FROM MyTable)
GROUP BY b, a, TUMBLE(ts, INTERVAL '15' MINUTE);
SQL contains a Python UDAF
If the SQL statement includes a Python user-defined aggregate function (UDAF) either before or after the modification, the compatibility between the deployment and its state data is unknown.
-- SQL with a Python UDAF (weighted_avg) before or after modification — compatibility is unknown.
SELECT COUNT(DISTINCT b), a, SUM(DISTINCT b), weighted_avg(a, b)
FROM MyTable
GROUP BY a, c, TUMBLE(ts, INTERVAL '1' MINUTE);