Realtime Compute for Apache Flink:Tune large-state SQL jobs to reduce backpressure

Last Updated:Feb 28, 2026

State management directly affects performance, stability, and resource utilization in Realtime Compute for Apache Flink. When state grows beyond available memory, the state backend spills infrequently accessed data to disk. Accessing data on disk is significantly slower than accessing data in memory, causing latency spikes that propagate as backpressure through the pipeline. Left unchecked, this can crash a deployment.

Diagnose state-related backpressure

Backpressure is Apache Flink's primary indicator of performance bottlenecks. In most cases, backpressure occurs because state size keeps growing past the allocated memory. The state backend moves infrequently used state data to disk, and if an operator frequently reads state from disk, data latency increases significantly -- creating a performance bottleneck.

To determine whether backpressure is caused by large state, analyze the running status of the deployment and its operators. For details, see Diagnostic tools.

Tuning methods

Set proper TTL values to control state size

Note

Changing the TTL of a deployment from 0 to a value greater than 0, or vice versa, causes a compatibility issue and throws a StateMigrationException error.

Configure the State Expiration Time parameter on the Deployments page in the development console of Realtime Compute for Apache Flink. For details, see Parameters.


An excessively short TTL may produce incorrect computation results. For example, if data arrives late and the relevant state has expired during an aggregation or join, the result is wrong. An excessively long TTL increases resource consumption and reduces stability. Set the TTL based on data characteristics and business requirements. For example, if daily computations have a maximum drift of 1 hour across days, set the TTL to 25 hours.
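Besides the console parameter, the TTL can also be expressed through Flink's standard table.exec.state.ttl option. A minimal sketch for the 25-hour example above (verify that your VVR version honors this option before relying on it):

```sql
-- Applies a 25-hour TTL to all stateful operators in the job.
SET 'table.exec.state.ttl' = '25 h';
```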

Per-stream TTL with JOIN_STATE_TTL (VVR 8.0.1+)

VVR 8.0.1 and later allow the JOIN_STATE_TTL hint to set different TTL values for the left and right streams in a regular join. This reduces unnecessary state storage and improves performance. For syntax details, see Query hints.

SELECT /*+ JOIN_STATE_TTL('left_table' = '..', 'right_table' = '..') */ *
FROM left_table [LEFT | RIGHT | INNER] JOIN right_table ON ...

The following table shows the impact of using JOIN_STATE_TTL on a real deployment:

Item | Deployment details
Before | The left stream has 20 to 50 times the data volume of the right stream. A single TTL of 10 days is applied to both streams (the right stream requires 18 days) to maintain performance, compromising data correctness. State size is approximately 5.8 TB, and the deployment consumes up to 700 CUs.
After | JOIN_STATE_TTL sets the left stream TTL to 12 hours and the right stream TTL to 18 days, ensuring data integrity. State size drops to approximately 590 GB, about one-tenth of the original. The deployment consumes 200 to 300 CUs, saving 50% to 70% of resources.
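For illustration, a hint matching the scenario above might look like the following (table and column names are hypothetical):

```sql
-- Left stream expires after 12 hours, right stream after 18 days.
SELECT /*+ JOIN_STATE_TTL('orders' = '12 h', 'customers' = '18 d') */
    o.order_id, c.customer_name
FROM orders AS o
JOIN customers AS c
    ON o.customer_id = c.customer_id;
```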

Remove unnecessary stateful operators

This method applies only to optimizer-derived operators. SQL-invoked operators are generally required by the query logic itself.

ChangelogNormalize

This operator is invoked in most scenarios involving an upsert source table, except for temporal joins based on event time. Before using the Upsert Kafka connector or a similar connector, verify that no temporal joins are performed based on event time. Monitor the state-related metrics of the ChangelogNormalize operator at runtime. If the source table has a large number of primary keys, state size grows because the operator maintains keyed state. Frequent primary key updates also increase state access and modification frequency.

Avoid using the Upsert Kafka connector for data synchronization scenarios. Use a data synchronization tool that ensures exactly-once processing instead.
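For reference, a minimal Upsert Kafka source definition looks like this (topic and broker address are hypothetical). Declaring a PRIMARY KEY on such a connector is what makes the table an upsert source and triggers ChangelogNormalize:

```sql
CREATE TABLE users_upsert (
    user_id BIGINT,
    user_name STRING,
    PRIMARY KEY (user_id) NOT ENFORCED  -- makes this an upsert source
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'users',                               -- hypothetical topic
    'properties.bootstrap.servers' = 'broker:9092',  -- hypothetical broker
    'key.format' = 'json',
    'value.format' = 'json'
);
```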

SinkUpsertMaterializer

By default, the table.exec.sink.upsert-materialize parameter is set to auto. This means the system automatically adds the SinkUpsertMaterializer operator to ensure correctness in specific scenarios, such as out-of-order changelog records. The operator's presence does not necessarily mean records are out of order. For example, when grouping data by multiple keys and merging them into a single column, the optimizer cannot accurately derive the upsert key, so it adds this operator as a safeguard.

If the data distribution pattern is well understood and the final result is correct without this operator, set table.exec.sink.upsert-materialize to none to improve performance.

Check whether SinkUpsertMaterializer is active in the development console of Realtime Compute for Apache Flink. When present, it appears chained with the Sink operator in the topology diagram.

If the computation result is correct without the SinkUpsertMaterializer operator, add 'table.exec.sink.upsert-materialize'='none' to prevent unnecessary use. For configuration instructions, see How do I configure parameters for deployment running?.

Ververica Runtime (VVR) 8.0 and later support intelligent analysis of the SQL execution plan to help identify such issues.

Reduce state access frequency with miniBatch

If minute-level latency is acceptable, enable miniBatch to reduce the frequency of state access and updates. For details, see Enable miniBatch to improve throughput.

The following operators support miniBatch in Realtime Compute for Apache Flink:

Operator name | Description
ChangelogNormalize | N/A
Deduplicate | Configure table.exec.deduplicate.mini-batch.compact-changes-enabled to compact the changelog during deduplication based on event time.
GroupAggregate / GlobalGroupAggregate / IncrementalGroupAggregate | N/A
RegularJoin | Configure table.exec.stream.join.mini-batch-enabled to enable miniBatch for join operations. This parameter applies to update streams and outer join scenarios.
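MiniBatch itself is controlled by standard Flink options; a typical starting point looks like the following (the latency and batch-size values are illustrative and should be tuned per workload):

```sql
SET 'table.exec.mini-batch.enabled' = 'true';
SET 'table.exec.mini-batch.allow-latency' = '2 s';  -- max buffering delay
SET 'table.exec.mini-batch.size' = '5000';          -- max records buffered per batch
```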

Optimize the execution plan

The optimizer selects the state implementation and generates an execution plan based on SQL statements and configuration. The following adjustments can significantly reduce state size.

Use primary keys in regular joins

  • If the join keys contain primary keys, the system uses a ValueState<RowData> object to store only the most recent value per join key. This maximizes storage efficiency.

  • If the join keys contain non-primary keys, the system uses a MapState<RowData, RowData> object to store the most recent record from the source table per primary key for each join key.

  • If no primary key is defined, the system uses a MapState<RowData, Integer> object to store the entire data record for each join key along with the occurrence count.

Define primary keys in DDL statements and perform regular joins on the primary keys to optimize storage efficiency.
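A sketch of the most efficient case, with hypothetical tables whose join keys are the declared primary keys:

```sql
CREATE TABLE orders (
    order_id BIGINT,
    amount DECIMAL(10, 2),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (...);  -- connector options omitted

CREATE TABLE shipments (
    order_id BIGINT,
    status STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (...);

-- The join keys contain the primary keys, so each side stores only
-- the most recent value per join key in a ValueState<RowData>.
SELECT o.order_id, o.amount, s.status
FROM orders AS o
JOIN shipments AS s ON o.order_id = s.order_id;
```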

Use ROW_NUMBER for deduplication over append-only streams

Use the ROW_NUMBER function instead of FIRST_VALUE or LAST_VALUE for deduplication. With ROW_NUMBER, the Deduplicate operator stores only the first or most recent record per key, rather than maintaining the full state required by aggregation functions.
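A sketch of the pattern over a hypothetical events table; the subquery with row_num = 1 is what the optimizer recognizes as deduplication:

```sql
-- Keeps the first record per user_id.
SELECT user_id, event_time, payload
FROM (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY user_id
            ORDER BY event_time ASC  -- ASC keeps the first; DESC keeps the latest
        ) AS row_num
    FROM events
)
WHERE row_num = 1;
```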

Use FILTER syntax for multi-dimensional aggregation

Use the FILTER syntax instead of CASE WHEN for multi-dimensional aggregation -- for example, counting unique visitors on mobile devices, desktop devices, and all devices. The SQL optimizer recognizes different filter arguments on the same key and shares state data across multiple COUNT DISTINCT computations. This reduces the number of state accesses. Tests show that the FILTER syntax can provide twice the performance of the CASE WHEN syntax.
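The unique-visitor example can be sketched as follows (hypothetical table and column names). Because the distinct key (user_id) is the same in all three aggregates, the optimizer can share its state across them:

```sql
SELECT
    window_day,
    COUNT(DISTINCT user_id)                                   AS total_uv,
    COUNT(DISTINCT user_id) FILTER (WHERE device = 'mobile')  AS mobile_uv,
    COUNT(DISTINCT user_id) FILTER (WHERE device = 'desktop') AS desktop_uv
FROM page_views
GROUP BY window_day;
```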

Adjust join order for multi-stream joins

Apache Flink uses binary hash joins to process data streams. When joining multiple streams, state redundancy amplifies with each additional join.


Join smaller streams before larger streams to mitigate the amplification effect from state redundancy and improve processing efficiency.
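For example, with three streams of increasing size (names are hypothetical), ordering the joins from smallest to largest keeps the intermediate state of the first join small:

```sql
SELECT *
FROM small_stream AS s
JOIN medium_stream AS m ON s.id = m.id   -- small intermediate state
JOIN large_stream  AS l ON s.id = l.id;  -- largest stream joined last
```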

Minimize disk reads

Reduce disk storage access to improve system performance and optimize memory utilization. For specific techniques, see Minimize disk reads.

Stateful operator reference

Flink SQL relies on Apache Flink's state backend and checkpointing mechanism to ensure the final consistency of computation results. The optimizer selects stateful operators based on your SQL statements and configuration parameters. Understanding which operators maintain state -- and how they clean it up -- is essential for tuning large-state deployments.

Stateful operators fall into two categories:

  • Optimizer-derived operators -- added automatically by the optimizer based on SQL structure and configuration

  • SQL-invoked operators -- triggered directly by SQL statement syntax (aggregations, joins, deduplication, windowing)

Optimizer-derived operators

The optimizer may introduce the following stateful operators. All three use TTL (Time-to-live) for state cleanup.

Operator name | State cleanup mechanism
ChangelogNormalize | Time-to-live (TTL)
SinkUpsertMaterializer | TTL
LookupJoin | TTL

ChangelogNormalize

The ChangelogNormalize operator processes changelogs involving primary keys and ensures efficiency, data consistency, and data accuracy. It is used in two scenarios:

Scenario 1: The source table has primary keys and supports UPSERT operations.

This type of table is known as an upsert source table. It produces a changelog stream containing only UPDATE (INSERT and UPDATE_AFTER) and DELETE operations on the primary keys while maintaining primary key sequence. For example, the Upsert Kafka connector creates an upsert source table. A custom source connector can also support UPSERT operations by overriding the getChangelogMode method:

@Override
public ChangelogMode getChangelogMode() {
    return ChangelogMode.upsert();
}

Scenario 2: The 'table.exec.source.cdc-events-duplicate' = 'true' configuration is specified.

At-least-once processing for Change Data Capture (CDC) may generate duplicate changelog records. To achieve exactly-once processing, specify this configuration to remove duplicates.
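The configuration itself is a single option; note that a primary key must be declared on the source table for the deduplication to key on:

```sql
-- Deduplicates possibly repeated CDC changelog events by primary key.
SET 'table.exec.source.cdc-events-duplicate' = 'true';
```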

In this scenario, input data is hash-shuffled based on the primary key defined in the source table's DDL statement. The ChangelogNormalize operator creates a ValueState object to store the most recent record for each primary key, updating the state and generating output as each record arrives. When a second -U(2, 'Jerry', 77) record arrives, the corresponding value stored in the state is empty: the number of ADD changes (+I and +UA) already equals the number of RETRACT changes (-D and -UB) for primary key 2, so the duplicate record is discarded.

SinkUpsertMaterializer

The SinkUpsertMaterializer operator ensures that data materialization conforms to upsert semantics when the sink table has a primary key. During stream processing, if the uniqueness and sequence of data records are disrupted before they are written to the sink table, the optimizer automatically adds this operator. It maintains state based on the sink table's primary key. For related scenarios, see Handle out-of-order changelog events in Flink SQL.

LookupJoin

When 'table.optimizer.non-deterministic-update.strategy'='TRY_RESOLVE' is configured for a lookup join and the optimizer identifies a potential non-deterministic update (see How to eliminate the impact of non-deterministic update in streaming), the system attempts to resolve the issue by adding the LookupJoin operator.

Sample scenarios:

  1. The sink table's primary keys partially or completely overlap with the dimension table's keys, and the dimension table is updated through CDC or other tools.

  2. The join involves a non-primary-key field in the dimension table.

In both scenarios, the LookupJoin operator handles dynamic data changes while preserving query accuracy and consistency.
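Enabling this behavior is a one-line configuration (a standard Flink option; the default strategy is IGNORE):

```sql
SET 'table.optimizer.non-deterministic-update.strategy' = 'TRY_RESOLVE';
```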

SQL-invoked operators

These operators are triggered directly by SQL statement syntax. They clean up state based on either TTL or watermark progress.

Window-based operators (WindowAggregate, WindowDeduplicate, WindowJoin, WindowTopN) clean their state based on watermark progress. When the watermark timestamp exceeds a window's end time, a built-in timer triggers state cleanup.

Operator name | Invoke method | State cleanup
Deduplicate | Use the ROW_NUMBER function, specify a time attribute field in the ORDER BY clause, and query only the first row. The time attribute can be event time or processing time. | TTL
RegularJoin | Use a JOIN clause in which the equality condition does not involve a time attribute field. | TTL
GroupAggregate | Use the GROUP BY clause with an aggregation function (SUM, COUNT, MIN, MAX, FIRST_VALUE, LAST_VALUE) or the DISTINCT keyword. | TTL
GlobalGroupAggregate | Enable local-global aggregation. | TTL
IncrementalGroupAggregate | Use a two-level group aggregation query and enable local-global aggregation. The GlobalGroupAggregate and LocalGroupAggregate operators merge into the IncrementalGroupAggregate operator. | TTL
Rank | Use the ROW_NUMBER function without a time attribute field in the ORDER BY clause. | TTL
GlobalRank | Use the ROW_NUMBER function without a time attribute field in the ORDER BY clause, and enable local-global aggregation. | TTL
IntervalJoin | Use a JOIN clause with a time attribute condition (event time or processing time), for example L.time BETWEEN R.time + X AND R.time + Y or R.time BETWEEN L.time - Y AND L.time - X. | Watermark
TemporalJoin | Perform an inner join or left join based on event time. | Watermark
WindowDeduplicate | Use a window table-valued function (TVF) for deduplication. | Watermark
WindowAggregate | Use a window TVF for aggregation. | Watermark
GlobalWindowAggregate | Use a window TVF for aggregation and enable local-global aggregation. | Watermark
WindowJoin | Use a window TVF for joins. | Watermark
WindowRank | Use a window TVF for sorting. | Watermark
GroupWindowAggregate | Use the legacy syntax of window aggregation. | Watermark
