Realtime Compute for Apache Flink: Additional limitations on SQL modifications

Last Updated: Mar 26, 2026

Flink SQL is declarative: the table planner determines the underlying operator topology and state layout automatically. Any SQL change, even a minor one, can produce a different execution plan, which may prevent the job from restoring state from a checkpoint or savepoint. This topic covers the limits that apply to changes other than modifications to queries, source tables, and sink tables.

Compatibility reference

The table below summarizes all covered change types and their outcomes.

Change type | Compatibility outcome
Different Apache Flink version | Unknown
Modified custom connectors or UDF dependencies | Unknown (manual verification required)
More than one item changed before a compatibility check | Unknown
Added a query that introduces new state | Incompatible
Deleted a sink table and modified or deleted a TEMPORARY TABLE statement | Unknown
Deleted a sink table without touching any TEMPORARY TABLE statement | Compatible

Limits

Apache Flink version must match

The job you restart must use the same Apache Flink version as the job that generated the checkpoint or savepoint. Flink cannot determine compatibility across versions.

Dependencies must be compatible

The dependencies of the restarted job must be compatible with those of the original job. If you modify custom connectors or the dependencies of user-defined functions (UDFs), verify compatibility manually, because Flink cannot detect these issues automatically.
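
To make this concrete, the sketch below registers a UDF and uses it in an aggregate query. The function name my_udf and the class com.example.MyUdf are hypothetical placeholders, not names from this topic. If the code or the dependencies behind the function change, the SQL text can stay exactly the same, which is why this kind of change has to be verified by hand.

```sql
-- Hypothetical UDF registration; the function and class names are assumptions for illustration.
CREATE TEMPORARY FUNCTION my_udf AS 'com.example.MyUdf';

CREATE TEMPORARY TABLE MyTable (a int, b bigint, c varchar) WITH ('connector' = 'datagen');
CREATE TEMPORARY TABLE MySink (a int, b bigint, c varchar) WITH ('connector' = 'print');

-- This statement reads the same whether or not the implementation behind my_udf changes.
-- my_udf is assumed to take a varchar and return a varchar.
INSERT INTO MySink SELECT a, sum(b), max(my_udf(c)) FROM MyTable GROUP BY a;
```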

Modifying multiple items results in unknown compatibility

If you change more than one item before a compatibility check, Flink cannot determine compatibility. An item is an aggregate function, a sink table, or a WHERE clause that affects stateful computation.

The following examples result in unknown compatibility:

-- Original SQL statements:
CREATE TABLE MyTable (
  a int,
  b bigint,
  c varchar
) WITH (
  'connector' = 'datagen'
);

CREATE TABLE MySink (
  a int,
  b bigint,
  c varchar
) WITH (
  'connector' = 'print'
);

INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;


-- Example 1: Rename the sink table to MySink2 and change the aggregate function from max(c) to min(c).
-- Result: unknown compatibility (two items changed: sink table + aggregate function)
CREATE TABLE MySink2 (
  a int,
  b bigint,
  c varchar
) WITH (
  'connector' = 'print'
);

INSERT INTO MySink2 SELECT a, sum(b), min(c) FROM MyTable GROUP BY a;


-- Example 2: Add a WHERE clause that contains the a > 10 condition, set
-- table.optimizer.state-compatibility.ignore-filter to true, and change
-- the aggregate function from max(c) to min(c).
-- Result: unknown compatibility (two items changed: WHERE clause + aggregate function)
INSERT INTO MySink
SELECT a, sum(b), min(c) FROM (
  SELECT * FROM MyTable WHERE a > 10
) GROUP BY a;
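
One way to stay clear of this limit is to split a modification into separate steps and run a compatibility check after each one. The sketch below applies the two changes from Example 1 in sequence and reuses the MyTable definition from the original statements. It only illustrates the ordering: the outcome of each individual step depends on the limits for sink table and query modifications, which this topic does not cover, and the step-by-step workflow itself is an assumption.

```sql
-- Step 1 (one item changed: sink table).
-- Run the compatibility check on this version before changing anything else.
CREATE TABLE MySink2 (
  a int,
  b bigint,
  c varchar
) WITH (
  'connector' = 'print'
);

INSERT INTO MySink2 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;

-- Step 2 (one item changed: aggregate function).
-- Assumed workflow: apply this only after step 1 has been checked and deployed.
INSERT INTO MySink2 SELECT a, sum(b), min(c) FROM MyTable GROUP BY a;
```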

Adding a query that generates new state causes an incompatible change

If you add a query that introduces new stateful computation, the job cannot restore from the existing checkpoint or savepoint.

-- Original SQL statements:
CREATE TABLE MyTable (
  a int,
  b bigint,
  c varchar
) WITH (
  'connector' = 'datagen'
);

CREATE TABLE MySink (
  a int,
  b bigint,
  c varchar
) WITH (
  'connector' = 'print'
);

INSERT INTO MySink SELECT a, b, c FROM MyTable;

-- Modified SQL: add a group aggregate query, which introduces new state.
-- Result: incompatible
INSERT INTO MySink SELECT a, sum(b), min(c) FROM MyTable GROUP BY a;
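
When a job contains more than one INSERT statement, the statements are typically submitted together in a statement set. Assuming the new query is added alongside the original one, the modified job might look like the sketch below, which reuses the table definitions from the original statements. The first query is a plain projection, while the group aggregate needs per-key state that the existing checkpoint or savepoint does not contain.

```sql
BEGIN STATEMENT SET;

-- Unchanged query from the original job
INSERT INTO MySink SELECT a, b, c FROM MyTable;

-- Added query: the group aggregate keeps state per key a for sum(b) and min(c)
INSERT INTO MySink SELECT a, sum(b), min(c) FROM MyTable GROUP BY a;

END;
```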

Deleting a sink table with TEMPORARY TABLE changes results in unknown compatibility

If you delete a sink table and also modify or delete the TEMPORARY TABLE statement for that sink table or its corresponding source table, Flink cannot determine compatibility. If you delete a sink table without modifying any TEMPORARY TABLE statement, compatibility is not affected.

-- Original SQL statements:

-- Source table 1
CREATE TEMPORARY TABLE MyTable (
  a int,
  b bigint,
  c bigint,
  ts timestamp(3),
  proctime as proctime(),
  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
) WITH ('connector' = 'datagen');

-- Source table 2
CREATE TEMPORARY TABLE MyTable2 (
  a int,
  b bigint,
  c bigint,
  ts timestamp(3),
  proctime as proctime(),
  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
) WITH ('connector' = 'datagen');

-- Sink table 1
CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector' = 'print');
-- Sink table 2
CREATE TEMPORARY TABLE MySink2 (a int, b bigint) WITH ('connector' = 'print');

-- Query
BEGIN STATEMENT SET;
INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a;
INSERT INTO MySink2 SELECT a, b FROM MyTable2 WHERE a > 10;
END;


-- Example 1: Delete MySink2 and modify the TEMPORARY TABLE for MyTable (add column d bigint).
-- Result: unknown compatibility

-- Source table 1 (modified: column d bigint added)
CREATE TEMPORARY TABLE MyTable (
  a int,
  b bigint,
  c bigint,
  d bigint,
  ts timestamp(3),
  proctime as proctime(),
  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
) WITH ('connector' = 'datagen');

-- Sink table 1
CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector' = 'print');

-- Query
INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a;


-- Example 2: Delete MySink2 without modifying any TEMPORARY TABLE statement.
-- Result: fully compatible

-- Source table 1
CREATE TEMPORARY TABLE MyTable (
  a int,
  b bigint,
  c bigint,
  ts timestamp(3),
  proctime as proctime(),
  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
) WITH ('connector' = 'datagen');

-- Source table 2
CREATE TEMPORARY TABLE MyTable2 (
  a int,
  b bigint,
  c bigint,
  ts timestamp(3),
  proctime as proctime(),
  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
) WITH ('connector' = 'datagen');

-- Sink table 1
CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector' = 'print');
-- Sink table 2
CREATE TEMPORARY TABLE MySink2 (a int, b bigint) WITH ('connector' = 'print');

-- Query
INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a;
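
The rule above also covers deleting a TEMPORARY TABLE statement, not only modifying one. The following variation is a sketch of that case, derived from the rule rather than taken from a tested job: MySink2 is deleted together with the TEMPORARY TABLE statements for MySink2 and its source table MyTable2, while MyTable and MySink stay untouched.

```sql
-- Variation: Delete MySink2 and delete the TEMPORARY TABLE statements
-- for MySink2 and its corresponding source table MyTable2.
-- Result according to the rule above: unknown compatibility

-- Source table 1 (unchanged)
CREATE TEMPORARY TABLE MyTable (
  a int,
  b bigint,
  c bigint,
  ts timestamp(3),
  proctime as proctime(),
  WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
) WITH ('connector' = 'datagen');

-- Sink table 1 (unchanged)
CREATE TEMPORARY TABLE MySink (a int, b bigint) WITH ('connector' = 'print');

-- Query
INSERT INTO MySink SELECT a, sum(b) FROM MyTable GROUP BY a;
```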