When you use Flink SQL for real-time data processing, out-of-order changelog events can silently corrupt results — records get deleted when they should exist, or updates land in the wrong order. This topic explains why out-of-order events occur, how SinkUpsertMaterializer corrects them, and how to tune or avoid the operator when performance matters.
Key concepts
Changelog and stream types
In relational databases such as MySQL, the binary log (binlog) captures every INSERT, UPDATE, and DELETE operation. Flink SQL uses a similar mechanism called the changelog to track data changes and enable incremental processing across streaming pipelines.
A changelog stream falls into one of two categories:
| Stream type | Event types | Description |
|---|---|---|
| Append-only stream | +I only | Contains only INSERT events. No updates or deletes. Also called a non-update stream. |
| Update stream | +I, +U, -U, -D | Contains update or delete events in addition to inserts. Operators such as group aggregation and deduplication produce this type. |
Not all operators can consume update streams. Over aggregation and interval join operators only accept append-only streams as input.
Changelog event types
Flink SQL uses four event types, based on the RowKind enum in the Apache Flink API:
| Short name | Full name | Semantics |
|---|---|---|
| +I | INSERT | Inserts a new row. |
| -U | UPDATE_BEFORE | Retracts the previous content of an updated row. Always paired with a +U event. |
| +U | UPDATE_AFTER | Contains the new content of an updated row. Always paired with a -U event. |
| -D | DELETE | Deletes a row. |
Flink keeps UPDATE_BEFORE (-U) and UPDATE_AFTER (+U) as separate event types rather than combining them into a composite UPDATE event for two reasons:
- Uniform structure: Both events share the same row structure and differ only in the `RowKind` property. A composite event type would require heterogeneous structures or special alignment between INSERT and DELETE events.
- Distributed shuffling: In parallel pipelines, join and aggregation operations shuffle data across tasks. Composite UPDATE events would still need to be split into separate events during shuffling to maintain correctness, so keeping them separate from the start simplifies the model.
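As a minimal sketch of how an update stream arises, the following query aggregates a hypothetical append-only table named `orders`. The table, its columns, and the sample rows are assumptions made for illustration and do not come from this topic. The comments show the changelog that the aggregation would emit for two input rows, assuming mini-batch optimization is disabled.

```sql
-- Hypothetical append-only source (illustration only).
CREATE TEMPORARY TABLE orders (
  user_id BIGINT,
  amount  BIGINT
) WITH (...);

-- Group aggregation turns the append-only input into an update stream.
SELECT user_id, SUM(amount) AS total
FROM orders
GROUP BY user_id;

-- Input rows, in arrival order:
--   +I (user_id=1, amount=10)
--   +I (user_id=1, amount=20)
-- Changelog emitted by the aggregation:
--   +I (user_id=1, total=10)
--   -U (user_id=1, total=10)
--   +U (user_id=1, total=30)
```

The second input row does not produce a single UPDATE event. It produces a -U event that retracts the old total, followed by a +U event that carries the new one.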
How out-of-order events occur
Consider this example, which is used throughout this topic to illustrate the problem and the solution:
```sql
-- CDC source tables
CREATE TEMPORARY TABLE s1 (
  id BIGINT,
  level BIGINT,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (...);

CREATE TEMPORARY TABLE s2 (
  id BIGINT,
  attr VARCHAR,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (...);

-- Sink table
CREATE TEMPORARY TABLE t1 (
  id BIGINT,
  level BIGINT,
  attr VARCHAR,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (...);

-- Join s1 and s2 and write the result to t1
INSERT INTO t1
SELECT s1.*, s2.attr
FROM s1 JOIN s2
ON s1.level = s2.id;
```
When the (id=1, level=10) record in table s1 is inserted at time t0 and then updated to (id=1, level=20) at time t1, three changelog events are produced:
| Event | Type |
|---|---|
| +I (id=1, level=10) | INSERT |
| -U (id=1, level=10) | UPDATE_BEFORE |
| +U (id=1, level=20) | UPDATE_AFTER |
The primary key of s1 is id, but the JOIN clause shuffles data on the level column. With a Join operator parallelism of 2, these three events may be routed to two different tasks — one handling level=10 and another handling level=20.

Because the events are processed in parallel, the downstream Sink operator can receive them in any of three possible orderings:
| Case 1 (correct order) | Case 2 (out of order) | Case 3 (out of order) |
|---|---|---|
| +I (id=1, level=10, attr='a1') | +U (id=1, level=20, attr='b1') | +I (id=1, level=10, attr='a1') |
| -U (id=1, level=10, attr='a1') | +I (id=1, level=10, attr='a1') | +U (id=1, level=20, attr='b1') |
| +U (id=1, level=20, attr='b1') | -U (id=1, level=10, attr='a1') | -U (id=1, level=10, attr='a1') |
Case 1 processes events in the original sequence, so no problem occurs. In Cases 2 and 3, the -U event arrives last. Because the sink table uses id as its primary key, external storage that performs upserts can apply that trailing retraction as a delete for id=1, so the record ends up deleted even though the expected final state is (id=1, level=20, attr='b1').
Out-of-order events only occur when Join operator parallelism is greater than 1. A pair of events with the same upsert key are always routed to the same task, which is why only three ordering cases are possible in this scenario.
SinkUpsertMaterializer
How it works
SinkUpsertMaterializer is an intermediate operator that Flink inserts to resolve ordering issues. It was introduced to address FLINK-20374.
To understand why SinkUpsertMaterializer is needed, it helps to understand upsert keys. An upsert key is a column (or set of columns) that preserves the sort order of a unique key through a SQL operation. When upsert keys exist, the downstream operator receives update events in the correct order. When a data shuffling operation breaks unique key ordering — as the JOIN on level does in this example — the upsert key becomes empty.
In this example, rows from s1 are shuffled by level, so the Join output contains rows with the same s1.id value but in arbitrary order. The unique keys are (s1.id), (s1.id, s1.level), and (s1.id, s2.id), but the upsert key is empty. Additionally, the primary key of the sink table (id) does not match the upsert key in the Join output. SinkUpsertMaterializer bridges this gap.
Out-of-order changelog events follow specific rules: for a specific upsert key (or for all columns if the upsert key is empty), ADD events (+I and +U) always occur before the corresponding RETRACT events (-D and -U). A pair of changelog events with the same upsert key are processed by the same task even when data shuffling occurs. These ordering guarantees are what SinkUpsertMaterializer relies on to reconstruct correct results.
The operator works as follows:
- Maintains a list of `RowData` values in state, keyed by the deduced upsert key (or by the entire row if the upsert key is empty).
- On an ADD event (`+I` or `+U`): adds or updates the row in state.
- On a RETRACT event (`-U` or `-D`): removes the row from state.
- Generates correct changelog events based on the primary key of the sink table.
The following diagram shows how SinkUpsertMaterializer handles Cases 2 and 3 from the example above:
- Case 2: When `-U (id=1, level=10, attr='a1')` arrives last, SinkUpsertMaterializer removes that row from state and generates an UPDATE event based on the second-to-last row. The final result is `(id=1, level=20, attr='b1')`.
- Case 3: When `+U (id=1, level=20, attr='b1')` arrives, the operator passes it downstream. When `-U (id=1, level=10, attr='a1')` arrives later, the operator removes the corresponding row from state without emitting an event. The final result is again `(id=1, level=20, attr='b1')`.
For the source code, see SinkUpsertMaterializer (Flink release-1.17).
When SinkUpsertMaterializer is triggered
Flink adds the SinkUpsertMaterializer operator in the following scenarios:
- The sink table has a primary key but the incoming data does not satisfy the UNIQUE constraint. Common causes include:
  - Defining a primary key on the sink table when the source table has no primary key.
  - Excluding the source primary key column when writing to the sink, or mapping a non-primary-key column of the source to the sink primary key.
  - Reducing the precision of a primary key column through type conversion or group aggregation (for example, casting from BIGINT to INT).
  - Transforming the primary key column, such as concatenating multiple columns into one:

    ```sql
    CREATE TABLE students (
      student_id BIGINT NOT NULL,
      student_name STRING NOT NULL,
      course_id BIGINT NOT NULL,
      score DOUBLE NOT NULL,
      PRIMARY KEY(student_id) NOT ENFORCED
    ) WITH (...);

    CREATE TABLE performance_report (
      student_info STRING NOT NULL PRIMARY KEY NOT ENFORCED,
      avg_score DOUBLE NOT NULL
    ) WITH (...);

    CREATE TEMPORARY VIEW v AS
    SELECT student_id, student_name, AVG(score) AS avg_score
    FROM students
    GROUP BY student_id, student_name;

    -- The concatenated result no longer satisfies the UNIQUE constraint
    -- but is used as the primary key of the sink table.
    INSERT INTO performance_report
    SELECT
      CONCAT('id:', student_id, ',name:', student_name) AS student_info,
      avg_score
    FROM v;
    ```

- A data shuffling operation disrupts the sort order of a unique key before writing to the sink table. This is the scenario in the join example above: the JOIN on `level` shuffles rows from s1, breaking the sort order of the `id` primary key.
- The `table.exec.sink.upsert-materialize` parameter is set to `force`.
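One way to check whether the planner added the operator is to inspect the plan of the INSERT statement. The sketch below reuses the join example from this topic and assumes that the Apache Flink `EXPLAIN PLAN FOR` syntax is available in your environment. In Apache Flink plans, a sink that uses SinkUpsertMaterializer is typically annotated with `upsertMaterialize=[true]`, but the exact plan text can vary by version, so treat that marker as an assumption to verify.

```sql
-- Print the plan for the join example instead of running it.
-- Look at the Sink node of the optimized plan for an upsert-materialize marker.
EXPLAIN PLAN FOR
INSERT INTO t1
SELECT s1.*, s2.attr
FROM s1 JOIN s2
ON s1.level = s2.id;
```

Running the same statement before and after a query change is a quick way to confirm whether the change removed the operator.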
Configure SinkUpsertMaterializer
Use the table.exec.sink.upsert-materialize parameter to control when Flink adds the SinkUpsertMaterializer operator:
| Value | Behavior |
|---|---|
| auto (default) | Flink infers whether out-of-order events are possible and adds the operator if necessary. |
| none | Disables the operator entirely. |
| force | Always adds the operator, even when no primary key is defined on the sink table. |
With `auto`, the presence of the operator does not guarantee that events are actually out of order. For example, using a `GROUPING SETS` clause with `COALESCE` to convert null values may prevent the SQL planner from determining whether the upsert key matches the sink primary key. In that case, Flink adds SinkUpsertMaterializer as a precaution. If results are correct without the operator, set `table.exec.sink.upsert-materialize` to `none`.
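A minimal sketch of setting the parameter, assuming your environment accepts `SET` statements in the form used by the Apache Flink SQL client. On platforms where job parameters are configured elsewhere (for example, in a deployment's configuration), set the same key and value there instead.

```sql
-- Disable SinkUpsertMaterializer for this job.
-- Do this only after verifying that results are correct without the operator.
SET 'table.exec.sink.upsert-materialize' = 'none';

-- To restore the default planner behavior:
-- SET 'table.exec.sink.upsert-materialize' = 'auto';
```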
For information about supported query operations in Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0 or later, the corresponding runtime operators, and update stream support, see Query execution.
Performance and operational notes
SinkUpsertMaterializer maintains state for every row it processes. This increases state size and adds I/O overhead for state reads and writes, which reduces throughput. Avoid using the operator when possible.
Avoid triggering SinkUpsertMaterializer
- Make the partition key used for deduplication or group aggregation match the primary key of the sink table.
- If a single parallelism fits your dataset and you want to avoid out-of-order events, set parallelism to 1 and disable SinkUpsertMaterializer by setting `table.exec.sink.upsert-materialize` to `none`.
- If an operator chain exists between the Sink operator and an upstream stateful operator (such as a deduplication or group aggregation operator), and no data accuracy issues occurred with VVR versions earlier than 6.0, migrate the deployment to VVR 6.0 or later. Set `table.exec.sink.upsert-materialize` to `none` and keep other configurations unchanged. For migration steps, see Upgrade the engine version of deployments.
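The first recommendation above can be sketched as follows. The tables `orders` and `user_totals` and their columns are hypothetical names chosen for illustration. Because the aggregation is keyed by the same column that the sink declares as its primary key, the planner should be able to deduce a matching upsert key and should not need SinkUpsertMaterializer under the `auto` setting.

```sql
-- Hypothetical append-only source (illustration only).
CREATE TEMPORARY TABLE orders (
  user_id BIGINT,
  amount  BIGINT
) WITH (...);

-- Hypothetical sink whose primary key matches the GROUP BY key.
CREATE TEMPORARY TABLE user_totals (
  user_id BIGINT,
  total   BIGINT,
  PRIMARY KEY(user_id) NOT ENFORCED
) WITH (...);

-- The aggregation is keyed by user_id, so updates for one user_id are
-- produced by a single task and reach the sink in order.
INSERT INTO user_totals
SELECT user_id, SUM(amount) AS total
FROM orders
GROUP BY user_id;
```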
When you must use SinkUpsertMaterializer
- Do not write columns produced by non-deterministic functions (such as `CURRENT_TIMESTAMP` or `NOW()`) to the sink table. When the upsert key is unavailable, SinkUpsertMaterializer compares entire rows, and non-deterministic values prevent historical rows from being matched and removed, causing state to grow without bound.
- If the operator state grows large enough to affect performance, increase the deployment parallelism. See Configure resources for a deployment.
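To illustrate the first point, the sketch below contrasts a query that writes a non-deterministic column with one that does not. It reuses `s1`, `s2`, and `t1` from the join example in this topic. The sink `t1_with_ts` and its `updated_at` column are hypothetical and exist only to show the pattern to avoid.

```sql
-- Hypothetical sink with an extra timestamp column (illustration only).
CREATE TEMPORARY TABLE t1_with_ts (
  id         BIGINT,
  level      BIGINT,
  attr       VARCHAR,
  updated_at TIMESTAMP_LTZ(3),
  PRIMARY KEY(id) NOT ENFORCED
) WITH (...);

-- Avoid: CURRENT_TIMESTAMP is evaluated per record, so a -U row can carry a
-- different updated_at value than the +I row it is meant to retract.
INSERT INTO t1_with_ts
SELECT s1.*, s2.attr, CURRENT_TIMESTAMP AS updated_at
FROM s1 JOIN s2
ON s1.level = s2.id;

-- Prefer: write only deterministic columns, as in the original example.
INSERT INTO t1
SELECT s1.*, s2.attr
FROM s1 JOIN s2
ON s1.level = s2.id;
```

If a processing timestamp is required downstream, consider deriving it from a column that already exists in the source data so that retractions carry the same value as the rows they retract.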
Known issues
SinkUpsertMaterializer can cause unbounded state growth in the following situations:
- No state TTL, TTL is too long, or TTL is too short: Without a configured time-to-live (TTL), state accumulates indefinitely. An excessively short TTL can also cause problems: if the interval between a DELETE event and its corresponding ADD event exceeds the configured TTL, Flink retains the row in state as dirty data (see FLINK-29225) and logs a message from the following code path:

  ```java
  int index = findremoveFirst(values, row);
  if (index == -1) {
      LOG.info(STATE_CLEARED_WARN_MSG);
      return;
  }
  ```

  Configure TTL based on your business requirements. See Configure a deployment. Realtime Compute for Apache Flink with VVR 8.0.7 or later supports per-operator TTL configuration to reduce resource consumption for large-state deployments. See Configure the parallelism, chaining strategy, and TTL of an operator.
- Non-deterministic columns with no upsert key: If the update stream arriving at SinkUpsertMaterializer has no deducible upsert key and includes columns from non-deterministic functions, historical rows cannot be matched by value and are never deleted, causing continuous state growth.
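A minimal sketch of a job-level state TTL, assuming your environment accepts `SET` statements and honors the Apache Flink option `table.exec.state.ttl`. The 36-hour value is an arbitrary placeholder, not a recommendation from this topic.

```sql
-- Keep idle state for 36 hours. Choose a value longer than the largest
-- expected gap between an ADD event and its matching RETRACT event,
-- but short enough that state does not accumulate indefinitely.
SET 'table.exec.state.ttl' = '36 h';
```

This setting applies to every stateful operator in the job, not only to SinkUpsertMaterializer, so weigh it against the needs of upstream joins and aggregations.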
What's next
- Release notes: Engine version mapping between Realtime Compute for Apache Flink and Apache Flink
- Query execution: Supported query operations and update stream support for VVR 6.0 and later
- Configure resources for a deployment: Increase parallelism to handle large SinkUpsertMaterializer state