
Realtime Compute for Apache Flink:Control state size to reduce backpressure in SQL deployments

Last Updated:Dec 19, 2024

State management affects performance, stability, and resource utilization. Improper state management may lead to system crashes. This topic describes how to control the state size to reduce backpressure in SQL deployments.

Stateful operator types

SQL is a declarative, domain-specific language that allows you to perform data operations without handling the underlying data processing logic. Flink SQL leverages the state backend and checkpointing mechanism of Apache Flink to ensure the eventual consistency of computation results. At the implementation level, Flink SQL uses an optimizer to select stateful operators based on parameter configurations and SQL statements. To optimize the performance of stateful computation over massive data, you need a basic understanding of these underlying mechanisms.

Stateful operators derived by the optimizer

The following stateful operators are derived by the optimizer. All of them clean up their state data based on a time-to-live (TTL) value.

  • ChangelogNormalize

  • SinkUpsertMaterializer

  • LookupJoin (*)

The ChangelogNormalize operator processes changelogs that involve primary keys and ensures efficiency, data consistency, and data accuracy. The operator is used in the following scenarios:

  • The source table has primary keys and supports UPSERT operations.

    In this case, the source table is also referred to as an upsert source table. The table produces a changelog stream that contains only UPDATE (including INSERT and UPDATE_AFTER) and DELETE operations on the primary keys while maintaining the sequence of the primary keys. For example, you can use the Upsert Kafka connector to create an upsert source table. You can also override the getChangelogMode method to create a custom source connector that supports UPSERT operations.

    @Override
    public ChangelogMode getChangelogMode() {
        // Declare an upsert changelog: INSERT, UPDATE_AFTER, and DELETE records, without UPDATE_BEFORE.
        return ChangelogMode.upsert();
    }
  • The 'table.exec.source.cdc-events-duplicate' = 'true' configuration is specified.

    At-least-once processing for Change Data Capture (CDC) may generate duplicate changelog records. If you require exactly-once processing, specify this configuration to remove duplicate changelog records. Sample scenario:


    In this example, a hash shuffle is performed on the input data based on the primary key defined in the DDL statement of the source table. The ChangelogNormalize operator then creates a ValueState object to store the most recent record for each primary key, and uses that state to decide the output for each incoming record. When the second -U(2, 'Jerry', 77) record arrives, the corresponding value stored in the state is empty. This indicates that the number of ADD changes (+I and +U) already equals the number of RETRACT changes (-D and -U) for primary key 2. Therefore, the duplicate record is discarded.

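The setup described above can be sketched in Flink SQL as follows. The table name, schema, and connector options are assumptions; the essential parts are the primary key declaration, which determines the hash shuffle and the keyed state, and the deduplication configuration.

```sql
-- Hypothetical CDC source table; the schema and connector options are placeholders.
CREATE TEMPORARY TABLE orders_cdc (
  id    BIGINT,
  name  STRING,
  score INT,
  PRIMARY KEY (id) NOT ENFORCED  -- the hash shuffle and ValueState are keyed by this primary key
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',                            -- placeholder
  'properties.bootstrap.servers' = 'host:9092',  -- placeholder
  'format' = 'debezium-json'
);

-- Remove duplicate changelog records produced by at-least-once CDC delivery.
SET 'table.exec.source.cdc-events-duplicate' = 'true';
```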

SinkUpsertMaterializer

The SinkUpsertMaterializer operator ensures that data materialization conforms to the upsert semantics when the sink table has a primary key. During stream data processing, if the uniqueness and sequence of the data records are disrupted before they are written to the sink table, the optimizer automatically adds this operator. The operator maintains its state based on the primary key of the sink table to ensure that the corresponding constraints are satisfied. For information about other common scenarios, see Handle out-of-order changelog events in Flink SQL.

LookupJoin

If you specify the 'table.optimizer.non-deterministic-update.strategy'='TRY_RESOLVE' configuration for a lookup join and if the optimizer identifies a potential non-deterministic update (see How To Eliminate The Impact Of Non-Deterministic Update In Streaming), the system attempts to resolve the issue by adding the LookupJoin operator.

Sample scenarios:

  • The primary keys of the sink table partially or completely overlap with those of the dimension table, and data in the dimension table may be updated by CDC or other tools.

  • The join operation involves a non-primary-key field in the dimension table.

In these scenarios, the LookupJoin operator can efficiently handle dynamic data changes while ensuring the accuracy and consistency of query results.
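A minimal sketch of such a configuration is shown below. The table names and columns are hypothetical; FOR SYSTEM_TIME AS OF is the standard Flink SQL lookup join syntax.

```sql
-- Let the optimizer detect potential non-deterministic updates and try to resolve them.
SET 'table.optimizer.non-deterministic-update.strategy' = 'TRY_RESOLVE';

-- Hypothetical lookup join: orders is the main stream, dim_user is the dimension table.
SELECT o.order_id, o.user_id, u.level
FROM orders AS o
JOIN dim_user FOR SYSTEM_TIME AS OF o.proctime AS u
  ON o.user_id = u.user_id;
```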

Stateful operators invoked by SQL statements

This type of stateful operator cleans up state data based on the TTL value or the watermark progress. For example, stateful operators used for windowed computation, such as the WindowAggregate, WindowDeduplicate, WindowJoin, and WindowTopN operators, clean up their state data based on the watermark progress. If the timestamp contained in a watermark is later than the end time of a window, the built-in timer triggers state cleanup.

Operators that clean up state data based on the TTL value:

  • Deduplicate: Use the ROW_NUMBER function, specify a time attribute field in the ORDER BY clause, and query only the first row. The time attribute can be an event time or a processing time.

  • RegularJoin: Use a JOIN clause in which the equality condition does not involve a time attribute field.

  • GroupAggregate: Use the GROUP BY clause and apply an aggregate function, such as SUM, COUNT, MIN, MAX, FIRST_VALUE, or LAST_VALUE, to the grouped results, or use the DISTINCT keyword.

  • GlobalGroupAggregate: Enable local-global aggregation.

  • IncrementalGroupAggregate: Use a two-level group aggregation query and enable local-global aggregation. In this case, the GlobalGroupAggregate operator and the LocalGroupAggregate operator are merged into the IncrementalGroupAggregate operator.

  • Rank: Use the ROW_NUMBER function and do not specify a time attribute field in the ORDER BY clause.

  • GlobalRank: Use the ROW_NUMBER function, do not specify a time attribute field in the ORDER BY clause, and enable local-global aggregation.

Operators that clean up state data based on the watermark progress:

  • IntervalJoin: Use a JOIN clause in which the condition involves a time attribute field. The time attribute can be an event time or a processing time. Example:

    L.time BETWEEN R.time + X AND R.time + Y
      -- Or
    R.time BETWEEN L.time - Y AND L.time - X

  • TemporalJoin: Perform an inner join or a left join based on the event time.

  • WindowDeduplicate: Use a window table-valued function (TVF) for data deduplication.

  • WindowAggregate: Use a window TVF for data aggregation.

  • GlobalWindowAggregate: Use a window TVF for data aggregation and enable local-global aggregation.

  • WindowJoin: Use a window TVF for joins.

  • WindowRank: Use a window TVF for data sorting.

  • GroupWindowAggregate: Use the legacy syntax of window aggregation.
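For example, a windowed aggregation whose state is cleaned up by the watermark can be written with a window TVF as follows. The table `orders` and its event-time attribute `order_time` are assumptions.

```sql
-- Tumbling-window aggregation with a window TVF. When the watermark passes
-- the end time of a window, the window fires and its state is cleaned up.
SELECT window_start, window_end, COUNT(*) AS order_cnt
FROM TABLE(
  TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '5' MINUTE))
GROUP BY window_start, window_end;
```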

Diagnostic tools

Backpressure is an indicator of performance bottlenecks in Apache Flink. In most cases, backpressure occurs because the state size keeps increasing until it exceeds the allocated memory size. In this case, the state backend moves infrequently used state data to disk. However, accessing data on disk is significantly slower than accessing data in memory. If an operator frequently reads state data from disk, data latency increases significantly, which results in a performance bottleneck.

To identify whether backpressure is caused by a large state size, you need to thoroughly analyze the running status of the deployment and operators. For information about how to use the diagnostic tools of Realtime Compute for Apache Flink to identify performance issues, see Diagnostic tools.

Tuning methods

Avoid unnecessary use of stateful operators

This tuning method applies only to the stateful operators derived by the optimizer. In most cases, the stateful operators invoked by SQL statements are necessary.

  • ChangelogNormalize

    This operator is added in most scenarios that involve an upsert source table, except for temporal joins based on the event time. Before you use the Upsert Kafka connector or a similar connector, make sure that no temporal joins are performed based on the event time. While the deployment is running, monitor the state-related metrics of the ChangelogNormalize operator. If the source table has a large number of primary keys, the state size increases because the operator maintains its state based on primary keys (also referred to as keyed state). If the primary keys are frequently updated, the state data is frequently accessed and modified. In scenarios such as data synchronization, we recommend that you do not use the Upsert Kafka connector to create the source table, and that you instead use a data synchronization tool that ensures exactly-once processing.

  • SinkUpsertMaterializer

    By default, the table.exec.sink.upsert-materialize parameter is set to auto. This indicates that the system automatically uses the SinkUpsertMaterializer operator to ensure data correctness in specific scenarios, such as when the changelog records are out of order. Take note that the use of this operator does not necessarily mean that the records are out of order. For example, if you use multiple keys to group data and merge the keys into a single column, the optimizer cannot accurately derive the upsert key. Therefore, this operator is added to ensure data correctness. If you are familiar with the data distribution pattern and if the final result is correct without using this operator, set the table.exec.sink.upsert-materialize parameter to none to optimize performance.

    You can check whether the SinkUpsertMaterializer operator is used in a deployment in the development console of Realtime Compute for Apache Flink. If the operator is used, it is displayed together with the Sink operator in the topology diagram, and the two operators form an operator chain. This helps you intuitively monitor and evaluate the use of the SinkUpsertMaterializer operator and make informed decisions.

    If the SinkUpsertMaterializer operator is not needed and the computation result is correct without it, we recommend that you add the 'table.exec.sink.upsert-materialize'='none' configuration to prevent the unnecessary use of this operator. For information about how to add the configuration, see How do I configure parameters for deployment running? To help you identify similar issues, Ververica Runtime (VVR) 8.0 and later support intelligent analysis of the SQL execution plan.

Reduce state access frequency: Enable miniBatch

If a minute-level latency is acceptable in your business, you can enable miniBatch to reduce the frequency of state access and updates. For more information, see Enable miniBatch to improve throughput.
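As a sketch, miniBatch is typically enabled with the following configuration; the latency and batch-size values are examples, not recommendations.

```sql
SET 'table.exec.mini-batch.enabled' = 'true';     -- buffer input records and access state once per batch
SET 'table.exec.mini-batch.allow-latency' = '2s'; -- maximum time to buffer a batch
SET 'table.exec.mini-batch.size' = '5000';        -- maximum number of records per batch
```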

The following stateful operators support miniBatch in Realtime Compute for Apache Flink.

  • ChangelogNormalize: No operator-specific configuration is required.

  • Deduplicate: You can configure the table.exec.deduplicate.mini-batch.compact-changes-enable parameter to specify whether to compact the changelog during deduplication based on the event time.

  • GroupAggregate, GlobalGroupAggregate, and IncrementalGroupAggregate: No operator-specific configuration is required.

  • RegularJoin: You must configure the table.exec.stream.join.mini-batch-enabled parameter to enable miniBatch for join operations. This parameter applies to update streams and outer join scenarios.

Reduce state size: Specify proper TTL values

Note

If you change the TTL of a deployment from 0 to a value greater than 0 or vice versa, a compatibility issue occurs and a StateMigrationException error is thrown.

State size is crucial to performance. To control the state size of a deployment, configure the State Expiration Time parameter based on your business requirements on the Deployments page in the development console of Realtime Compute for Apache Flink. For more information, see Parameters.


An excessively short TTL may lead to incorrect computation results. For example, if data arrives late and the relevant state data has already expired during an aggregation or join operation, the result is incorrect. An excessively long TTL increases resource consumption and reduces stability. We recommend that you configure the TTL based on the data characteristics and your business requirements. For example, if your computations are performed on a daily basis and late data can arrive up to 1 hour after the day ends, you can set the TTL to 25 hours.
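For reference, the corresponding open source Apache Flink configuration key is table.exec.state.ttl; the 25-hour value below matches the example above.

```sql
-- Equivalent open source configuration for a 25-hour state TTL.
SET 'table.exec.state.ttl' = '25h';
```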

VVR 8.0.1 and later allow you to use the JOIN_STATE_TTL hint to specify different TTL values for the states of the left and right streams in a regular join. This reduces unnecessary state storage and improves performance. For information about how to use the hint, see Query hints.

SELECT /*+ JOIN_STATE_TTL('left_table' = '..', 'right_table' = '..') */ *
FROM left_table [LEFT | RIGHT | INNER] JOIN right_table ON ...
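With concrete values filled in, the hint might look like the following sketch; the table and column names are hypothetical, and the TTL values (12 hours for the left stream, 18 days for the right stream) are illustrative.

```sql
-- Keep the left stream state for 12 hours and the right stream state for 18 days.
SELECT /*+ JOIN_STATE_TTL('orders' = '12h', 'user_profile' = '18d') */ o.*, u.level
FROM orders AS o
LEFT JOIN user_profile AS u ON o.user_id = u.user_id;
```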

The following example shows the deployment details before and after the JOIN_STATE_TTL hint is used.

Before:

  • In this regular join, the data volume of the left stream is 20 to 50 times that of the right stream. The data in the right stream should be retained for 18 days, but the TTL of the right stream is set to 10 days to ensure performance. As a result, data correctness is compromised.

  • The state size of the join operation is approximately 5.8 TB.

  • The deployment consumes up to 700 compute units (CUs).

After:

  • The JOIN_STATE_TTL hint is used to set the TTL of the left stream to 12 hours and the TTL of the right stream to 18 days. This ensures data integrity.

  • The state size of the join operation is reduced to approximately 590 GB, about one-tenth of the original size.

  • The deployment consumes 200 to 300 CUs, which indicates that 50% to 70% of the resources are saved.

Reduce state size: Optimize execution plan

The optimizer selects the state implementation and generates an execution plan based on the SQL statements and specified configurations.

  • Use primary keys to optimize regular joins

    • If the join keys contain primary keys, the system uses a ValueState<RowData> object to store only the most recent value of each join key. This maximizes storage space utilization.

    • If the join keys contain non-primary keys, the system uses a MapState<RowData, RowData> object to store the most recent record from the source table based on the primary key for each join key.

    • If no primary key is defined, the system uses a MapState<RowData, Integer> object to store the entire data record corresponding to each join key and the number of times the data record appears.

    To optimize storage efficiency, we recommend that you define primary keys in the DDL statements of the tables that you want to join and perform a regular join based on the primary keys.
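A minimal sketch, assuming two hypothetical tables that both declare order_id as their primary key in the DDL:

```sql
-- Because the join key is the primary key of both inputs, each side stores
-- only the most recent row per key (ValueState<RowData>).
SELECT o.order_id, o.amount, s.ship_date
FROM orders AS o
JOIN shipments AS s
  ON o.order_id = s.order_id;
```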

  • Optimize deduplication over append-only streams

    Use the ROW_NUMBER function instead of the FIRST_VALUE or LAST_VALUE function to increase deduplication efficiency. When you use the ROW_NUMBER function to obtain the first or most recent data record, the Deduplicate operator stores only the first or most recent record for the specified keys.
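The recommended pattern can be sketched as follows; the table `src` and its columns are assumptions.

```sql
-- Keep only the most recent record per id. The Deduplicate operator stores
-- a single row per key instead of full aggregation state.
SELECT id, name, ts
FROM (
  SELECT *,
         ROW_NUMBER() OVER (PARTITION BY id ORDER BY ts DESC) AS rn
  FROM src
)
WHERE rn = 1;
```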

  • Improve aggregation performance

    Use the FILTER syntax to replace the CASE WHEN syntax in multi-dimensional data aggregation, such as when you want to calculate the number of unique visitors on mobile devices, desktop devices, and all devices. The SQL optimizer can recognize the different filter arguments on the same key. This allows the state data to be shared when multiple COUNT DISTINCT values are calculated based on different conditions on the same key. This also reduces the number of times the state data is accessed. Tests show that the FILTER syntax can provide twice the performance of the CASE WHEN syntax.
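The two variants can be sketched as follows; the table `visits` and its columns are hypothetical.

```sql
-- CASE WHEN version: each COUNT(DISTINCT ...) keeps its own deduplication state.
SELECT
  day,
  COUNT(DISTINCT CASE WHEN device_type = 'mobile'  THEN user_id END) AS mobile_uv,
  COUNT(DISTINCT CASE WHEN device_type = 'desktop' THEN user_id END) AS desktop_uv,
  COUNT(DISTINCT user_id) AS total_uv
FROM visits
GROUP BY day;

-- FILTER version: the optimizer recognizes the different filters on the same
-- distinct key and shares the deduplication state among the aggregates.
SELECT
  day,
  COUNT(DISTINCT user_id) FILTER (WHERE device_type = 'mobile')  AS mobile_uv,
  COUNT(DISTINCT user_id) FILTER (WHERE device_type = 'desktop') AS desktop_uv,
  COUNT(DISTINCT user_id) AS total_uv
FROM visits
GROUP BY day;
```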

Reduce state size: Adjust the join order of multiple streams

Apache Flink uses binary hash joins to process data streams, so a multi-stream join is executed as a sequence of pairwise joins, and each intermediate join result must be stored as state. For example, if streams A and B are joined first, the intermediate result of that join can cause unnecessary storage consumption. The issue becomes more significant as the number of joined streams increases.

To address this issue, adjust the joining order. For example, you can join a stream that has a smaller amount of data before a stream that has a larger amount of data. This helps mitigate the amplification effect caused by state redundancy and improves the efficiency and performance of data processing.
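As a sketch, with three hypothetical streams ordered from smallest to largest:

```sql
-- Join the two smaller streams first so that the intermediate join state stays small.
SELECT s.k, s.val AS small_val, m.val AS medium_val, l.val AS large_val
FROM small_stream  AS s
JOIN medium_stream AS m ON s.k = m.k
JOIN large_stream  AS l ON m.k = l.k;
```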

Minimize disk reads

You can improve system performance by reducing the number of disk reads and by optimizing memory utilization. For more information, see Minimize disk reads.

References