Window Top-N must follow the modification rules of both window table-valued functions (TVFs) and Top-N queries simultaneously. Because both rule sets apply, Window Top-N supports fewer compatible modifications than a standalone window TVF or Top-N query. This topic describes which modifications to a Window Top-N query are compatible with existing job state data, and which require a full state reset before the job can restart.
Why some modifications break state compatibility
When you modify a Window Top-N query, Flink's query optimizer may produce a different execution plan that changes the operator topology or the state schema of an intermediate operator. If the new plan no longer maps to the existing savepoint, the job cannot resume from its previous state.
Any modification that changes the window attributes, the GROUP BY fields (including their order), the set of partition keys, the ORDER BY fields, the value of N, or the aggregate fields triggers replanning and causes state incompatibility.
Window Top-N syntax structure
Understanding which clause each modification targets helps you apply the compatibility rules below. The structure of a Window Top-N query is:
-- Outer query: select the top-ranked rows
SELECT [column_list]
FROM (
  -- Middle layer: assign row numbers using ROW_NUMBER()
  SELECT [column_list],
    ROW_NUMBER() OVER (
      PARTITION BY window_start, window_end [, partition_key...]
      ORDER BY col [ASC|DESC] [, col [ASC|DESC]...]
    ) AS rk
  FROM (
    -- Inner query: window aggregation using a window TVF
    SELECT [agg_fields], window_start, window_end
    FROM TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
    GROUP BY [group_keys], window_start, window_end
  )
)
WHERE rk < N;
The compatibility rules below refer to these three levels: the outer query, the middle ROW_NUMBER layer, and the inner window aggregation.
Compatible modifications
The following modifications are fully compatible with existing state data. You can make them without resetting the job state.
Add or remove a window attribute field from the query result
Adding or removing window_start or window_end from the outer SELECT list does not affect the internal state structure.
-- Original: selects window_start only
SELECT a, b, c, window_start FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c) AS rk
FROM (
SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, window_start, window_end
)
) WHERE rk < 3;
-- Compatible: add window_end to the output
SELECT a, b, c, window_start, window_end FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c) AS rk
FROM (
SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, window_start, window_end
)
) WHERE rk < 3;
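The same rule covers removal. As a minimal sketch based on the original query above (MyTable and its columns are the same placeholder names), dropping window_start from the outer SELECT list leaves the inner aggregation and the ROW_NUMBER layer untouched, so the change should remain compatible:

```sql
-- Compatible: remove window_start from the output
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c) AS rk
  FROM (
    SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
    FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
    GROUP BY a, window_start, window_end
  )
) WHERE rk < 3;
```

Only the outer projection differs. window_start and window_end stay in the PARTITION BY and GROUP BY clauses.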
Include or exclude the ranking position field from the query result
Including or excluding the rk field (the ROW_NUMBER output) in the outer SELECT is fully compatible.
-- Original: rk is not in the output
SELECT a, b, c, window_start FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c) AS rk
FROM (
SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, window_start, window_end
)
) WHERE rk < 3;
-- Compatible: include rk in the output
SELECT a, b, c, window_start, rk FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c) AS rk
FROM (
SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, window_start, window_end
)
) WHERE rk < 3;
Reorder partition keys in the OVER clause
Changing the order of partition keys in PARTITION BY does not change the partitioning logic or the state structure.
-- Original: PARTITION BY a, b, window_start, window_end
SELECT a, b, c, window_start FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a, b, window_start, window_end ORDER BY c) AS rk
FROM (
SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, b, window_start, window_end
)
) WHERE rk < 3;
-- Compatible: reorder to PARTITION BY b, a, window_start, window_end
SELECT a, b, c, window_start FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, a, window_start, window_end ORDER BY c) AS rk
FROM (
SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, b, window_start, window_end
)
) WHERE rk < 3;
Incompatible modifications
The following modifications are incompatible with existing state data. After making any of these changes, you must reset the job state before restarting.
| Modification | Details |
|---|---|
| Change a window attribute | Includes window type, window size, or time attribute. For examples, see Modifications that cause full incompatibility in the window TVF compatibility rules. |
| Add, remove, or modify fields in the GROUP BY clause, or change their computation logic | Affects the inner aggregation state. For examples, see Modifications that cause full incompatibility in the window TVF compatibility rules. |
| Add, remove, or modify an aggregate field, or change the input to the Top-N query | Changes the schema of the data fed into the ROW_NUMBER layer. See the example below. |
| Add, remove, or modify partition keys, or change the computation logic of partition key fields | Affects the middle-layer ROW_NUMBER state. For examples, see Incompatible modifications in the Top-N compatibility rules. |
| Change the fields or ordering in the ORDER BY clause | Affects ranking logic and state. For examples, see Incompatible modifications in the Top-N compatibility rules. |
| Change the value of N | N is the rank bound in the outer WHERE clause (rk < N) and determines how many top-ranked rows are returned. For examples, see Incompatible modifications in the Top-N compatibility rules. |
| Reorder only the window TVF-related fields in the GROUP BY clause | Although the non-window GROUP BY fields stay the same, reordering window_start and window_end changes the window-based ranking result and is incompatible. See the example below. |
| Reorder only the non-window fields in the GROUP BY clause | Reordering non-window fields (for example, swapping a and b) changes the inner aggregation state and is incompatible. See the example below. |
Example: adding an aggregate field
Adding min(d) AS d to the inner aggregation changes the input schema of the Top-N query and breaks state compatibility.
-- Original
SELECT a, b, c, window_start FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c) AS rk
FROM (
SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, window_start, window_end
)
) WHERE rk < 3;
-- Incompatible: adds min(d) as d, changing the Top-N input
SELECT a, b, c, d, window_start FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c) AS rk
FROM (
SELECT a, sum(b) AS b, max(c) AS c, min(d) AS d, window_start, window_end
FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, window_start, window_end
)
) WHERE rk < 3;
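Example: changing the window size
The table above lists window size as a window attribute. As an illustrative sketch that reuses the original query from the preceding example (MyTable and its columns are the same placeholder names), widening the tumbling window from 1 minute to 5 minutes alters the inner window aggregation, so this change is expected to require a state reset:

```sql
-- Incompatible: window size changes from 1 minute to 5 minutes
SELECT a, b, c, window_start FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c) AS rk
  FROM (
    SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
    FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '5' MINUTE))
    GROUP BY a, window_start, window_end
  )
) WHERE rk < 3;
```

Switching the window type (for example, from a tumbling to a hopping window) or the time attribute passed to DESCRIPTOR falls under the same table row.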
Example: reordering GROUP BY fields
Both of the following modifications are incompatible, even though no fields are added or removed.
-- Original
SELECT a, b, c, window_start FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c) AS rk
FROM (
SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, b, window_start, window_end
)
) WHERE rk < 3;
-- Incompatible: reorders window TVF fields (window_end before window_start)
-- This changes the window-based ranking result.
SELECT a, b, c, window_start FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c) AS rk
FROM (
SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY a, b, window_end, window_start
)
) WHERE rk < 3;
-- Incompatible: reorders non-window fields (b before a)
SELECT a, b, c, window_start FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c) AS rk
FROM (
SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY b, a, window_start, window_end
)
) WHERE rk < 3;
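Example: changing the partition keys, ORDER BY, or N
The table above also marks changes to the middle ROW_NUMBER layer and to the rank bound as incompatible. The following sketch illustrates one instance of each. It uses the same placeholder table and columns as the other examples, and the specific changes (adding a as a partition key, switching to DESC, raising the bound to 5) are chosen only for illustration.

```sql
-- Original
SELECT a, b, c, window_start FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c) AS rk
  FROM (
    SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
    FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
    GROUP BY a, window_start, window_end
  )
) WHERE rk < 3;

-- Incompatible: adds partition key a
SELECT a, b, c, window_start FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a, b, window_start, window_end ORDER BY c) AS rk
  FROM (
    SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
    FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
    GROUP BY a, window_start, window_end
  )
) WHERE rk < 3;

-- Incompatible: changes the sort direction in ORDER BY
SELECT a, b, c, window_start FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c DESC) AS rk
  FROM (
    SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
    FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
    GROUP BY a, window_start, window_end
  )
) WHERE rk < 3;

-- Incompatible: changes the rank bound N from 3 to 5
SELECT a, b, c, window_start FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b, window_start, window_end ORDER BY c) AS rk
  FROM (
    SELECT a, sum(b) AS b, max(c) AS c, window_start, window_end
    FROM TABLE(tumble(TABLE MyTable, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
    GROUP BY a, window_start, window_end
  )
) WHERE rk < 5;
```

In all three variants the inner aggregation is unchanged. The incompatibility comes from the ROW_NUMBER layer or the rank filter.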