Realtime Compute for Apache Flink: Top-N

Last Updated: Mar 26, 2026

When you modify a Top-N query in a running Flink job, use the following rules to determine whether the change is compatible with the existing state data. A compatible change lets you restart the job from the existing state. An incompatible change requires you to reset the state.

Modification                                                   State compatible?
Reorder partition keys                                         Yes
Include or exclude the ranking position field (rk)             Yes
Change the upstream upsert key (when using UpdateFastRank)     No
Add, delete, or modify partition keys                          No
Modify the ORDER BY clause                                     No
Change the value of N                                          No
Add, delete, or modify selected fields                         No
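
The examples in this topic query a table named MyTable whose definition is not shown. To try them out, a minimal sketch of a matching source table could look like the following. The column types and the datagen connector options are assumptions chosen so that every example parses (a and b are numeric because the examples compute a + 1 and b + 1, and c is a string because one example calls substring on it). They are not taken from the original job.

-- Hypothetical test table: schema and connector options are illustrative assumptions
CREATE TEMPORARY TABLE MyTable (
  a INT,
  b INT,
  c STRING,
  d INT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5',
  'fields.a.min' = '1',
  'fields.a.max' = '10',
  'fields.c.length' = '8'
);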

Safe modifications

The following modifications are fully compatible with existing state data.

Reorder partition keys

Changing the sequence of PARTITION BY keys does not affect state compatibility.

-- Original
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a, b ORDER BY c) AS rk
    FROM MyTable)
WHERE rk < 3;

-- Compatible: reorder partition keys
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b, a ORDER BY c) AS rk
    FROM MyTable)
WHERE rk < 3;

Include or exclude the ranking position field

Adding or removing the rk field from the output does not affect state compatibility.

-- Original
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
    FROM MyTable)
WHERE rk < 3;

-- Compatible: include rk in the output
SELECT a, b, c, rk FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
    FROM MyTable)
WHERE rk < 3;

Incompatible modifications

The following modifications are incompatible with existing state data.

Change the upstream upsert key when using UpdateFastRank

If the job uses the UpdateFastRank algorithm for ranking, changing the upstream upsert key breaks state compatibility.
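
As a rough illustration, the following sketch ranks the output of an upstream aggregation, whose GROUP BY keys form the upsert key of the Top-N input. It assumes that the planner selects UpdateFastRank for this query shape (an updating input with an upsert key that contains the partition key, ordered by a monotonically increasing count in descending order). Whether it actually does depends on your engine version and execution plan, so verify the plan before relying on it. The cnt alias is introduced here for illustration only.

-- Original: the upstream aggregation groups by (a, b), so its upsert key is (a, b)
SELECT a, b, cnt FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY cnt DESC) AS rk
    FROM (SELECT a, b, COUNT(*) AS cnt FROM MyTable GROUP BY a, b))
WHERE rk < 3;

-- Incompatible (assuming UpdateFastRank is in use): the upstream aggregation
-- now groups by (a, b, d), so its upsert key changes to (a, b, d)
SELECT a, b, cnt FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY cnt DESC) AS rk
    FROM (SELECT a, b, d, COUNT(*) AS cnt FROM MyTable GROUP BY a, b, d))
WHERE rk < 3;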

Add, delete, or modify partition keys

Any change to the set of PARTITION BY keys breaks state compatibility. This includes adding or removing keys and changing the computation logic of a key field.

-- Original
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
    FROM MyTable)
WHERE rk < 3;

-- Incompatible: add d as a partition key
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a, d ORDER BY c) AS rk
    FROM MyTable)
WHERE rk < 3;

-- Incompatible: remove a from partition keys
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (ORDER BY c) AS rk
    FROM MyTable)
WHERE rk < 3;

-- Incompatible: change partition key from a to a + 1
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
    FROM (SELECT a + 1 AS a, b, c FROM MyTable))
WHERE rk < 3;

Modify the ORDER BY clause

Changing the sort field, its computation logic, or the sort order breaks state compatibility.

-- Original
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
    FROM MyTable)
WHERE rk < 3;

-- Incompatible: change sort field from c to b
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY b) AS rk
    FROM MyTable)
WHERE rk < 3;

-- Incompatible: change the computation logic of sort field c
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
    FROM (SELECT a, b, substring(c, 1, 5) AS c FROM MyTable))
WHERE rk < 3;

-- Incompatible: change sort order from ascending to descending
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY c DESC) AS rk
    FROM MyTable)
WHERE rk < 3;

Change the value of N

N determines how many top-ranked results the query returns. Changing it breaks state compatibility.

-- Original
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
    FROM MyTable)
WHERE rk < 3;

-- Incompatible: change the rank bound from rk < 3 to rk < 5 (N changes from 2 to 4)
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
    FROM MyTable)
WHERE rk < 5;
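
N is implied by the rank predicate rather than written as a literal, which makes it easy to change by accident. The sketch below contrasts a strict bound with an inclusive bound on the same query. Because ROW_NUMBER() starts at 1, the two predicates keep a different number of rows per partition, so under the rule above, switching between them counts as changing N.

-- rk < 3 keeps ranks 1 and 2: at most 2 rows per partition (N = 2)
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
    FROM MyTable)
WHERE rk < 3;

-- rk <= 3 keeps ranks 1, 2, and 3: at most 3 rows per partition (N = 3)
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
    FROM MyTable)
WHERE rk <= 3;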

Add, delete, or modify selected fields

Any change to the fields selected by the outer query breaks state compatibility. This includes adding or removing a field and changing a field's computation logic. The only exception is the ranking position field rk, which can be included or excluded freely.

-- Original
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
    FROM MyTable)
WHERE rk < 3;

-- Incompatible: add d to the selected fields
SELECT a, b, c, d FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
    FROM MyTable)
WHERE rk < 3;

-- Incompatible: remove b from the selected fields
SELECT a, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
    FROM MyTable)
WHERE rk < 3;

-- Incompatible: change field b to b + 1
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY c) AS rk
    FROM (SELECT a, b + 1 AS b, c FROM MyTable))
WHERE rk < 3;