Realtime Compute for Apache Flink: Group Window Aggregate

Last Updated: Mar 26, 2026

When you modify the SQL statement for a running Realtime Compute for Apache Flink deployment, Flink evaluates whether the existing state data can be reused. Understanding which changes preserve state compatibility helps you avoid unexpected data loss or full reprocessing.

This topic covers state compatibility outcomes for SQL statements that use window aggregate functions with GROUP BY.

A modification has one of three outcomes. A fourth result, unknown, applies only when the SQL statement contains a Python user-defined aggregate function (UDAF).

  • Fully compatible: the deployment continues with all historical state data intact.

  • Partially compatible: unmodified aggregates continue from historical state; newly added aggregates start from zero, with no historical state.

  • Incompatible: the deployment cannot reuse any state data and must restart from scratch.

How Flink evaluates state compatibility

Flink evaluates state reuse based on which components of the SQL statement change:

| Component | Flexibility | Examples |
| --- | --- | --- |
| Non-distinct aggregates | Flexible: partial compatibility possible | SUM(b), MAX(c), COUNT(c) |
| Window attribute fields | Flexible: changes are fully compatible | tumble_start, tumble_end |
| Window function position in GROUP BY | Flexible: reordering the window key is fully compatible | TUMBLE(ts, ...) position in GROUP BY |
| Window definition | Fixed: any change breaks compatibility | Window type, window size, time attribute |
| GROUP BY keys (statistical dimensions) | Fixed: any change breaks compatibility | GROUP BY a, d, ... |
| Distinct aggregates | Fixed: any change breaks compatibility | COUNT(DISTINCT c) |
| Early-fire and late-fire settings | Fixed: any change breaks compatibility | table.exec.emit.early-fire.enabled |

This framework explains why some changes can be partially accommodated (Flink can discard or add individual aggregates without rebuilding the full state), while others require a complete state restart (the fundamental structure of the computation changes).

Quick reference

Use this table to determine the compatibility impact of a specific change before applying it.

| Change | Compatibility |
| --- | --- |
| Add a non-distinct aggregate | Partial |
| Delete a non-distinct aggregate | Full |
| Add and delete non-distinct aggregates in one change | Partial |
| Modify a non-distinct aggregate | Partial |
| Reorder non-distinct aggregates | Full |
| Change the calculation logic of a field in a non-distinct aggregate | Partial |
| Add or delete a window attribute field (tumble_start, tumble_end) | Full |
| Reorder GROUP BY keys (only the window function key position changes) | Full |
| No aggregates before or after the change | Full |
| Modify the window type (for example, TUMBLE to HOP) | Incompatible |
| Modify the window size | Incompatible |
| Modify the time attribute (rowtime to proctime, or vice versa) | Incompatible |
| Add, delete, or modify a GROUP BY key (statistical dimension) | Incompatible |
| Add, delete, or modify a distinct aggregate | Incompatible |
| Delete all aggregates | Incompatible |
| Add or delete early-fire or late-fire | Incompatible |
| Reorder non-window GROUP BY keys | Incompatible |
| Add an aggregate to a deployment that had none | Incompatible |
| Only one aggregate exists and its calculation logic changes | Incompatible |
| The set of aggregates changes entirely (no overlap) | Incompatible |
| SQL contains 'table.exec.emit.early-fire.enabled' = 'true' or 'table.exec.emit.late-fire.enabled' = 'true' before or after the change | Incompatible |
| SQL contains a Python user-defined aggregate function (UDAF) before or after the change | Unknown |

Modifications that preserve or partially preserve state

Add, delete, or modify a non-distinct aggregate

The following changes produce partial or full compatibility:

  • Add: partially compatible. The new aggregate starts counting from zero when the deployment restarts.

  • Delete: fully compatible. The state data of the deleted aggregate is discarded; all other aggregates are unaffected.

  • Add and delete in one change: partially compatible. The added aggregate starts from zero; the deleted aggregate's state is discarded.

  • Modify (changing one aggregate to another): partially compatible. The original aggregate is treated as deleted and its state is discarded; the new aggregate is treated as added and starts from zero.

For aggregates you do not modify, the results after state reuse are identical to the results computed from full historical data.

-- Original SQL statement
SELECT a, SUM(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Add COUNT(c): partially compatible.
-- SUM(b) and MAX(c) are unaffected. COUNT(c) starts from 0.
SELECT a, SUM(b), MAX(c), COUNT(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Delete SUM(b): fully compatible.
-- MAX(c) is unaffected.
SELECT a, MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Change MAX(c) to MIN(c): partially compatible.
-- SUM(b) is unaffected. MAX(c) state is discarded; MIN(c) starts with no historical state.
SELECT a, SUM(b), MIN(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
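
The list above also covers adding and deleting aggregates in a single change, which the preceding statements do not illustrate. The following is a minimal sketch of that case, assuming the same MyTable columns (a, b, c, ts) used throughout this topic. The outcome in the comments follows from the rules stated above and has not been verified against a running deployment.

```sql
-- Original SQL statement
SELECT a, SUM(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Delete SUM(b) and add COUNT(c) in one change: partially compatible.
-- MAX(c) continues from its historical state.
-- The SUM(b) state is discarded; COUNT(c) starts from 0.
SELECT a, MAX(c), COUNT(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
```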

Reorder non-distinct aggregates

Reordering aggregates in the SELECT list does not affect compatibility. All results remain correct.

-- Original SQL statement
SELECT a, SUM(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Swap SUM(b) and MAX(c): fully compatible.
SELECT a, MAX(c), SUM(b)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

Change the calculation logic of a field in a non-distinct aggregate

Changing how an input field is computed counts as modifying the aggregate. The original aggregate is discarded and the new one starts fresh.

-- Original SQL statement
SELECT a, SUM(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Change MAX(c) to MAX(SUBSTRING(c, 1, 5)): partially compatible.
-- SUM(b) is unaffected. MAX(c) state is discarded; MAX(SUBSTRING(c, 1, 5)) starts with no historical state.
-- The subquery must also select ts so that the outer TUMBLE(ts, ...) can reference it.
SELECT a, SUM(b), MAX(c)
FROM (SELECT a, b, SUBSTRING(c, 1, 5) AS c, ts FROM MyTable)
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

Add or delete a window attribute field

Adding or removing tumble_start, tumble_end, or similar window attribute fields is fully compatible.

-- Original SQL statement
SELECT a,
  SUM(b),
  MAX(c),
  tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Add window_end: fully compatible.
SELECT a,
  SUM(b),
  MAX(c),
  tumble_start(ts, INTERVAL '1' MINUTE) AS window_start,
  tumble_end(ts, INTERVAL '1' MINUTE) AS window_end
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Delete window_start: fully compatible.
SELECT a,
  SUM(b),
  MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

Reorder GROUP BY keys — window function key only

If only the position of the window function key in the GROUP BY clause changes and all other keys remain in the same order, compatibility is preserved.

-- Original SQL statement
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY a, b, TUMBLE(rowtime, INTERVAL '15' MINUTE);

-- Move the window function key: fully compatible.
-- The positions of a and b are unchanged.
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY a, TUMBLE(rowtime, INTERVAL '15' MINUTE), b;
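
The same rule can be applied to other positions of the window function key. The sketch below moves the key to the front of the GROUP BY clause while a and b keep their relative order. Under the rule as stated above, this is expected to be fully compatible; that outcome is an inference from the rule, not a verified result.

```sql
-- Original SQL statement
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY a, b, TUMBLE(rowtime, INTERVAL '15' MINUTE);

-- Move the window function key to the first position.
-- a still precedes b, so only the window key position has changed.
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE), a, b;
```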

Modifications that break state compatibility

The following changes make a deployment incompatible with its existing state data. The deployment must discard all state and recompute results from the beginning of its input.

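Modify the window definition

Changing the window type, window size, or time attribute breaks compatibility. This differs from adding or deleting window attribute fields such as tumble_start and tumble_end, which is fully compatible.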

-- Original SQL statement
SELECT a,
  SUM(b),
  MAX(c),
  tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Change window type from TUMBLE to HOP: incompatible.
SELECT a,
  SUM(b),
  MAX(c),
  hop_start(ts, INTERVAL '1' MINUTE, INTERVAL '2' MINUTE) AS window_start
FROM MyTable
GROUP BY a, HOP(ts, INTERVAL '1' MINUTE, INTERVAL '2' MINUTE);

-- Change window size from 1 minute to 2 minutes: incompatible.
SELECT a,
  SUM(b),
  MAX(c),
  tumble_start(ts, INTERVAL '2' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '2' MINUTE);

-- Change time attribute from ts (rowtime) to proctime: incompatible.
SELECT a,
  SUM(b),
  MAX(c),
  tumble_start(proctime, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(proctime, INTERVAL '1' MINUTE);

Add, delete, or modify a GROUP BY key

GROUP BY keys define the statistical dimensions. Any change to these keys — including changing the calculation logic of a field used as a key — breaks compatibility.

-- Original SQL statement
SELECT a,
  SUM(b),
  MAX(c),
  tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Add GROUP BY key d: incompatible.
SELECT a,
  SUM(b),
  MAX(c),
  tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, d, TUMBLE(ts, INTERVAL '1' MINUTE);
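
Deleting a key and changing the calculation logic of a key field are stated to be incompatible as well. The sketch below illustrates both cases. It assumes that MyTable has a column d and that a is a string column, so that UPPER(a) is valid; both are assumptions made only for this illustration.

```sql
-- Original SQL statement
SELECT a, SUM(b), MAX(c)
FROM MyTable
GROUP BY a, d, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Delete GROUP BY key d: incompatible.
SELECT a, SUM(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Change the calculation logic of key a (a becomes UPPER(a)): incompatible.
-- The key list looks the same, but the values that form the key differ.
SELECT a, SUM(b), MAX(c)
FROM (SELECT UPPER(a) AS a, b, c, d, ts FROM MyTable)
GROUP BY a, d, TUMBLE(ts, INTERVAL '1' MINUTE);
```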

Add, delete, or modify a distinct aggregate

Any change to aggregates that use DISTINCT breaks compatibility, even if all other aggregates remain unchanged.

-- Original SQL statement
SELECT a,
  SUM(b),
  MAX(c),
  COUNT(DISTINCT c),
  tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Add COUNT(DISTINCT b): incompatible.
SELECT a,
  SUM(b),
  COUNT(DISTINCT b),
  MAX(c),
  COUNT(DISTINCT c),
  tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
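
Deleting or modifying a distinct aggregate falls under the same rule. A minimal sketch of both cases, using the same MyTable columns as above:

```sql
-- Original SQL statement
SELECT a,
  SUM(b),
  MAX(c),
  COUNT(DISTINCT c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Delete COUNT(DISTINCT c): incompatible,
-- even though SUM(b) and MAX(c) are unchanged.
SELECT a,
  SUM(b),
  MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Change COUNT(DISTINCT c) to COUNT(DISTINCT b): incompatible.
SELECT a,
  SUM(b),
  MAX(c),
  COUNT(DISTINCT b)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
```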

Delete all aggregates

Removing every aggregate from the SQL statement breaks compatibility. All state is discarded.

-- Original SQL statement
SELECT a,
  SUM(b),
  COUNT(DISTINCT b),
  MAX(c),
  COUNT(DISTINCT c),
  tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Delete all aggregates: incompatible.
SELECT a,
  tumble_start(ts, INTERVAL '1' MINUTE) AS window_start
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

Add or delete early-fire or late-fire

Any change to early-fire or late-fire emit settings breaks compatibility.
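
As an illustration of the delete case, the sketch below removes early-fire from a deployment whose query is otherwise unchanged. It assumes the settings are supplied as SET statements alongside the query, in the same form used elsewhere in this topic; the 500ms delay is an arbitrary example value.

```sql
-- Original deployment: early-fire is enabled.
SET 'table.exec.emit.early-fire.enabled' = 'true';
SET 'table.exec.emit.early-fire.delay' = '500ms';

SELECT a, SUM(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);

-- Modified deployment: the early-fire settings are deleted: incompatible.
-- The query text is identical, but the emit behavior has changed.
SELECT a, SUM(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' MINUTE);
```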

Reorder non-window GROUP BY keys

Changing the order of GROUP BY keys other than the window function key breaks compatibility.

-- Original SQL statement
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY a, b, TUMBLE(rowtime, INTERVAL '15' MINUTE);

-- Swap a and b: incompatible.
SELECT a, SUM(DISTINCT b), MAX(DISTINCT c), COUNT(c)
FROM MyTable
GROUP BY b, a, TUMBLE(rowtime, INTERVAL '15' MINUTE);

Add an aggregate to a deployment with no aggregates

If the original SQL has no aggregates and you add one, compatibility is broken.

-- Original SQL statement (no aggregates)
SELECT a, b, c
FROM MyTable
GROUP BY a, b, c, TUMBLE(rowtime, INTERVAL '15' MINUTE);

-- Add COUNT(c): incompatible.
SELECT a, b, c, COUNT(c)
FROM MyTable
GROUP BY a, b, c, TUMBLE(rowtime, INTERVAL '15' MINUTE);

Only one aggregate exists and its calculation logic changes

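If only one aggregate exists after the change (because the deployment had only one, or because the others were deleted) and the calculation logic of that aggregate is modified, no original aggregate state can be reused, so compatibility is broken.

-- Original SQL statement
SELECT a, SUM(b), MAX(b), MAX(c)
FROM MyTable
GROUP BY b, a, TUMBLE(ts, INTERVAL '15' MINUTE);

-- Delete SUM(b) and MAX(b), and change the input of MAX(c) from c to c + 1: incompatible.
-- MAX(c) is the only remaining aggregate, and its calculation logic has changed.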
SELECT a, MAX(c)
FROM (SELECT a, b, c + 1 AS c, ts FROM MyTable)
GROUP BY b, a, TUMBLE(ts, INTERVAL '15' MINUTE);

The set of aggregates changes entirely (no overlap)

If no aggregate from before the change remains after the change, compatibility is broken.

-- Original SQL statement
SELECT a, SUM(b), MAX(b), MAX(c) FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' SECOND);

-- Replace with a completely different set of aggregates: incompatible.
SELECT a, MIN(b), AVG(b) FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' SECOND);
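
For contrast, the sketch below keeps one of the original aggregates. Following the rules for adding and deleting non-distinct aggregates earlier in this topic, the change is then expected to be partially compatible rather than incompatible; this is an inference from those rules, not a verified result.

```sql
-- Original SQL statement
SELECT a, SUM(b), MAX(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' SECOND);

-- Replace SUM(b) and MAX(b) with MIN(b) and AVG(b), but keep MAX(c).
-- MAX(c) continues from its historical state.
-- MIN(b) and AVG(b) start with no historical state.
SELECT a, MIN(b), AVG(b), MAX(c)
FROM MyTable
GROUP BY a, TUMBLE(ts, INTERVAL '1' SECOND);
```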

SQL contains early-fire or late-fire configuration

If the SQL statement includes 'table.exec.emit.early-fire.enabled' = 'true' or 'table.exec.emit.late-fire.enabled' = 'true' either before or after the modification, compatibility is broken.

-- Original SQL statement
SELECT a, MAX(c)
FROM (SELECT a, b, c + 1 AS c, ts FROM MyTable)
GROUP BY b, a, TUMBLE(ts, INTERVAL '15' MINUTE);

-- Add early-fire or late-fire configuration: incompatible.
SET 'table.exec.emit.early-fire.enabled' = 'true';
SET 'table.exec.emit.early-fire.delay' = '500ms';
-- or
SET 'table.exec.emit.late-fire.enabled' = 'true';
SET 'table.exec.emit.late-fire.delay' = '1s';
SET 'table.exec.emit.allow-lateness' = '5s';

SELECT a, MAX(c)
FROM (SELECT a, b, c + 1 AS c, ts FROM MyTable)
GROUP BY b, a, TUMBLE(ts, INTERVAL '15' MINUTE);

SQL contains a Python UDAF

If the SQL statement includes a Python user-defined aggregate function (UDAF) either before or after the modification, the compatibility between the deployment and its state data is unknown.

-- SQL with a Python UDAF (weighted_avg) before or after modification: compatibility is unknown.
SELECT COUNT(DISTINCT b), a, SUM(DISTINCT b), weighted_avg(a, b)
FROM MyTable
GROUP BY a, c, TUMBLE(ts, INTERVAL '1' MINUTE);