All Products
Search
Document Center

Realtime Compute for Apache Flink:Modify sink table

Last Updated:Mar 26, 2026

This topic describes how modifying a sink connector in an SQL deployment affects state compatibility.

By default, Flink treats sink connectors as stateless operators and excludes them from compatibility checks. This means most sink changes do not break state compatibility. The key parameter controlling this behavior is table.optimizer.state-compatibility.ignore-sink:

Parameter value Sink classification Effect on compatibility check
true (default) Stateless operator Sink is excluded from the compatibility check
false Stateful operator Sink is included in the compatibility check
Warning

A Full Compatible result when starting a deployment does not guarantee ongoing compatibility after subsequent changes. If out-of-order data exists, Flink generates a stateful SinkMaterializer operator to handle it. A later change — such as modifying the primary key of a source — can alter the upstream upsert key of the sink and break state compatibility, even though the earlier check passed. For more information, see Handle out-of-order changelog events.

Changes that do not affect compatibility

The following changes leave the deployment fully compatible with state data.

Delete one of multiple sink connectors

Remove an INSERT statement that targets one sink while keeping the others. State data tied to deleted aggregate functions is discarded, but the remaining deployment stays compatible.

-- Original SQL statement:
CREATE TABLE MyTable (
  a int,
  b bigint,
  c varchar
);
CREATE TABLE MySink1 (
  a int,
  b bigint,
  c varchar
);
CREATE TABLE MySink2 (
  a int,
  b bigint,
  c varchar
);
INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;

-- Delete the INSERT statement for MySink1. The deployment remains fully compatible with state data.
-- State data for the aggregate functions in the deleted statement is discarded.
INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;

-- Alternatively, delete the INSERT statement for MySink2. The deployment remains fully compatible with state data.
INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;

Add a sink connector with a stateless query

Adding a new sink with a stateless query (no aggregation or stateful operations) keeps the deployment fully compatible.

-- Original SQL statement:
CREATE TABLE MyTable (
  a int,
  b bigint,
  c varchar
);

CREATE TABLE MySink1 (
  a int,
  b bigint,
  c varchar
);
INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;

-- Add a stateless query. The deployment remains fully compatible with state data.
CREATE TABLE MySink2 (
  a int,
  b bigint,
  c varchar
);
INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
INSERT INTO MySink2 SELECT a, b, c FROM MyTable WHERE a > 10;

Change the table name, connector type, or WITH clause attributes

When table.optimizer.state-compatibility.ignore-sink is set to true (the default), the sink is excluded from compatibility checks. Changing the table name, connector type, or any WITH clause attribute does not affect state compatibility.

-- Original SQL statement:
CREATE TABLE MyTable (
  a int,
  b bigint,
  c varchar
);

CREATE TABLE MySink (
  a int,
  b bigint,
  c varchar
) WITH (
  'connector' = 'print'
);
INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;

-- Change the table name and connector type. The deployment remains fully compatible with state data.
CREATE TABLE MySink2 (
  a int,
  b bigint,
  c varchar
) WITH (
  'connector' = 'kafka',
  ...
);
INSERT INTO MySink2 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;

Changes that cause incompatibility

The following changes make the deployment incompatible with state data.

Add a sink connector with a stateful query

Adding a new sink with a stateful query (such as aggregation with GROUP BY) breaks state compatibility.

-- Original SQL statement:
CREATE TABLE MyTable (
  a int,
  b bigint,
  c varchar
);

CREATE TABLE MySink1 (
  a int,
  b bigint,
  c varchar
);
INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;

-- Add a stateful query. The deployment becomes incompatible with state data.
CREATE TABLE MySink2 (
  b bigint,
  a int,
  c varchar
);
INSERT INTO MySink1 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;
INSERT INTO MySink2 SELECT b, sum(a), max(c) FROM MyTable GROUP BY b;

Set table.optimizer.state-compatibility.ignore-sink to false and change the table name or connector type

When table.optimizer.state-compatibility.ignore-sink is set to false, the sink is treated as a stateful operator and is included in the compatibility check. Changing the table name or connector type then breaks state compatibility.

-- Original SQL statement:
CREATE TABLE MyTable (
  a int,
  b bigint,
  c varchar
);

CREATE TABLE MySink (
  a int,
  b bigint,
  c varchar
) WITH (
  'connector' = 'print'
);
INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;

-- Set ignore-sink to false, then change the table name.
-- The sink is now a stateful operator. The deployment becomes incompatible with state data.
CREATE TABLE MySink2 (
  a int,
  b bigint,
  c varchar
) WITH (
  'connector' = 'print'
);
INSERT INTO MySink2 SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;

-- Set ignore-sink to false, then change the connector type from print to blackhole.
-- The deployment becomes incompatible with state data.
CREATE TABLE MySink (
  a int,
  b bigint,
  c varchar
) WITH (
  'connector' = 'blackhole',
  ...
);
INSERT INTO MySink SELECT a, sum(b), max(c) FROM MyTable GROUP BY a;

Change table.optimizer.state-compatibility.ignore-sink from true to false

Switching table.optimizer.state-compatibility.ignore-sink from true to false promotes the sink from a stateless operator to a stateful one. The sink is now included in the compatibility check, and the deployment becomes incompatible with state data.

Indirect change: modify the primary key of a source

Even if the compatibility check shows Full Compatible, the deployment may become incompatible after a subsequent change. If out-of-order data is present, Flink generates a stateful SinkMaterializer operator. Changing the primary key of the source alters the upstream upsert key of the sink, which breaks state compatibility.

-- Original query
CREATE TEMPORARY TABLE MyTable (
  a int primary key not enforced,
  b bigint,
  c bigint,
  ts timestamp(3),
  proctime as proctime(),
  watermark for ts AS ts - interval '1' second
) WITH ('connector' = 'datagen');

CREATE TEMPORARY TABLE MySink (a int, b bigint, c bigint primary key not enforced) WITH ('connector' = 'print');

INSERT INTO MySink SELECT a, b, c FROM MyTable;

-- Change the column used as the primary key.
-- The upstream upsert key of the sink changes. The deployment becomes incompatible with state data.
CREATE TEMPORARY TABLE MyTable (
  a int,
  b bigint,
  c bigint primary key not enforced,
  d bigint,
  ts timestamp(3),
  proctime as proctime(),
  watermark for ts AS ts - interval '1' second
) WITH ('connector' = 'datagen');

CREATE TEMPORARY TABLE MySink (a int, b bigint primary key not enforced, c bigint) WITH ('connector' = 'print');

INSERT INTO MySink SELECT a, b, c FROM MyTable;

Changes that cause unknown compatibility

If you delete a sink and also modify or delete the TEMPORARY TABLE statement of a sink or source, the compatibility between the deployment and state data cannot be determined. For more information, see Additional limitations on SQL modifications.