Realtime Compute for Apache Flink: Group Aggregate

Last Updated: Mar 26, 2026

State compatibility determines whether a Flink job can resume from existing state data after you modify a group aggregation query. This document describes which modifications are compatible, which are incompatible, and what to do in each case.

How it works

When Flink executes a group aggregation query, it stores intermediate results as state data. That state is keyed by the GROUP BY fields and organized by the aggregated fields. When you modify the query and restart the job with existing state, Flink checks whether the new query can read and continue accumulating that stored state.

Compatibility has three levels:

| Level | Meaning |
| --- | --- |
| Fully compatible | The new query reads and reuses all existing state without any data loss. |
| Partially compatible | The new query starts from existing state, but newly added fields begin accumulating from job restart, not from the beginning of the stream. Deleted fields have their state discarded. |
| Incompatible | The new query cannot reuse existing state. Starting from existing state would produce unpredictable results. |
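
To make the "partially compatible" level concrete, the following Python sketch simulates a restart with an added field. It is a toy model under stated assumptions, not Flink's implementation: the `AGGS` table and the `restore` and `process` helpers are hypothetical names introduced for illustration, and every row is assumed to have a non-null `c`.

```python
# Toy model only. Hypothetical aggregate definitions:
# field name -> (initial accumulator value, update function).
AGGS = {
    "SUM(b)": (0, lambda acc, row: acc + row["b"]),
    "MAX(c)": (float("-inf"), lambda acc, row: max(acc, row["c"])),
    "COUNT(c)": (0, lambda acc, row: acc + 1),  # assumes c is never NULL
}


def process(state, fields, rows):
    """Accumulate rows into state, which is keyed by the GROUP BY field a."""
    for row in rows:
        accs = state.setdefault(row["a"], {f: AGGS[f][0] for f in fields})
        for f in fields:
            accs[f] = AGGS[f][1](accs[f], row)
    return state


def restore(old_state, new_fields):
    """Keep state for retained fields, drop deleted ones, initialise added ones."""
    return {
        key: {f: accs.get(f, AGGS[f][0]) for f in new_fields}
        for key, accs in old_state.items()
    }


old_fields = ["SUM(b)", "MAX(c)"]
new_fields = ["SUM(b)", "MAX(c)", "COUNT(c)"]

# Two rows arrive before the restart, one row after it.
state = process({}, old_fields, [{"a": "x", "b": 1, "c": 10}, {"a": "x", "b": 2, "c": 5}])
state = restore(state, new_fields)
state = process(state, new_fields, [{"a": "x", "b": 4, "c": 20}])

print(state["x"])
```

In this model, SUM(b) and MAX(c) reflect all three rows, while COUNT(c) reflects only the single row that arrived after the restart.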

Why DISTINCT fields have stricter rules: Flink uses the relative order of DISTINCT aggregate functions as part of the internal state key structure. Changing that order rewrites the key mapping, making existing state unreadable by the new query.
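
The relative-order rule can be checked mechanically. The sketch below is an illustration only: it assumes the SELECT list is already split into one string per item, and `distinct_sequence` and `distinct_order_unchanged` are hypothetical helper names, not part of any Flink API.

```python
import re

# Matches an aggregate call whose argument list starts with DISTINCT.
_DISTINCT_CALL = re.compile(r"\(\s*DISTINCT\b", re.IGNORECASE)


def distinct_sequence(select_items):
    """Return the DISTINCT aggregate calls in the order they appear."""
    return [item for item in select_items if _DISTINCT_CALL.search(item)]


def distinct_order_unchanged(old_items, new_items):
    """True only if both queries list the same DISTINCT calls in the same order."""
    return distinct_sequence(old_items) == distinct_sequence(new_items)


old = ["a", "MAX(b)", "SUM(DISTINCT b)", "COUNT(DISTINCT b)"]
moved_max = ["a", "SUM(DISTINCT b)", "COUNT(DISTINCT b)", "MAX(b)"]
swapped = ["COUNT(DISTINCT b)", "a", "MAX(b)", "SUM(DISTINCT b)"]

print(distinct_order_unchanged(old, moved_max))  # non-DISTINCT field moved
print(distinct_order_unchanged(old, swapped))    # DISTINCT calls swapped
```

Because the check compares the full sequence, adding or deleting a DISTINCT call also counts as a change, which matches the rule that any change to DISTINCT fields is incompatible.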

Compatible modifications

Add, delete, or modify non-DISTINCT aggregated fields

| Modification | Compatibility | Behavior |
| --- | --- | --- |
| Add an aggregated field | Partially compatible | The added field starts accumulating from job restart. |
| Delete an aggregated field | Fully compatible | The state of the deleted field is discarded. |
| Add and delete aggregated fields simultaneously | Partially compatible | The added field starts accumulating from job restart. The deleted field's state is discarded. |
| Modify an existing aggregated field | Partially compatible | Treated as delete + add: the original field's state is discarded; the new field starts from job restart. |

Unmodified fields are not affected. Their calculation results are the same whether or not you use state data.

-- Original SQL statement:
SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;

-- Partially compatible: Add COUNT(c).
-- SUM(b) and MAX(c) are unaffected. COUNT(c) starts from 0 at job restart.
SELECT a, SUM(b), MAX(c), COUNT(c) FROM MyTable GROUP BY a;

-- Fully compatible: Delete SUM(b).
-- MAX(c) is unaffected.
SELECT a, MAX(c) FROM MyTable GROUP BY a;

-- Partially compatible: Replace MAX(c) with MIN(c).
-- SUM(b) is unaffected. MAX(c) state is discarded. MIN(c) starts from job restart.
SELECT a, SUM(b), MIN(c) FROM MyTable GROUP BY a;
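
The rules for non-DISTINCT fields can be summarized as a small decision function. This is a hedged sketch, not the check Flink performs: `classify` is a hypothetical name, fields are identified by their full expression text, and only single-level aggregation without DISTINCT, retraction, or Python UDAFs is covered. The "incompatible" branch encodes the edge cases listed under Incompatible modifications (deleting all fields, modifying all fields, or adding fields where none existed).

```python
def classify(old_fields, new_fields):
    """Classify a change to the non-DISTINCT aggregated fields of one query."""
    old, new = set(old_fields), set(new_fields)
    if old == new:
        # Nothing changed, or the fields were only reordered.
        return "fully compatible"
    if not (old & new):
        # No aggregated field survives the change, so no state can be reused.
        return "incompatible"
    if new - old:
        # At least one field is new (an added or a modified field).
        return "partially compatible"
    # Only deletions, and at least one original field remains.
    return "fully compatible"


original = ["SUM(b)", "MAX(c)"]
print(classify(original, ["SUM(b)", "MAX(c)", "COUNT(c)"]))  # add COUNT(c)
print(classify(original, ["MAX(c)"]))                        # delete SUM(b)
print(classify(original, ["SUM(b)", "MIN(c)"]))              # MAX(c) -> MIN(c)
print(classify(original, []))                                # delete everything
```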

Reorder non-DISTINCT aggregated fields

Reordering aggregated fields that do not contain the DISTINCT keyword is fully compatible.

-- Original SQL statement:
SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;

-- Fully compatible: Swap the order of SUM(b) and MAX(c).
-- Both fields are unaffected.
SELECT a, MAX(c), SUM(b) FROM MyTable GROUP BY a;

Modify the computation logic of a non-DISTINCT aggregated field

Modifying computation logic — for example, wrapping an argument in a scalar function — is partially compatible. The original field is treated as deleted and the new field starts from job restart.

-- Original SQL statement:
SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;

-- Partially compatible: Apply SUBSTRING before MAX.
-- SUM(b) is unaffected. MAX(c) state is discarded. MAX(SUBSTRING(c,1,5)) starts from job restart.
SELECT a, SUM(b), MAX(c) FROM (
  SELECT a, b, SUBSTRING(c, 1, 5) AS c FROM MyTable
) GROUP BY a;

Reorder all aggregate functions while keeping DISTINCT functions in the same relative order

Fully compatible, provided the relative order of DISTINCT aggregate functions does not change.

-- Original SQL statement:
INSERT INTO MySink
SELECT a, MAX(b), SUM(DISTINCT b), COUNT(DISTINCT b) FROM MyTable GROUP BY a;

-- Fully compatible: MAX(b) is moved, but SUM(DISTINCT b) still precedes COUNT(DISTINCT b).
INSERT INTO MySink
SELECT a, SUM(DISTINCT b), COUNT(DISTINCT b), MAX(b) FROM MyTable GROUP BY a;

Other fully compatible modifications

  • No aggregated fields before or after the modification: fully compatible.

  • Delete an aggregate function that supports retraction: fully compatible when the aggregation runs after an operation that produces retractions, as the outer aggregation in the following example does.

-- Original SQL statement:
SELECT c/2, AVG(avg_a) AS avg_avg_a, MAX(max_b) max_max_b FROM
    (SELECT c, MAX(b) AS max_b, AVG(a) AS avg_a FROM MyTable GROUP BY c)
GROUP BY c/2;

-- Fully compatible: Delete MAX(max_b).
SELECT c/2, AVG(avg_a) AS avg_avg_a FROM
    (SELECT c, MAX(b) AS max_b, AVG(a) AS avg_a FROM MyTable GROUP BY c)
GROUP BY c/2;

Incompatible modifications

The following modifications cannot reuse existing state.

Modify GROUP BY fields

Adding, deleting, or modifying the fields in the GROUP BY clause — or changing their computation logic — changes the state key structure and is incompatible.

-- Original SQL statement:
SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;

-- Incompatible: Add d to GROUP BY.
SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a, d;

-- Incompatible: Remove GROUP BY entirely.
SELECT SUM(b), MAX(c) FROM MyTable;

-- Incompatible: Change GROUP BY field from a to d.
SELECT d, SUM(b), MAX(c) FROM MyTable GROUP BY d;

-- Incompatible: Change GROUP BY field from a to a + 1.
SELECT a, SUM(b), MAX(c) FROM (
  SELECT a + 1 AS a, b, c FROM MyTable
) GROUP BY a;
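
The effect of a changed state key can be seen in a toy lookup. The sketch below assumes state is a plain dictionary keyed by the tuple of GROUP BY values, which is a simplification for illustration; `group_key` and the sample values are hypothetical.

```python
def group_key(row, key_exprs):
    """Evaluate each GROUP BY expression against a row and return the state key."""
    return tuple(expr(row) for expr in key_exprs)


# State written by the original query, keyed by (a,).
old_state = {(1,): {"SUM(b)": 10}, (2,): {"SUM(b)": 20}}

row = {"a": 1, "b": 5, "d": 9}

by_a = [lambda r: r["a"]]                       # GROUP BY a
by_a_d = [lambda r: r["a"], lambda r: r["d"]]   # GROUP BY a, d
by_a_plus_1 = [lambda r: r["a"] + 1]            # GROUP BY a + 1

same_key = group_key(row, by_a)
wider_key = group_key(row, by_a_d)
shifted_key = group_key(row, by_a_plus_1)

print(same_key in old_state)     # the original key finds its own state
print(wider_key in old_state)    # the two-field key matches nothing
print(old_state[shifted_key])    # a + 1 lands on state built from rows with a = 2
```

The last case is the more dangerous one in this model: the lookup succeeds, but the row with a = 1 would be merged into an accumulator that was built from a different group.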

Add, delete, or modify DISTINCT aggregated fields

Any change to fields that use the DISTINCT keyword is incompatible.

-- Original SQL statement:
SELECT a, SUM(b), MAX(c), SUM(DISTINCT b), COUNT(DISTINCT c) FROM MyTable GROUP BY a;

-- Incompatible: Add COUNT(DISTINCT b).
SELECT a, SUM(b), MAX(c), SUM(DISTINCT b), COUNT(DISTINCT b), COUNT(DISTINCT c) FROM MyTable GROUP BY a;

-- Incompatible: Delete SUM(DISTINCT b).
SELECT a, SUM(b), MAX(c), COUNT(DISTINCT c) FROM MyTable GROUP BY a;

-- Incompatible: Change SUM(DISTINCT b) to AVG(DISTINCT b).
SELECT a, SUM(b), MAX(c), AVG(DISTINCT b), COUNT(DISTINCT c) FROM MyTable GROUP BY a;

-- Incompatible: Change computation logic of COUNT(DISTINCT c).
SELECT a, SUM(b), MAX(c), SUM(DISTINCT b), COUNT(DISTINCT c) FROM (
    SELECT a, b, AVG(c) AS c FROM MyTable GROUP BY a, b
) GROUP BY a;

Add an aggregated field in multi-level aggregation

In multi-level aggregation, retraction can occur between aggregation levels. Adding a field in this context produces unpredictable results and is incompatible.

-- Original SQL statement:
SELECT a/2, AVG(b), MIN(c) FROM (
    SELECT a, SUM(b) AS b, MAX(c) AS c FROM MyTable GROUP BY a
) GROUP BY a/2;

-- Incompatible: Add COUNT(c) in the outer aggregation.
SELECT a/2, AVG(b), MIN(c), COUNT(c) FROM (
    SELECT a, SUM(b) AS b, MAX(c) AS c FROM MyTable GROUP BY a
) GROUP BY a/2;
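
One way to see how retraction can corrupt a newly added field is the toy simulation below. It is a sketch under stated assumptions, not a description of Flink's runtime: the inner aggregation is assumed to send a retraction of its previous result followed by the new result, the added COUNT(c) is assumed to start empty at restart, and `inner_update` and `outer_count` are hypothetical names.

```python
def inner_update(inner_state, a, b, c):
    """Toy inner aggregation: SUM(b), MAX(c) GROUP BY a.

    Returns the change messages sent downstream: a retraction of the previous
    result for this key (if any), followed by the new result.
    """
    messages = []
    old = inner_state.get(a)
    if old is None:
        new = {"b": b, "c": c}
    else:
        messages.append(("retract", a, old))
        new = {"b": old["b"] + b, "c": max(old["c"], c)}
    messages.append(("accumulate", a, new))
    inner_state[a] = new
    return messages


def outer_count(count_state, messages):
    """Toy outer COUNT(c) GROUP BY a/2: +1 per accumulate, -1 per retract."""
    for kind, a, _row in messages:
        key = a // 2
        count_state[key] = count_state.get(key, 0) + (1 if kind == "accumulate" else -1)
    return count_state


# Before restart: the original query has no COUNT(c), so nothing is counted.
inner_state = {}
inner_update(inner_state, a=2, b=1, c=10)

# After restart: inner state is restored, the added COUNT(c) starts empty.
restarted = outer_count({}, inner_update(inner_state, a=2, b=5, c=3))

# Reference: the new query run from the beginning of the stream.
fresh_inner = {}
fresh = outer_count({}, inner_update(fresh_inner, 2, 1, 10) + inner_update(fresh_inner, 2, 5, 3))

print(restarted, fresh)
```

In this model the restarted job receives a retraction for a value that COUNT(c) never accumulated, so its count for the group ends up lower than the count produced by running the new query from the start of the stream.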

Delete all aggregated fields

Deleting every aggregated field discards all state data, so none of the existing state can be reused and the modification is incompatible.

-- Original SQL statement:
SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;

-- Incompatible: Delete both SUM(b) and MAX(c).
SELECT a FROM MyTable GROUP BY a;

Change the relative order of DISTINCT aggregate functions

-- Original SQL statement:
INSERT INTO MySink
SELECT a, MAX(b), SUM(DISTINCT b), COUNT(DISTINCT b) FROM MyTable GROUP BY a;

-- Incompatible: COUNT(DISTINCT b) now precedes SUM(DISTINCT b).
INSERT INTO MySink
SELECT COUNT(DISTINCT b), a, MAX(b), SUM(DISTINCT b) FROM MyTable GROUP BY a;

Add aggregated fields to a job with no existing aggregated fields

-- Original SQL statement:
INSERT INTO MySink
SELECT a, b FROM MyTable GROUP BY a, b;

-- Incompatible: Add SUM(b).
INSERT INTO MySink
SELECT a, b, SUM(b) FROM MyTable GROUP BY a, b;

Retain only one aggregated field and modify its computation logic

-- Original SQL statement:
INSERT INTO MySink
SELECT a, SUM(b), MAX(b), MAX(c) FROM MyTable GROUP BY a;

-- Incompatible: Keep only MAX(c) and change its input expression.
INSERT INTO MySink
SELECT a, MAX(c) FROM (SELECT a, b, c + 1 AS c, ts FROM MyTable) GROUP BY a;

Modify all aggregated fields

-- Original SQL statement:
INSERT INTO MySink
SELECT a, b, MAX(c) FROM MyTable GROUP BY a, b;

-- Incompatible: Replace MAX(c) with MIN(c) (the only aggregated field).
INSERT INTO MySink
SELECT a, b, MIN(c) FROM MyTable GROUP BY a, b;

Add or modify an aggregate function that supports retraction

-- Original SQL statement:
SELECT c/2, AVG(avg_a) AS avg_avg_a, MAX(max_b) max_max_b FROM
    (SELECT c, MAX(b) AS max_b, AVG(a) AS avg_a FROM MyTable GROUP BY c)
GROUP BY c/2;

-- Incompatible: Add MIN(max_b), which supports retraction.
SELECT c/2, AVG(avg_a) AS avg_avg_a, MIN(max_b) min_max_b, MAX(max_b) max_max_b FROM
    (SELECT c, MAX(b) AS max_b, AVG(a) AS avg_a FROM MyTable GROUP BY c)
GROUP BY c/2;

-- Incompatible: Replace MAX(max_b) with MIN(max_b).
SELECT c/2, AVG(avg_a) AS avg_avg_a, MIN(max_b) max_max_b FROM
    (SELECT c, MAX(b) AS max_b, AVG(a) AS avg_a FROM MyTable GROUP BY c)
GROUP BY c/2;

Modifications with unknown compatibility

If a Python user-defined aggregate function (UDAF) is used before or after the modification, the system cannot determine compatibility.

-- Unknown compatibility: The query uses a Python UDAF (weighted_avg).
SELECT a, MAX(b), SUM(DISTINCT b), COUNT(DISTINCT b), weighted_avg(a, b)
FROM MyTable GROUP BY a, b;