State management affects performance, stability, and resource utilization. Improper state management may lead to system crashes. This topic describes how to control the state size to reduce backpressure in SQL deployments.
Stateful operator types
SQL is a domain-specific and declarative language that allows you to perform data operations without the complexity of handling the underlying data processing logic. Flink SQL leverages the state backend and checkpointing mechanism of Apache Flink to ensure the eventual consistency of computation results. At the implementation level, Flink SQL utilizes an optimizer to select stateful operators based on parameter configurations and SQL statements. To optimize the performance of stateful computation over massive data, you need a basic understanding of these underlying mechanisms.
Stateful operators derived by the optimizer
The following table describes the stateful operators derived by the optimizer.
Operator name | State cleanup mechanism |
ChangelogNormalize | TTL |
SinkUpsertMaterializer | TTL |
LookupJoin (*) | TTL |
(*) The LookupJoin operator maintains state only when the 'table.optimizer.non-deterministic-update.strategy'='TRY_RESOLVE' configuration is specified.
ChangelogNormalize
The ChangelogNormalize operator processes changelogs that involve primary keys and ensures efficiency, data consistency, and data accuracy. The operator is used in the following scenarios:
The source table has primary keys and supports UPSERT operations.
In this case, the source table is also referred to as an upsert source table. The table produces a changelog stream that contains only UPDATE (including INSERT and UPDATE_AFTER) and DELETE operations on the primary keys while preserving the order of records that share the same primary key. For example, you can use the Upsert Kafka connector to create an upsert source table. You can also override the getChangelogMode method to create a custom source connector that supports UPSERT operations.
@Override
public ChangelogMode getChangelogMode() {
    return ChangelogMode.upsert();
}
The 'table.exec.source.cdc-events-duplicate' = 'true' configuration is specified. At-least-once processing for Change Data Capture (CDC) may generate duplicate changelog records. If you require exactly-once processing, specify this configuration to remove the duplicate changelog records. Sample scenario:
In this example, a hash shuffle operation is performed on the input data based on the primary key defined in the DDL statement of the source table. Then, the ChangelogNormalize operator creates a ValueState object to store the most recent record for each primary key. The following figure shows how the operator updates its state and generates the output. As shown in the figure, when the second -U(2, 'Jerry', 77) record arrives, the corresponding value stored in the state is empty. This indicates that the number of ADD changes (+I and +UA) for primary key 2 is equal to the number of RETRACT changes (-D and -UB). Therefore, this duplicate record is discarded.
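For reference, the following DDL statement is a minimal sketch of an upsert source table that causes the optimizer to add the ChangelogNormalize operator. The table name, field names, topic, and broker address are hypothetical; the upsert-kafka connector, the PRIMARY KEY clause, and the key and value formats are the parts that enable UPSERT semantics.

CREATE TABLE users_upsert_source (
  user_id BIGINT,
  user_name STRING,
  score INT,
  PRIMARY KEY (user_id) NOT ENFORCED  -- upsert key; ChangelogNormalize keeps one ValueState entry per key
) WITH (
  'connector' = 'upsert-kafka',                        -- produces an upsert changelog stream
  'topic' = 'users',                                   -- hypothetical topic
  'properties.bootstrap.servers' = 'localhost:9092',   -- hypothetical broker address
  'key.format' = 'json',
  'value.format' = 'json'
);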
SinkUpsertMaterializer
The SinkUpsertMaterializer operator ensures that data materialization conforms to the upsert semantics when the sink table has a primary key. During stream data processing, if the uniqueness and sequence of the data records are disrupted before they are written to the sink table, the optimizer automatically adds this operator. The operator maintains its state based on the primary key of the sink table to ensure that the corresponding constraints are satisfied. For information about other common scenarios, see Handle out-of-order changelog events in Flink SQL.
LookupJoin
If you specify the 'table.optimizer.non-deterministic-update.strategy'='TRY_RESOLVE' configuration for a lookup join and the optimizer identifies a potential non-deterministic update (see How To Eliminate The Impact Of Non-Deterministic Update In Streaming), the system attempts to resolve the issue by adding the LookupJoin operator.
Sample scenarios: (1) The primary keys of the sink table partially or completely overlap with those of the dimension table and data in the dimension table may be updated by using CDC or other tools. (2) The join operation involves a non-primary-key field in the dimension table. In the preceding scenarios, the LookupJoin operator can efficiently handle dynamic data changes while ensuring the accuracy and consistency of query results.
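The following statements are a sketch of such a lookup join. The orders and dim_region tables and their fields are hypothetical, and orders is assumed to have a processing-time attribute named proc_time.

SET 'table.optimizer.non-deterministic-update.strategy' = 'TRY_RESOLVE';

SELECT o.order_id, d.region_name
FROM orders AS o
JOIN dim_region FOR SYSTEM_TIME AS OF o.proc_time AS d  -- lookup join on a non-primary-key field of the dimension table
  ON o.region_code = d.region_code;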
Stateful operators invoked by SQL statements
This type of stateful operator cleans up state data based on the TTL value or watermark progress. For example, stateful operators used for windowed computation, such as the WindowAggregate, WindowDeduplicate, WindowJoin, and WindowTopN operators, clean their state data based on the watermark progress. If the timestamp contained in a watermark is later than the end time of a window, the built-in timer triggers state cleanup.
Operator name | Invoke method | State cleanup mechanism |
Deduplicate | Use the ROW_NUMBER function, specify a time attribute field in the ORDER BY clause, and query only the first row. The time attribute can be an event time or a processing time. | TTL |
RegularJoin | Use a JOIN clause in which the equality condition does not involve a time attribute field. | TTL |
GroupAggregate | Use the GROUP BY clause and apply an aggregate function, such as SUM, COUNT, MIN, MAX, FIRST_VALUE, or LAST_VALUE, to the grouped results, or use the DISTINCT keyword. | TTL |
GlobalGroupAggregate | Enable local-global aggregation. | TTL |
IncrementalGroupAggregate | Use a two-level group aggregation query and enable local-global aggregation. In this case, the GlobalGroupAggregate operator and the LocalGroupAggregate operator are merged into the IncrementalGroupAggregate operator. | TTL |
Rank | Use the ROW_NUMBER function and do not specify a time attribute field in the ORDER BY clause. | TTL |
GlobalRank | Use the ROW_NUMBER function, do not specify a time attribute field in the ORDER BY clause, and enable local-global aggregation. | TTL |
IntervalJoin | Use a JOIN clause in which the condition involves a time attribute field, for example, a BETWEEN predicate on the time attributes of the two streams. The time attribute can be an event time or a processing time. | Watermark |
TemporalJoin | Perform an inner join or left join based on the event time. | Watermark |
WindowDeduplicate | Use a window table-valued function (TVF) for data deduplication. | Watermark |
WindowAggregate | Use a window TVF for data aggregation. | Watermark |
GlobalWindowAggregate | Use a window TVF for data aggregation and enable local-global aggregation. | Watermark |
WindowJoin | Use a window TVF for joins. | Watermark |
WindowRank | Use a window TVF for data sorting. | Watermark |
GroupWindowAggregate | Use the legacy syntax of window aggregation. | Watermark |
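As a hedged example of the window TVF syntax that invokes the WindowAggregate operator, the following query aggregates a hypothetical orders table over 10-minute tumbling windows. The order_time field is assumed to be the event-time attribute, so the state of each window is cleaned up after the watermark passes the window end time.

SELECT window_start, window_end, SUM(amount) AS total_amount
FROM TABLE(
  TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '10' MINUTES))  -- window TVF
GROUP BY window_start, window_end;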
Diagnostic tools
Backpressure is an indicator of performance bottlenecks in Apache Flink. In most cases, backpressure occurs because the state size continues to increase and exceeds the allocated memory size. In this case, the state backend moves infrequently used state data to the disk storage. However, accessing data in the disk storage is significantly slower than accessing data in the memory. If an operator frequently reads state data from the disk, the data latency significantly increases. This results in a performance bottleneck.
To identify whether backpressure is caused by a large state size, you need to thoroughly analyze the running status of the deployment and operators. For information about how to use the diagnostic tools of Realtime Compute for Apache Flink to identify performance issues, see Diagnostic tools.
Tuning methods
Avoid unnecessary use of stateful operators
This tuning method applies only to the stateful operators derived by the optimizer. In most cases, the stateful operators invoked by SQL statements are necessary.
ChangelogNormalize
This operator is invoked in most scenarios that involve an upsert source table, except for temporal joins based on event time. Before you use the Upsert Kafka connector or a similar connector, make sure that no temporal joins are performed based on event time. When the deployment is running, you need to monitor the state-related metrics of the ChangelogNormalize operator. If the source table has a large number of primary keys, the state size increases because the operator maintains its state based on primary keys (also referred to as keyed state). If the primary keys are frequently updated, the state data is frequently accessed and modified. We recommend that you do not use the Upsert Kafka connector to create a source table in scenarios such as data synchronization. We also recommend that you use a data synchronization tool that ensures exactly-once processing.
SinkUpsertMaterializer
By default, the table.exec.sink.upsert-materialize parameter is set to auto. This indicates that the system automatically uses the SinkUpsertMaterializer operator to ensure data correctness in specific scenarios, such as when the changelog records are out of order. Note that the use of this operator does not necessarily mean that the records are out of order. For example, if you use multiple keys to group data and merge the keys into a single column, the optimizer cannot accurately derive the upsert key. In this case, the operator is added to ensure data correctness. If you are familiar with the data distribution pattern and the final result is correct without this operator, set the table.exec.sink.upsert-materialize parameter to none to optimize performance.
You can check whether the SinkUpsertMaterializer operator is used in a deployment in the development console of Realtime Compute for Apache Flink. If the operator is used, it is displayed together with the Sink operator in the topology diagram, as shown in the following figure. The two operators form an operator chain. This helps you monitor and evaluate the use of the SinkUpsertMaterializer operator in an intuitive manner and make informed decisions.
If the computation result is correct without the SinkUpsertMaterializer operator, we recommend that you add the 'table.exec.sink.upsert-materialize'='none' configuration to prevent the unnecessary use of this operator. For information about how to add the configuration, see How do I configure parameters for deployment running? To help you identify similar issues, intelligent analysis of the SQL execution plan is supported in Ververica Runtime (VVR) 8.0 and later, as shown in the following figure.
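For example, assuming your environment accepts SET statements (you can also add the configuration on the deployment configuration page), the following statement disables the operator:

SET 'table.exec.sink.upsert-materialize' = 'none';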
Reduce state access frequency: Enable miniBatch
If a minute-level latency is acceptable in your business, you can enable miniBatch to reduce the frequency of state access and updates. For more information, see Enable miniBatch to improve throughput.
The following table describes the stateful operators that support miniBatch in Realtime Compute for Apache Flink.
Operator name | Description |
ChangelogNormalize | N/A |
Deduplicate | You can configure the table.exec.deduplicate.mini-batch.compact-changes-enabled parameter to specify whether to compact the changelog during deduplication based on the event time. |
GroupAggregate, GlobalGroupAggregate, IncrementalGroupAggregate | N/A |
RegularJoin | You must configure the table.exec.stream.join.mini-batch-enabled parameter to enable miniBatch for join operations. This parameter applies to update streams and outer join scenarios. |
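The following sketch shows the common miniBatch settings. The values are illustrative rather than recommendations.

SET 'table.exec.mini-batch.enabled' = 'true';              -- enable miniBatch
SET 'table.exec.mini-batch.allow-latency' = '2s';          -- buffer input records for up to 2 seconds
SET 'table.exec.mini-batch.size' = '5000';                 -- or flush after 5,000 buffered records
SET 'table.exec.stream.join.mini-batch-enabled' = 'true';  -- additionally required for regular joins, as described in the table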
Reduce state size: Specify proper TTL values
If you change the TTL of a deployment from 0 to a value greater than 0 or vice versa, a compatibility issue occurs and a StateMigrationException error is thrown.
State size is crucial to performance. To control the state size of a deployment, configure the State Expiration Time parameter based on your business requirements on the Deployments page in the development console of Realtime Compute for Apache Flink. For more information, see Parameters.
An excessively short TTL may lead to incorrect computation results. For example, if data arrives late and the relevant state data expires during an aggregation or join operation, the result is incorrect. An excessively long TTL increases resource consumption and reduces stability. We recommend that you configure the TTL based on the data characteristics and your business requirements. For example, if your daily data computations have a maximum drift of 1 hour across days, you can set the TTL to 25 hours.
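If your environment supports setting the TTL in SQL, the 25-hour example corresponds to a configuration similar to the following. This is a sketch that assumes the table.exec.state.ttl option is available; the State Expiration Time parameter in the console serves the same purpose.

SET 'table.exec.state.ttl' = '25 h';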
VVR 8.0.1 and later allow you to use the JOIN_STATE_TTL hint to specify different TTL values for the states of the left and right streams in a regular join. This reduces unnecessary state storage and improves performance. For information about how to use the hint, see Query hints.
SELECT /*+ JOIN_STATE_TTL('left_table' = '..', 'right_table' = '..') */ *
FROM left_table [LEFT | RIGHT | INNER] JOIN right_table ON ...
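For example, the following sketch, which uses hypothetical orders and payments tables, keeps the join state of the orders stream for 1 day and the join state of the payments stream for 12 hours:

SELECT /*+ JOIN_STATE_TTL('orders' = '1d', 'payments' = '12h') */
  orders.order_id, payments.pay_time
FROM orders
LEFT JOIN payments ON orders.order_id = payments.order_id;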
A comparison of the same deployment before and after the JOIN_STATE_TTL hint is used shows a significant reduction in state size.
Reduce state size: Optimize execution plan
The optimizer selects the state implementation and generates an execution plan based on the SQL statements and specified configurations.
Use primary keys to optimize regular joins
If the join keys contain primary keys, the system uses a ValueState<RowData> object to store only the most recent value of each join key. This maximizes storage space utilization.
If the source table has a primary key but the join keys do not contain it, the system uses a MapState&lt;RowData, RowData&gt; object that maps each primary key to the most recent record from the source table for each join key.
If no primary key is defined, the system uses a MapState<RowData, Integer> object to store the entire data record corresponding to each join key and the number of times the data record appears.
To optimize storage efficiency, we recommend that you define primary keys in the DDL statements of the tables that you want to join and perform a regular join based on the primary keys.
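For example, assuming that the hypothetical users and user_profiles tables both define user_id as their primary key in the DDL statements, the following join allows the optimizer to choose the compact ValueState layout:

SELECT u.user_id, u.user_name, p.level
FROM users AS u
JOIN user_profiles AS p
  ON u.user_id = p.user_id;  -- the join key is the primary key of both inputs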
Optimize deduplication over append-only streams
Use the ROW_NUMBER function to replace the FIRST_VALUE or LAST_VALUE function to increase deduplication efficiency. When you use the ROW_NUMBER function to obtain the first or most recent data record, the Deduplicate operator stores only the first or most recent record of the specified keys.
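The following sketch keeps the most recent record for each order_id in a hypothetical append-only orders table. The order_time field is assumed to be the event-time attribute, so the query is translated into the Deduplicate operator.

SELECT order_id, user_id, amount, order_time
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY order_time DESC) AS row_num
  FROM orders
)
WHERE row_num = 1;  -- keep only the most recent record for each key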
Improve aggregation performance
Use the FILTER syntax to replace the CASE WHEN syntax in multi-dimensional data aggregation, such as when you want to calculate the number of unique visitors on mobile devices, desktop devices, and all devices. The SQL optimizer can recognize the different filter arguments on the same key. This allows the state data to be shared when multiple COUNT DISTINCT values are calculated based on different conditions on the same key. This also reduces the number of times the state data is accessed. Tests show that the FILTER syntax can provide twice the performance of the CASE WHEN syntax.
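For example, in the unique-visitor scenario, assuming a hypothetical page_views table with a device_type column, the following aggregates share state because they apply different filters to the same COUNT DISTINCT key:

SELECT
  visit_date,
  COUNT(DISTINCT user_id) AS total_uv,                                        -- all devices
  COUNT(DISTINCT user_id) FILTER (WHERE device_type = 'mobile') AS mobile_uv,
  COUNT(DISTINCT user_id) FILTER (WHERE device_type = 'desktop') AS desktop_uv
FROM page_views
GROUP BY visit_date;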
Reduce state size: Adjust the join order of multiple streams
Apache Flink translates a multi-way stream join into a cascade of binary hash joins, and each binary join stores its intermediate result in state. If streams A and B are joined first and produce a large intermediate result, that result is stored again by every downstream join, which causes unnecessary storage consumption. The issue becomes more significant as the number of joined streams increases.
To address this issue, adjust the join order. For example, join the streams that have smaller amounts of data first and the streams that have larger amounts of data later. This mitigates the amplification effect caused by state redundancy and improves the efficiency and performance of data processing.
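As a sketch with hypothetical tables, if small_stream produces far fewer records than large_stream, the following join order keeps the intermediate state compact:

SELECT *
FROM small_stream AS s
JOIN medium_stream AS m ON s.id = m.id  -- the two smaller streams are joined first
JOIN large_stream AS l ON m.id = l.id;  -- the largest stream is joined last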
Minimize disk reads
You can reduce the number of disk reads to improve system performance. You can also optimize memory utilization. For more information, see Minimize disk reads.
References
For information about the issues caused by a large state size and the tuning workflow, see Performance tuning for large-state deployments.
The Apache Flink DataStream API allows you to manage the state size in a flexible manner. For more information, see Control state size to reduce backpressure using the DataStream API.
For information about how to diagnose and prevent checkpoint and savepoint timeout, see Diagnose and prevent checkpoint and savepoint timeout.
For information about how to identify and remove performance bottlenecks during deployment startup and scaling, see Improve startup and scaling speed.