Realtime Compute for Apache Flink: Deduplication

Last Updated: Mar 26, 2026

This topic describes the compatibility between a Flink job and the state data used to start the job after you modify deduplication queries for the job. Deduplication is implemented using ROW_NUMBER() OVER (PARTITION BY ... ORDER BY ...) AS rk with a WHERE rk = 1 filter. When you modify a deduplication query and resume a job, some changes preserve the existing state structure and others do not.

Compatibility rules differ depending on which deduplication mode your query uses: proctime ASC, proctime DESC, or rowtime. Identify your mode before applying the rules below.
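The mode is determined by the ORDER BY clause of the ROW_NUMBER() call. The following sketch assumes that MyTable declares proctime as a processing-time attribute (for example, proctime AS PROCTIME()) and ts as an event-time attribute with a WATERMARK definition. Adjust the names to match your own table.

```sql
-- Assumption: proctime is a processing-time attribute and ts is an
-- event-time (rowtime) attribute of MyTable.

-- proctime ASC: keeps the first row that arrives for each key
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) AS rk
    FROM MyTable)
WHERE rk = 1;

-- proctime DESC: keeps the most recent row that arrives for each key
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) AS rk
    FROM MyTable)
WHERE rk = 1;

-- rowtime: orders by the event-time attribute; ASC and DESC are both rowtime mode
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY ts DESC) AS rk
    FROM MyTable)
WHERE rk = 1;
```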

Quick reference

| Modification | proctime ASC | proctime DESC | rowtime |
| --- | --- | --- | --- |
| Add, remove, or transform a non-partition-key selected field | Compatible | Incompatible (schema changes) | Incompatible (schema changes) |
| Modify a selected field without changing the output schema | Compatible | Compatible | Compatible |
| Reorder partition keys | Compatible | Compatible | Compatible |
| Add, remove, or modify partition keys | Incompatible | Incompatible | Incompatible |
| Change the ORDER BY field or direction | Incompatible | Incompatible | Incompatible |

Compatible modifications

Deduplication based on proctime ASC

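Add, remove, or transform any non-partition-key selected field. In this mode, the state for each key only records whether a row has already been emitted, not the row itself, so changes to output columns do not affect the state structure.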

-- Base query
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) AS rk
    FROM MyTable)
WHERE rk = 1;

-- Fully compatible: Add d to selected fields
SELECT a, b, c, d FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) AS rk
    FROM MyTable)
WHERE rk = 1;

-- Fully compatible: Remove b from selected fields
SELECT a, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) AS rk
    FROM MyTable)
WHERE rk = 1;

-- Fully compatible: Transform c to SUBSTRING(c, 1, 5)
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) AS rk
    FROM (SELECT a, b, SUBSTRING(c,1,5) as c, proctime FROM MyTable))
WHERE rk = 1;

All deduplication modes

Reordering partition keys is fully compatible in every mode. The set of keys, and therefore the keyed state structure, remains unchanged.

-- Base query
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a, b, c ORDER BY proctime ASC) as rk
    FROM MyTable)
WHERE rk = 1;

-- Fully compatible: Reorder partition keys
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY c, a, b ORDER BY proctime ASC) as rk
    FROM MyTable)
WHERE rk = 1;

Deduplication based on rowtime or proctime DESC

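Modify a selected field without changing the schema. In these modes, the state stores a complete row for each key, so a change is safe only if the columns that reach the deduplication step stay the same. For example, a column can be dropped from the outer SELECT as long as a filter applied after deduplication still reads it.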

-- Base query
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) AS rk
    FROM MyTable)
WHERE rk = 1 AND c > 10;

-- Fully compatible: Remove c from the output. The filter c > 10 still reads c,
-- so the deduplicated rows keep the same schema
SELECT a, b FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) AS rk
    FROM MyTable)
WHERE rk = 1 AND c > 10;
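The same rule applies to rowtime deduplication. A sketch, assuming ts is the event-time attribute of MyTable:

```sql
-- Assumption: ts is the event-time (rowtime) attribute of MyTable.
-- Base query
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY ts DESC) AS rk
    FROM MyTable)
WHERE rk = 1 AND c > 10;

-- Compatible: c leaves the output but is still read by the filter,
-- so the rows stored in state keep the same columns
SELECT a, b FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY ts DESC) AS rk
    FROM MyTable)
WHERE rk = 1 AND c > 10;
```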

Incompatible modifications

Add, remove, or modify partition keys (all modes)

The PARTITION BY clause defines the keys used to look up keyed state. Any change to the set of partition keys, or to the computation that produces them, changes the key space of the existing state, making it unreadable by the new operator.

-- Base query
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) AS rk
    FROM MyTable)
WHERE rk = 1;

-- Add d as a partition key (incompatible)
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a, d ORDER BY proctime ASC) AS rk
    FROM MyTable)
WHERE rk = 1;

-- Remove a from the partition keys (incompatible)
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (ORDER BY proctime ASC) AS rk
    FROM MyTable)
WHERE rk = 1;

-- Change partition key computation: a to a + 1 (incompatible)
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) AS rk
    FROM (SELECT a + 1 AS a, b, c, proctime FROM MyTable))
WHERE rk = 1;

Modify the ORDER BY clause (all modes)

Changing the sort field or sort order alters the semantics of which record is kept as the "first" or "last". The existing state was built under the original ordering and cannot be mapped to a different one.

-- Base query
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime ASC) AS rk
    FROM MyTable)
WHERE rk = 1;

-- Change sort field from proctime to ts (incompatible)
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY ts ASC) AS rk
    FROM MyTable)
WHERE rk = 1;

-- Change sort order from ASC to DESC (incompatible)
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) AS rk
    FROM MyTable)
WHERE rk = 1;

Add, remove, or transform fields that change the schema (rowtime or proctime DESC)

In rowtime and proctime DESC modes, the state stores a complete row for each key. Any change that alters the schema of those rows, even adding or removing a single field, makes the stored state structurally incompatible with the new operator.

-- Base query
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) AS rk
    FROM MyTable)
WHERE rk = 1;

-- Add d to selected fields (incompatible — schema changes)
SELECT a, b, c, d FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) AS rk
    FROM MyTable)
WHERE rk = 1;

-- Remove c from selected fields (incompatible — schema changes)
SELECT a, b FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) AS rk
    FROM MyTable)
WHERE rk = 1;

-- Transform c to substring(c, 1, 5) (incompatible — schema changes)
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY proctime DESC) AS rk
    FROM (SELECT a, b, SUBSTRING(c, 1, 5) AS c, proctime FROM MyTable))
WHERE rk = 1;
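Rowtime mode follows this rule even when the sort direction is ASC, which is where it differs from proctime ASC. A sketch, assuming ts is the event-time attribute of MyTable:

```sql
-- Assumption: ts is the event-time (rowtime) attribute of MyTable.
-- Base query (rowtime mode: keeps the row with the smallest ts for each key)
SELECT a, b, c FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY ts ASC) AS rk
    FROM MyTable)
WHERE rk = 1;

-- Add d to selected fields (incompatible: schema changes, even though the
-- same change is compatible under proctime ASC)
SELECT a, b, c, d FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY a ORDER BY ts ASC) AS rk
    FROM MyTable)
WHERE rk = 1;
```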