When you need to modify a Top-N query in a running Flink job, use the following rules to determine whether your change is compatible with the existing state data, and therefore whether you can restart the job from that state without resetting it.
| Modification | State compatible? |
|---|---|
| Reorder partition keys | Yes |
| Include or exclude the ranking position field (rk) | Yes |
| Change the upstream upsert key (when using UpdateFastRank) | No |
| Add, delete, or modify partition keys | No |
| Modify the ORDER BY clause | No |
| Change the value of N | No |
| Add, delete, or modify selected fields | No |
Safe modifications
The following modifications are fully compatible with existing state data.
Reorder partition keys
Changing the sequence of PARTITION BY keys does not affect state compatibility.
-- Original
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a, b ORDER BY c) AS rk
FROM MyTable)
WHERE rk < 3;
-- Compatible: reorder partition keys
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, a ORDER BY c) AS rk
FROM MyTable)
WHERE rk < 3;
Include or exclude the ranking position field
Adding or removing the rk field from the output does not affect state compatibility.
-- Original
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
FROM MyTable)
WHERE rk < 3;
-- Compatible: include rk in the output
SELECT a, b, c, rk FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
FROM MyTable)
WHERE rk < 3;
Incompatible modifications
The following modifications are incompatible with existing state data.
Change the upstream upsert key when using UpdateFastRank
If the job uses the UpdateFastRank algorithm for ranking, changing the upstream upsert key breaks state compatibility.
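The source gives no query for this case, so the following is only a sketch under stated assumptions: the upstream is a GROUP BY aggregation, the planner derives the upsert key from the GROUP BY columns, and the monotonically increasing sum combined with ORDER BY ... DESC allows the planner to pick UpdateFastRank. The column d, the alias total, and the assumption that c is numeric are hypothetical and used for illustration only.

```sql
-- Original: upstream aggregation grouped by (a, b); assumed upsert key is (a, b)
SELECT a, b, total FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY total DESC) AS rk
  FROM (
    SELECT a, b, SUM(c) FILTER (WHERE c >= 0) AS total
    FROM MyTable
    GROUP BY a, b))
WHERE rk < 3;

-- Incompatible (under the assumptions above): grouping by (a, b, d)
-- changes the assumed upstream upsert key from (a, b) to (a, b, d)
SELECT a, b, total FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY total DESC) AS rk
  FROM (
    SELECT a, b, d, SUM(c) FILTER (WHERE c >= 0) AS total
    FROM MyTable
    GROUP BY a, b, d))
WHERE rk < 3;
```

The Top-N clause itself (partition key, sort field, rank filter, and outer select list) is identical in both queries; only the upstream grouping differs. Whether a given job actually uses UpdateFastRank depends on the planner, so check the job's execution plan rather than relying on this sketch.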
Add, delete, or modify partition keys
Any change to the PARTITION BY clause — including adding or removing keys, or changing the computation logic of a key field — breaks state compatibility.
-- Original
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
FROM MyTable)
WHERE rk < 3;
-- Incompatible: add d as a partition key
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a, d ORDER BY c) AS rk
FROM MyTable)
WHERE rk < 3;
-- Incompatible: remove a from partition keys
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (ORDER BY c) AS rk
FROM MyTable)
WHERE rk < 3;
-- Incompatible: change partition key from a to a + 1
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
FROM (SELECT a + 1 AS a, b, c FROM MyTable))
WHERE rk < 3;
Modify the ORDER BY clause
Changing the sort field, its computation logic, or the sort order breaks state compatibility.
-- Original
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
FROM MyTable)
WHERE rk < 3;
-- Incompatible: change sort field from c to b
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY b) AS rk
FROM MyTable)
WHERE rk < 3;
-- Incompatible: change the computation logic of sort field c to substring(c, 1, 5)
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
FROM (SELECT a, b, substring(c, 1, 5) AS c FROM MyTable))
WHERE rk < 3;
-- Incompatible: change sort order from ascending to descending
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY c DESC) AS rk
FROM MyTable)
WHERE rk < 3;
Change the value of N
N determines how many top-ranked results the query returns. Changing it breaks state compatibility.
-- Original
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
FROM MyTable)
WHERE rk < 3;
-- Incompatible: change the rank filter from rk < 3 to rk < 5 (N changes from 2 to 4)
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
FROM MyTable)
WHERE rk < 5;
Add, delete, or modify selected fields
Any change to the fields selected by the outer query — including adding or removing a field, or changing a field's computation logic — breaks state compatibility.
-- Original
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
FROM MyTable)
WHERE rk < 3;
-- Incompatible: add d to the selected fields
SELECT a, b, c, d FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
FROM MyTable)
WHERE rk < 3;
-- Incompatible: remove b from the selected fields
SELECT a, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
FROM MyTable)
WHERE rk < 3;
-- Incompatible: change field b to b + 1
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
FROM (SELECT a, b + 1 AS b, c FROM MyTable))
WHERE rk < 3;