State management directly affects performance, stability, and resource utilization in Realtime Compute for Apache Flink. When state grows beyond available memory, the state backend spills infrequently accessed data to disk. Accessing data on disk is significantly slower than accessing data in memory, causing latency spikes that propagate as backpressure through the pipeline. Left unchecked, this can crash a deployment.
Diagnose state-related backpressure
Backpressure is Apache Flink's primary indicator of performance bottlenecks. In most cases, backpressure occurs because state size keeps growing past the allocated memory. The state backend moves infrequently used state data to disk, and if an operator frequently reads state from disk, data latency increases significantly -- creating a performance bottleneck.
To determine whether backpressure is caused by large state, analyze the running status of the deployment and its operators. For details, see Diagnostic tools.
Tuning methods
Set proper TTL values to control state size
Changing the TTL of a deployment from 0 to a value greater than 0, or vice versa, causes a compatibility issue and throws a StateMigrationException error.
Configure the State Expiration Time parameter on the Deployments page in the development console of Realtime Compute for Apache Flink. For details, see Parameters.

An excessively short TTL may produce incorrect computation results. For example, if data arrives late and the relevant state has expired during an aggregation or join, the result is wrong. An excessively long TTL increases resource consumption and reduces stability. Set the TTL based on data characteristics and business requirements. For example, if daily computations have a maximum drift of 1 hour across days, set the TTL to 25 hours.
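If your deployment accepts Flink SQL configuration directly, the 25-hour example can be sketched with Flink's standard table.exec.state.ttl option. This is an assumption for illustration; in Realtime Compute for Apache Flink, the supported way to set it is the State Expiration Time parameter in the console.

```sql
-- Assumption: the deployment honors Flink's standard state TTL option.
-- Idle state becomes eligible for cleanup 25 hours after its last update,
-- covering daily computations with up to 1 hour of cross-day drift.
SET 'table.exec.state.ttl' = '25 h';
```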
Per-stream TTL with JOIN_STATE_TTL (VVR 8.0.1+)
VVR 8.0.1 and later allow the JOIN_STATE_TTL hint to set different TTL values for the left and right streams in a regular join. This reduces unnecessary state storage and improves performance. For syntax details, see Query hints.
```sql
SELECT /*+ JOIN_STATE_TTL('left_table' = '..', 'right_table' = '..') */ *
FROM left_table [LEFT | RIGHT | INNER] JOIN right_table ON ...
```

The following table shows the impact of using JOIN_STATE_TTL on a real deployment:
| Item | Deployment details |
|---|---|
| Before | The left stream has 20 to 50 times the data volume of the right stream. The right stream TTL is set to 10 days (should be 18 days) to maintain performance, compromising data correctness. State size is approximately 5.8 TB. The deployment consumes up to 700 CUs. |
| After | JOIN_STATE_TTL sets the left stream TTL to 12 hours and the right stream TTL to 18 days, ensuring data integrity. State size drops to approximately 590 GB -- one-tenth of the original. The deployment consumes 200 to 300 CUs, saving 50% to 70% of resources. |
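The after-tuning configuration above could look like the following sketch. The table names orders and dim_products and the join columns are hypothetical stand-ins for the real deployment's streams.

```sql
-- Hypothetical tables: a high-volume left stream (orders) and a
-- lower-volume right stream (dim_products).
-- Left-side join state expires after 12 hours; right-side state is
-- retained for 18 days to preserve correctness for late updates.
SELECT /*+ JOIN_STATE_TTL('orders' = '12h', 'dim_products' = '18d') */
       o.order_id, o.amount, p.product_name
FROM orders AS o
JOIN dim_products AS p ON o.product_id = p.product_id;
```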
Remove unnecessary stateful operators
This method applies only to optimizer-derived operators. SQL-invoked operators are generally required by the query logic itself.
ChangelogNormalize
This operator is invoked in most scenarios involving an upsert source table, except for temporal joins based on event time. Before using the Upsert Kafka connector or a similar connector, verify that no temporal joins are performed based on event time. Monitor the state-related metrics of the ChangelogNormalize operator at runtime. If the source table has a large number of primary keys, state size grows because the operator maintains keyed state. Frequent primary key updates also increase state access and modification frequency.
Avoid using the Upsert Kafka connector for data synchronization scenarios. Use a data synchronization tool that ensures exactly-once processing instead.
SinkUpsertMaterializer
By default, the table.exec.sink.upsert-materialize parameter is set to auto. This means the system automatically adds the SinkUpsertMaterializer operator to ensure correctness in specific scenarios, such as out-of-order changelog records. The operator's presence does not necessarily mean records are out of order. For example, when grouping data by multiple keys and merging them into a single column, the optimizer cannot accurately derive the upsert key, so it adds this operator as a safeguard.
If the data distribution pattern is well understood and the final result is correct without this operator, set table.exec.sink.upsert-materialize to none to improve performance.
Check whether SinkUpsertMaterializer is active in the development console of Realtime Compute for Apache Flink. When present, it appears chained with the Sink operator in the topology diagram.


If the computation result remains correct without the SinkUpsertMaterializer operator, add 'table.exec.sink.upsert-materialize' = 'none' to the deployment configuration to prevent unnecessary state usage. For configuration instructions, see How do I configure parameters for deployment running?
Ververica Runtime (VVR) 8.0 and later support intelligent analysis of the SQL execution plan to help identify such issues.

Reduce state access frequency with miniBatch
If minute-level latency is acceptable, enable miniBatch to reduce the frequency of state access and updates. For details, see Enable miniBatch to improve throughput.
The following operators support miniBatch in Realtime Compute for Apache Flink:
| Operator name | Description |
|---|---|
| ChangelogNormalize | N/A |
| Deduplicate | Configure table.exec.deduplicate.mini-batch.compact-changes-enable to compact the changelog during deduplication based on event time. |
| GroupAggregate / GlobalGroupAggregate / IncrementalGroupAggregate | N/A |
| RegularJoin | Configure table.exec.stream.join.mini-batch-enabled to enable miniBatch for join operations. This parameter applies to update streams and outer join scenarios. |
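The miniBatch settings referenced above can be sketched as deployment configuration. The values shown are illustrative starting points, not recommendations; tune the latency and batch size to your workload.

```sql
-- Buffer input records and access state once per batch instead of per record.
SET 'table.exec.mini-batch.enabled' = 'true';
-- Maximum time a record may wait in the buffer (bounds the added latency).
SET 'table.exec.mini-batch.allow-latency' = '2 s';
-- Maximum number of buffered records per batch.
SET 'table.exec.mini-batch.size' = '5000';
-- Optionally extend miniBatch to regular joins (see the table above).
SET 'table.exec.stream.join.mini-batch-enabled' = 'true';
```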
Optimize the execution plan
The optimizer selects the state implementation and generates an execution plan based on SQL statements and configuration. The following adjustments can significantly reduce state size.
Use primary keys in regular joins
- If the join keys contain primary keys, the system uses a ValueState&lt;RowData&gt; object to store only the most recent value per join key. This maximizes storage efficiency.
- If the join keys contain non-primary keys, the system uses a MapState&lt;RowData, RowData&gt; object to store, for each join key, the most recent record from the source table per primary key.
- If no primary key is defined, the system uses a MapState&lt;RowData, Integer&gt; object to store the entire data record and its occurrence count for each join key.
Define primary keys in DDL statements and perform regular joins on the primary keys to optimize storage efficiency.
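A minimal sketch of this pattern, with hypothetical table and column names: both inputs declare user_id as their primary key and the join is performed on it, which lets the planner choose the compact per-key state layout.

```sql
-- Hypothetical tables; connector options elided.
CREATE TABLE user_profiles (
  user_id BIGINT,
  city STRING,
  PRIMARY KEY (user_id) NOT ENFORCED  -- declared, not validated by Flink
) WITH ( ... );

CREATE TABLE user_scores (
  user_id BIGINT,
  score INT,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH ( ... );

-- The join key is the primary key of both sides, so each side keeps at
-- most one row per key instead of a map of records.
SELECT p.user_id, p.city, s.score
FROM user_profiles AS p
JOIN user_scores AS s ON p.user_id = s.user_id;
```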
Use ROW_NUMBER for deduplication over append-only streams
Use the ROW_NUMBER function instead of FIRST_VALUE or LAST_VALUE for deduplication. With ROW_NUMBER, the Deduplicate operator stores only the first or most recent record per key, rather than maintaining the full state required by aggregation functions.
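A deduplication sketch that keeps only the first record per key (the orders table and its columns are hypothetical). Ordering by a time attribute in the OVER clause is what lets the planner choose the Deduplicate operator.

```sql
SELECT order_id, user_id, ts
FROM (
  SELECT *,
         ROW_NUMBER() OVER (
           PARTITION BY order_id
           ORDER BY ts ASC  -- ts must be a time attribute; ASC keeps the first row
         ) AS row_num
  FROM orders
)
WHERE row_num = 1;
```

Using ORDER BY ts DESC instead keeps the most recent record per key.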
Use FILTER syntax for multi-dimensional aggregation
Use the FILTER syntax instead of CASE WHEN for multi-dimensional aggregation -- for example, counting unique visitors on mobile devices, desktop devices, and all devices. The SQL optimizer recognizes different filter arguments on the same key and shares state data across multiple COUNT DISTINCT computations. This reduces the number of state accesses. Tests show that the FILTER syntax can provide twice the performance of the CASE WHEN syntax.
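A sketch of the two styles, using a hypothetical page_views table. With FILTER, the optimizer recognizes that all three counts are distinct counts over the same key and can share the underlying state.

```sql
-- CASE WHEN style: each COUNT DISTINCT maintains its own distinct-value state.
SELECT
  COUNT(DISTINCT CASE WHEN device = 'mobile'  THEN user_id END) AS mobile_uv,
  COUNT(DISTINCT CASE WHEN device = 'desktop' THEN user_id END) AS desktop_uv,
  COUNT(DISTINCT user_id) AS total_uv
FROM page_views;

-- FILTER style: filters over the same distinct key let the optimizer
-- share one distinct-state instance across the three counts.
SELECT
  COUNT(DISTINCT user_id) FILTER (WHERE device = 'mobile')  AS mobile_uv,
  COUNT(DISTINCT user_id) FILTER (WHERE device = 'desktop') AS desktop_uv,
  COUNT(DISTINCT user_id) AS total_uv
FROM page_views;
```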
Adjust join order for multi-stream joins
Apache Flink uses binary hash joins to process data streams. When joining multiple streams, state redundancy amplifies with each additional join.

Join smaller streams before larger streams to mitigate the amplification effect from state redundancy and improve processing efficiency.
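For example, with three streams of very different sizes (names hypothetical), joining the two small streams first keeps the intermediate result -- and therefore the state of the second join -- small.

```sql
-- small_a and small_b are low-volume; big_c is high-volume.
-- The first join produces a small intermediate stream, so the second
-- join stores little left-side state. Joining big_c first would instead
-- carry its large state into every subsequent join.
SELECT *
FROM small_a AS a
JOIN small_b AS b ON a.k = b.k
JOIN big_c   AS c ON a.k = c.k;
```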
Minimize disk reads
Reduce disk storage access to improve system performance and optimize memory utilization. For specific techniques, see Minimize disk reads.
Stateful operator reference
Flink SQL relies on Apache Flink's state backend and checkpointing mechanism to ensure the final consistency of computation results. The optimizer selects stateful operators based on your SQL statements and configuration parameters. Understanding which operators maintain state -- and how they clean it up -- is essential for tuning large-state deployments.
Stateful operators fall into two categories:
Optimizer-derived operators -- added automatically by the optimizer based on SQL structure and configuration
SQL-invoked operators -- triggered directly by SQL statement syntax (aggregations, joins, deduplication, windowing)
Optimizer-derived operators
The optimizer may introduce the following stateful operators. All three use TTL (Time-to-live) for state cleanup.
| Operator name | State cleanup mechanism |
|---|---|
| ChangelogNormalize | Time-to-live (TTL) |
| SinkUpsertMaterializer | |
| LookupJoin (*) |
ChangelogNormalize
The ChangelogNormalize operator processes changelogs involving primary keys and ensures efficiency, data consistency, and data accuracy. It is used in two scenarios:
Scenario 1: The source table has primary keys and supports UPSERT operations.
This type of table is known as an upsert source table. It produces a changelog stream containing only UPDATE (INSERT and UPDATE_AFTER) and DELETE operations on the primary keys while maintaining primary key sequence. For example, the Upsert Kafka connector creates an upsert source table. A custom source connector can also support UPSERT operations by overriding the getChangelogMode method:
```java
@Override
public ChangelogMode getChangelogMode() {
    // Declare that this source emits an upsert changelog: INSERT,
    // UPDATE_AFTER, and DELETE records keyed by the primary key.
    return ChangelogMode.upsert();
}
```

Scenario 2: The 'table.exec.source.cdc-events-duplicate' = 'true' configuration is specified.
At-least-once processing for Change Data Capture (CDC) may generate duplicate changelog records. To achieve exactly-once processing, specify this configuration to remove duplicates. The following diagram illustrates this scenario:
In this example, input data is hash-shuffled based on the primary key defined in the source table's DDL statement. The ChangelogNormalize operator creates a ValueState object to store the most recent record for each primary key. The following figure shows how the operator updates its state and generates output. When the second -U(2, 'Jerry', 77) record arrives, the corresponding value stored in the state is empty. This means the number of ADD changes (+I and +UA) equals the number of RETRACT changes (-D and -UB) for primary key 2. The duplicate record is discarded.
SinkUpsertMaterializer
The SinkUpsertMaterializer operator ensures that data materialization conforms to upsert semantics when the sink table has a primary key. During stream processing, if the uniqueness and sequence of data records are disrupted before they are written to the sink table, the optimizer automatically adds this operator. It maintains state based on the sink table's primary key. For related scenarios, see Handle out-of-order changelog events in Flink SQL.
LookupJoin
When 'table.optimizer.non-deterministic-update.strategy'='TRY_RESOLVE' is configured for a lookup join and the optimizer identifies a potential non-deterministic update (see How to eliminate the impact of non-deterministic update in streaming), the system attempts to resolve the issue by adding the LookupJoin operator.
Sample scenarios:
The sink table's primary keys partially or completely overlap with the dimension table's keys, and the dimension table is updated through CDC or other tools.
The join involves a non-primary-key field in the dimension table.
In both scenarios, the LookupJoin operator handles dynamic data changes while preserving query accuracy and consistency.
SQL-invoked operators
These operators are triggered directly by SQL statement syntax. They clean up state based on either TTL or watermark progress.
Window-based operators (WindowAggregate, WindowDeduplicate, WindowJoin, WindowTopN) clean their state based on watermark progress. When the watermark timestamp exceeds a window's end time, a built-in timer triggers state cleanup.
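For example, in a tumbling-window aggregation (a sketch with a hypothetical pv table whose ts column is an event-time attribute), each 10-minute window's state is dropped automatically once the watermark passes its window_end.

```sql
-- Per-window counts; state for a window is cleaned up by a built-in
-- timer when the watermark exceeds window_end, so no TTL is needed.
SELECT window_start, window_end, COUNT(*) AS pv_count
FROM TABLE(
  TUMBLE(TABLE pv, DESCRIPTOR(ts), INTERVAL '10' MINUTE)
)
GROUP BY window_start, window_end;
```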
| Operator name | Invoke method | State cleanup |
|---|---|---|
| Deduplicate | Use the ROW_NUMBER function, specify a time attribute field in the ORDER BY clause, and query only the first row. The time attribute can be an event time or a processing time. | TTL |
| RegularJoin | Use a JOIN clause in which the equality condition does not involve a time attribute field. | TTL |
| GroupAggregate | Use the GROUP BY clause with an aggregation function (SUM, COUNT, MIN, MAX, FIRST_VALUE, LAST_VALUE) or the DISTINCT keyword. | TTL |
| GlobalGroupAggregate | Enable local-global aggregation. | TTL |
| IncrementalGroupAggregate | Use a two-level group aggregation query and enable local-global aggregation. The GlobalGroupAggregate and LocalGroupAggregate operators merge into the IncrementalGroupAggregate operator. | TTL |
| Rank | Use the ROW_NUMBER function without a time attribute field in the ORDER BY clause. | TTL |
| GlobalRank | Use the ROW_NUMBER function without a time attribute field in the ORDER BY clause, and enable local-global aggregation. | TTL |
| IntervalJoin | Use a JOIN clause with a time attribute condition (event time or processing time). Example: L.time between R.time + X and R.time + Y or R.time between L.time - Y and L.time - X | watermark |
| TemporalJoin | Perform an inner join or left join based on event time. | watermark |
| WindowDeduplicate | Use a window table-valued function (TVF) for deduplication. | watermark |
| WindowAggregate | Use a window TVF for aggregation. | watermark |
| GlobalWindowAggregate | Use a window TVF for aggregation and enable local-global aggregation. | watermark |
| WindowJoin | Use a window TVF for joins. | watermark |
| WindowRank | Use a window TVF for sorting. | watermark |
| GroupWindowAggregate | Use the legacy syntax of window aggregation. | watermark |
References
Performance tuning for large-state deployments -- covers issues caused by large state and the tuning workflow.
Control state size to reduce backpressure using the DataStream API -- flexible state management with the DataStream API.
Diagnose and prevent checkpoint and savepoint timeout -- identify and resolve checkpoint/savepoint timeout issues.
Improve startup and scaling speed -- remove performance bottlenecks during deployment startup and scaling.

