Realtime Compute for Apache Flink: Modify source table

Last Updated: Mar 26, 2026

This topic describes the compatibility between a job and the state data used to start the job after you modify a source table defined in the SQL statements for the job. It explains which modifications are safe, which discard partial state, and which are incompatible.

Compatibility at a glance

| Modification | Compatibility | State impact |
| --- | --- | --- |
| Add a field not referenced in the query | Full | No state loss |
| Modify the watermark interval | Full | No state loss |
| Modify a field used by some (but not all) downstream operators | Partial | Affected operators lose accumulated state; unaffected operators continue |
| Modify primary keys | Conditional | May affect downstream operator state (e.g., upsert key changes) |
| Modify primary keys or sharding columns in the WITH clause | Conditional | May make source table state unavailable (e.g., full scan phase state for MySQL) |
| Modify the changelog mode in the WITH clause | Conditional | May affect downstream state if event types sent downstream change |
| Change the connector type | None | Not compatible |
| Rename the source table | None | Not compatible |
| Modify a field that changes the input schema of a stateful downstream operator | None | Not compatible |

How compatibility is determined

The check method differs based on what you change:

  • Schema changes (DDL): The system evaluates compatibility based on the entire query that references the source table, not just the table definition.

  • Connector configuration changes (WITH clause): The system checks only the connector parameter that changed, not the full query.

Fully compatible modifications

These changes have no impact on state data. You can restart the job from the existing state without taking any further action.

  • Add a field that is not referenced in the query. Downstream operators ignore the new field; state is unaffected.

  • Modify the watermark interval. Watermark configuration does not affect operator state.

Example — adding an unused field:

-- Original
CREATE TABLE MyTable (
  a INT,
  b BIGINT,
  c VARCHAR
) WITH (
  'connector' = 'datagen'
);

SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;

-- Adding field d is fully compatible: d is not referenced in the SELECT statement.
CREATE TABLE MyTable (
  a INT,
  b BIGINT,
  c VARCHAR,
  d INT
) WITH (
  'connector' = 'datagen'
);

SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;
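
Example: modifying the watermark interval. The following is a minimal sketch, assuming that "watermark interval" refers to the delay in the WATERMARK clause of the source table. The ts column and the 5-second and 10-second delays are illustrative assumptions and do not come from the original example.

```sql
-- Original: the watermark trails the event time in ts by 5 seconds.
-- The ts column is a hypothetical addition used only for this sketch.
CREATE TABLE MyTable (
  a INT,
  b BIGINT,
  c VARCHAR,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen'
);

SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;

-- Modified: only the watermark delay changes, from 5 to 10 seconds.
-- The fields and the query are unchanged, so this falls under the
-- "Modify the watermark interval" case and is expected to be fully compatible.
CREATE TABLE MyTable (
  a INT,
  b BIGINT,
  c VARCHAR,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '10' SECOND
) WITH (
  'connector' = 'datagen'
);

SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;
```

The sketch changes nothing except the delay so that the watermark modification is isolated from any schema change.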

Partially compatible modifications

These changes are safe to apply, but some existing state is discarded. Restart the job from state and expect the affected operators to reset.

  • Modify a field that is used by some downstream operators but not others. Operators that depend on the modified field lose their accumulated state. Operators that do not depend on it are unaffected.

Example — renaming a field with a computed column:

-- Original
CREATE TABLE MyTable (
  a INT,
  b BIGINT,
  c VARCHAR
) WITH (
  'connector' = 'datagen'
);

SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;

-- Renaming b to d and redefining b as a computed column is partially compatible.
-- MAX(c) is unaffected and retains its state.
-- The original SUM(b) is treated as deleted: its accumulated state is discarded.
-- The new SUM(b) is treated as added: it starts accumulating from zero when the job restarts.
CREATE TABLE MyTable (
  a INT,
  d BIGINT,
  c VARCHAR,
  b AS d + 1
) WITH (
  'connector' = 'datagen'
);

SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;

Incompatible modifications

These changes are incompatible with existing state data. Do not restart the job from state.

| Modification | Example |
| --- | --- |
| Change the connector type | datagen to kafka |
| Rename the source table | MyTable to MyTable2 |
| Modify a field that changes the input schema of a stateful downstream operator | Add a field to a table used in a JOIN |

Example — changing the connector type:

-- Original
CREATE TABLE MyTable (
  a INT,
  b BIGINT,
  c VARCHAR
) WITH (
  'connector' = 'datagen'
);

SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;

-- Incompatible: changing the connector type from datagen to Kafka.
CREATE TABLE MyTable (
  a INT,
  b BIGINT,
  c VARCHAR
) WITH (
  'connector' = 'kafka',
  ...
);

SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;

Example — renaming the source table:

-- Original
CREATE TABLE MyTable (
  a INT,
  b BIGINT,
  c VARCHAR
) WITH (
  'connector' = 'datagen'
);

SELECT a, SUM(b), MAX(c) FROM MyTable GROUP BY a;

-- Incompatible: renaming the table from MyTable to MyTable2.
CREATE TABLE MyTable2 (
  a INT,
  b BIGINT,
  c VARCHAR
) WITH (
  'connector' = 'datagen'
);

SELECT a, SUM(b), MAX(c) FROM MyTable2 GROUP BY a;

Example — adding a field that affects a JOIN:

-- Original
CREATE TABLE MyTable1 (
  a INT,
  b BIGINT
) WITH (
  'connector' = 'datagen'
);

CREATE TABLE MyTable2 (
  c INT,
  d BIGINT
) WITH (
  'connector' = 'datagen'
);

-- Join the two tables on MyTable1.a and MyTable2.c.
SELECT * FROM MyTable1 JOIN MyTable2 ON a = c;

-- Incompatible: adding field e to MyTable2 changes the input schema of the join operator.
CREATE TABLE MyTable1 (
  a INT,
  b BIGINT
) WITH (
  'connector' = 'datagen'
);

CREATE TABLE MyTable2 (
  c INT,
  d BIGINT,
  e VARCHAR
) WITH (
  'connector' = 'datagen'
);

SELECT * FROM MyTable1 JOIN MyTable2 ON a = c;

Limitations

Some modifications that appear compatible may still affect state under specific conditions:

  • Primary key changes: Modifying the primary key of the source table may make the state of downstream operators incompatible, for example when the change alters the upsert key that those operators rely on.

  • Primary key or sharding column changes in the WITH clause: Modifying primary keys or sharding columns in the WITH clause may affect source table state. For example, changing scan.incremental.snapshot.chunk.key-column for the MySQL connector can make state data from the full scan phase unavailable.

  • Changelog mode changes: Modifying the changelog mode in the WITH clause may affect downstream operator state if the event types sent downstream change (for example, DELETE, UPDATE_BEFORE, or UPDATE_AFTER). Common scenarios:

    • Changing the cdcMode parameter for the Hologres connector, which controls whether binary logs are read in Change Data Capture (CDC) mode.

    • Changing whether binary logs are read in UPSERT mode for the MongoDB or PostgreSQL connector.
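
Example: changing the sharding column of a MySQL source. The following is a minimal sketch of the scan.incremental.snapshot.chunk.key-column case described above. The table name orders_source, its columns, and all connection values are hypothetical placeholders. The connector name 'mysql' is an assumption about the MySQL connector in Realtime Compute for Apache Flink; the open-source Flink CDC connector is named 'mysql-cdc'.

```sql
-- Original: the full scan phase splits the table into chunks by order_id.
-- All names and connection values below are illustrative placeholders.
CREATE TABLE orders_source (
  order_id BIGINT,
  user_id BIGINT,
  amount DECIMAL(10, 2),
  PRIMARY KEY (order_id, user_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabase>',
  'table-name' = 'orders',
  'scan.incremental.snapshot.chunk.key-column' = 'order_id'
);

-- Modified: chunks are now split by user_id instead of order_id.
-- State saved during the full scan phase describes chunks keyed by order_id,
-- so it may become unavailable when the job restarts from that state.
CREATE TABLE orders_source (
  order_id BIGINT,
  user_id BIGINT,
  amount DECIMAL(10, 2),
  PRIMARY KEY (order_id, user_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql',
  'hostname' = '<yourHostname>',
  'port' = '3306',
  'username' = '<yourUsername>',
  'password' = '<yourPassword>',
  'database-name' = '<yourDatabase>',
  'table-name' = 'orders',
  'scan.incremental.snapshot.chunk.key-column' = 'user_id'
);
```

Whether state is actually lost likely depends on whether the job was still in the full scan phase when the state was taken, so treat this change as conditional rather than always incompatible.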