This topic describes the compatibility between a Flink job and the state data used to start the job after you modify deduplication queries for the job. Deduplication is implemented using ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...) AS rk with a WHERE rk = 1 filter. When you modify a deduplication query and resume a job, some changes preserve the existing state structure and others do not.
Compatibility rules differ depending on which deduplication mode your query uses: proctime ASC, proctime DESC, or rowtime. Identify your mode before applying the rules below.
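As a rough guide to telling the modes apart, the only difference is the ORDER BY clause of the ROW_NUMBER() window. The sketch below assumes MyTable exposes a processing-time attribute named proctime, as in the examples in this topic, and an event-time attribute named ts. Treating ts as a rowtime attribute is an assumption made for illustration.

```sql
-- proctime ASC: keeps the first row seen for each key
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) AS rk
  FROM MyTable)
WHERE rk = 1;

-- proctime DESC: keeps the last row seen for each key
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) AS rk
  FROM MyTable)
WHERE rk = 1;

-- rowtime: orders by an event-time attribute (ts is assumed here), ASC or DESC
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY ts ASC) AS rk
  FROM MyTable)
WHERE rk = 1;
```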
Quick reference
| Modification | proctime ASC | proctime DESC | rowtime |
|---|---|---|---|
| Add, remove, or transform a non-partition-key selected field | Compatible | Incompatible (schema changes) | Incompatible (schema changes) |
| Modify a selected field without changing the output schema | Compatible | Compatible | Compatible |
| Reorder partition keys | Compatible | Compatible | Compatible |
| Add, remove, or modify partition keys | Incompatible | Incompatible | Incompatible |
| Change the ORDER BY field or direction | Incompatible | Incompatible | Incompatible |
Compatible modifications
Deduplication based on proctime ASC
Add, remove, or transform any non-partition-key selected field. Changes to output columns do not affect the state structure.
-- Base query
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) AS rk
FROM MyTable)
WHERE rk = 1;
-- Fully compatible: Add d to selected fields
SELECT a, b, c, d FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) AS rk
FROM MyTable)
WHERE rk = 1;
-- Fully compatible: Remove b from selected fields
SELECT a, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) AS rk
FROM MyTable)
WHERE rk = 1;
-- Fully compatible: Transform c to SUBSTRING(c, 1, 5)
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) AS rk
FROM (SELECT a, b, SUBSTRING(c,1,5) as c, proctime FROM MyTable))
WHERE rk = 1;
All deduplication modes
Reordering partition keys is fully compatible. The set of keys — and therefore the keyed state structure — remains unchanged.
-- Base query
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a, b, c ORDER BY proctime ASC) as rk
FROM MyTable)
WHERE rk = 1;
-- Fully compatible: Reorder partition keys
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY c, a, b ORDER BY proctime ASC) as rk
FROM MyTable)
WHERE rk = 1;
Deduplication based on rowtime or proctime DESC
Modify a selected field without changing the output schema. Changes that do not alter the schema structure are safe.
-- Base query
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) AS rk
FROM MyTable)
WHERE rk = 1 AND c > 10;
-- Fully compatible: Remove c from selected fields without changing the schema (c is still referenced in the WHERE filter)
SELECT a, b FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) AS rk
FROM MyTable)
WHERE rk = 1 AND c > 10;
Incompatible modifications
Add, remove, or modify partition keys (all modes)
The PARTITION BY clause defines the keys used to look up keyed state. Any change to the partition key set — or to the computation that produces those keys — changes the key space of the existing state, making it unreadable by the new operator.
-- Base query
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) AS rk
FROM MyTable)
WHERE rk = 1;
-- Add d as a partition key (incompatible)
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a, d ORDER BY proctime ASC) AS rk
FROM MyTable)
WHERE rk = 1;
-- Remove a from the partition keys (incompatible)
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (ORDER BY proctime ASC) AS rk
FROM MyTable)
WHERE rk = 1;
-- Change partition key computation: a to a + 1 (incompatible)
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) AS rk
FROM (SELECT a + 1 AS a, b, c, proctime FROM MyTable))
WHERE rk = 1;
Modify the ORDER BY clause (all modes)
Changing the sort field or sort order alters the semantics of which record is kept as the "first" or "last". The existing state was built under the original ordering and cannot be mapped to a different one.
-- Base query
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) AS rk
FROM MyTable)
WHERE rk = 1;
-- Change sort field from proctime to ts (incompatible)
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY ts ASC) AS rk
FROM MyTable)
WHERE rk = 1;
-- Change sort order from ASC to DESC (incompatible)
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) AS rk
FROM MyTable)
WHERE rk = 1;
Add, remove, or transform fields that change the schema (rowtime or proctime DESC)
In rowtime and proctime DESC modes, any change that alters the output schema — even adding or removing a single field — makes the stored state structurally incompatible with the new operator.
-- Base query
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) AS rk
FROM MyTable)
WHERE rk = 1;
-- Add d to selected fields (incompatible — schema changes)
SELECT a, b, c, d FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) AS rk
FROM MyTable)
WHERE rk = 1;
-- Remove c from selected fields (incompatible — schema changes)
SELECT a, b FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) AS rk
FROM MyTable)
WHERE rk = 1;
-- Transform c to substring(c, 1, 5) (incompatible — schema changes)
SELECT a, b, c FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) AS rk
FROM (SELECT a, b, SUBSTRING(c, 1, 5) AS c, proctime FROM MyTable))
WHERE rk = 1;
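The examples in this section all use proctime DESC, but the same rule is stated for rowtime mode. A minimal sketch of the rowtime case follows, assuming MyTable has an event-time attribute named ts and a column d. Both names are illustrative, not taken from a real schema.

```sql
-- Base query (rowtime mode; ts is assumed to be an event-time attribute)
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY ts ASC) AS rk
  FROM MyTable)
WHERE rk = 1;

-- Add d to selected fields (incompatible under the rule above: schema changes)
SELECT a, b, c, d FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY ts ASC) AS rk
  FROM MyTable)
WHERE rk = 1;
```

If a change like this is required, plan to start the job without the existing state rather than resuming from it.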