Improve throughput and reduce latency of your Realtime Compute for Apache Flink SQL deployments by tuning resource configurations, enabling aggregation optimizations, and rewriting SQL patterns.
Set baseline parameters for throughput
Add the following parameters to the Other Configuration field in the Parameters section of the Configuration tab. These settings improve throughput and reduce data hotspot issues. For details, see Configure custom deployment parameters.
execution.checkpointing.interval: 180s
table.exec.state.ttl: 129600000
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5s
table.optimizer.distinct-agg.split.enabled: true

| Parameter | Description |
|---|---|
| execution.checkpointing.interval | Checkpoint interval. The value 180s means 180 seconds. |
| state.backend | State backend type. |
| table.exec.state.ttl | Time-to-live (TTL) of state data, in milliseconds. |
| table.exec.mini-batch.enabled | Whether to enable the miniBatch feature. |
| table.exec.mini-batch.allow-latency | Maximum latency before a mini-batch is triggered. |
| table.exec.mini-batch.size | Maximum number of records cached per mini-batch. VVR already optimizes this parameter, so leave it unconfigured in most cases. For details, see Key parameters. |
| table.optimizer.distinct-agg.split.enabled | Whether to enable PartialFinal optimization for COUNT DISTINCT. |
Scale deployment resources
Ververica Platform (VVP) caps the CPU cores available to each JobManager and TaskManager at the values you configure. Increase these values when deployments are resource-constrained.
Scale JobManager resources
If many deployments run in parallel, increase JobManager resources in the Resources section of the Configuration tab. For example:
Job Manager CPU: 4
Job Manager Memory: 8 GiB
Scale TaskManager resources
If the deployment topology is complex, increase TaskManager resources in the Resources section of the Configuration tab. For example:
Task Manager CPU: 2
Task Manager Memory: 4 GiB
Keep taskmanager.numberOfTaskSlots at its default value of 1.
Optimize group aggregation
By default, the group aggregation operator processes input records one by one: read the accumulator from state, update it, write it back, then repeat for the next record. This per-record pattern increases state backend overhead -- especially with RocksDB -- and worsens under data hotspot conditions. The following optimizations address these bottlenecks.
Enable miniBatch
MiniBatch buffers incoming records and processes them together, reducing the number of state accesses per batch. This improves throughput at the cost of slightly higher latency. The miniBatch feature triggers micro-batch processing based on event messages inserted at the source at a specified interval.
When to use: Data aggregation scenarios where ultra-low latency is not required.
How to enable: Add the following to the Other Configuration field in the Parameters section of the Configuration tab. For details, see Configure custom deployment parameters.
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5s

| Parameter | Description |
|---|---|
| table.exec.mini-batch.enabled | Set to true to enable miniBatch. |
| table.exec.mini-batch.allow-latency | Maximum latency before a mini-batch is triggered. |
| table.exec.mini-batch.size | Maximum records per mini-batch. Leave unconfigured -- VVR already optimizes this parameter. For details, see Key parameters. |
Enable LocalGlobal
LocalGlobal splits the aggregation into two phases -- local aggregation at each upstream node, followed by global aggregation -- similar to the combine and reduce phases in MapReduce. Local aggregation pre-filters hotspot data, reducing the volume that reaches the global aggregation phase.
When to use: Common aggregate functions such as SUM, COUNT, MAX, MIN, and AVG with data hotspot issues.
Prerequisites:
MiniBatch must be enabled. LocalGlobal uses the mini-batch interval to determine how many records to accumulate before local aggregation.
The aggregate function must implement the merge method (see AggregateFunction).
Status: Enabled by default when miniBatch is active.
Verify: Check the final topology for a GlobalGroupAggregate or LocalGroupAggregate node.
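As an illustration, a plain aggregation like the following becomes eligible for LocalGlobal once miniBatch is enabled (the orders table and its columns are hypothetical):

```sql
-- With miniBatch on, the planner splits this into a LocalGroupAggregate
-- (partial results per upstream task) followed by a GlobalGroupAggregate
-- (final merge per seller_id), so a hot seller_id no longer funnels every
-- raw record into a single global task.
SELECT
  seller_id,
  SUM(pay_amount) AS total_pay,
  COUNT(*)        AS order_cnt
FROM orders
GROUP BY seller_id;
```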
Enable PartialFinal for COUNT DISTINCT
LocalGlobal is less effective for COUNT DISTINCT because local aggregation cannot remove duplicate distinct keys -- duplicates still accumulate in the global phase. PartialFinal addresses this by automatically splitting the aggregation into two layers: the first layer scatters data by the distinct key hash, and the second layer performs the final aggregation.
When to use: COUNT DISTINCT queries with large data volumes where aggregation performance is a bottleneck.
How to enable: Add the following to the Other Configuration field. For details, see Configure custom deployment parameters.
table.optimizer.distinct-agg.split.enabled: true

Limitations:
PartialFinal cannot be used in SQL that contains user-defined aggregate functions (UDAFs).
Enable only for large data volumes. PartialFinal introduces an extra network shuffle because it splits aggregation into two phases.
Verify: Check whether one-layer aggregation changes to two-layer aggregation in the final topology.
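Although PartialFinal rewrites the plan automatically, its effect is equivalent to the following manual two-layer split (the page_view table, its columns, and the 1024 bucket count are illustrative):

```sql
-- Layer 1: scatter records by a hash bucket of the distinct key so a hot
-- key is spread across parallel tasks, each counting its own partial set.
-- Layer 2: sum the partial distinct counts per day.
SELECT stat_day, SUM(uv_partial) AS uv
FROM (
  SELECT
    stat_day,
    COUNT(DISTINCT visitor_id) AS uv_partial
  FROM page_view
  GROUP BY stat_day, MOD(HASH_CODE(visitor_id), 1024)
)
GROUP BY stat_day;
```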
Use AGG WITH FILTER instead of CASE WHEN
When computing COUNT DISTINCT on the same field under different conditions, use the SQL FILTER clause instead of CASE WHEN. The Flink SQL optimizer recognizes different filter arguments on the same distinct key and shares a single state instance across the computations instead of keeping separate state for each. This reduces state access and state size; in performance tests, it roughly doubled deployment performance compared with the CASE WHEN form.
Before (less efficient):
COUNT(DISTINCT visitor_id) AS uv_total,
COUNT(DISTINCT CASE WHEN is_wireless = 'y' THEN visitor_id ELSE NULL END) AS uv_wireless

After (2x performance):
COUNT(DISTINCT visitor_id) AS uv_total,
COUNT(DISTINCT visitor_id) FILTER (WHERE is_wireless = 'y') AS uv_wireless

Optimize joins
Key-value separation for two-stream joins
In VVR 6.0.1 and later, the Realtime Compute for Apache Flink engine automatically infers whether to enable key-value separation for JOIN operators on two data streams. After key-value separation is enabled, typical join performance improves by more than 40%.
Configure the table.exec.join.kv-separate parameter to control this behavior:
| Value | Behavior |
|---|---|
| AUTO | The engine enables key-value separation automatically based on the join operator state. This is the default. |
| FORCE | Force-enable key-value separation. |
| NONE | Force-disable key-value separation. |
Key-value separation only takes effect on GeminiStateBackend.
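For example, to force-enable key-value separation for a deployment running on GeminiStateBackend, add the following to the Other Configuration field:

```
table.exec.join.kv-separate: FORCE
```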
Enable miniBatch for joins
By default, the regular join operator processes records one by one: look up associated records from the counterpart state based on the join key, update state with the current record, then output join results. This per-record pattern increases state backend overhead -- especially with RocksDB -- and can lead to severe record amplification in cascading join scenarios.
MiniBatch for joins addresses this with two core optimizations:
Record folding: Folds records in the buffer to reduce the data volume before the join process.
Output suppression: Suppresses redundant intermediate results during batch processing.
MiniBatch triggers micro-batch processing when one of the following conditions is met: the configured maximum allowed latency is reached, the cached batch size is reached, or another event such as a checkpoint occurs.
VVR version: 8.0.4 or later.
How to enable: Add the following to the Other Configuration field. For details, see Configure custom deployment parameters.
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5s
table.exec.stream.join.mini-batch-enabled: true

Best for: Cascading outer joins where message amplification is a concern. Upstream operators use a message collapse or merge mechanism to suppress the message amplification effect on downstream operators. For example:
SELECT a.id AS a_id, a.a_content, B.id AS b_id, B.b_content
FROM a LEFT JOIN
(SELECT * FROM b
LEFT JOIN c ON b.prd_id = c.id) B
ON a.id = B.id

Optimize TopN queries
TopN algorithms
The algorithm used depends on whether the input data stream is static or dynamic:
| Input type | Available algorithms | Notes |
|---|---|---|
| Static data streams (e.g., from Simple Log Service) | AppendRank | Only option for static streams. |
| Dynamic data streams (e.g., from aggregate or join) | UpdateFastRank, RetractRank | UpdateFastRank is optimal. RetractRank is the fallback. |
The algorithm name appears in the topology node name.
Switch from RetractRank to UpdateFastRank
UpdateFastRank requires all of the following conditions:
The input stream does not contain DELETE or UPDATE_BEFORE messages. Verify this with the following statement (for details on the syntax, see EXPLAIN):
EXPLAIN CHANGELOG_MODE <query_statement_or_insert_statement_or_statement_set>
The input stream contains primary key information (e.g., columns used in a GROUP BY clause).
The ORDER BY fields are monotonically updated in the opposite order of sorting. For example, ORDER BY COUNT DESC, ORDER BY COUNT_DISTINCT DESC, or ORDER BY SUM(positive value) DESC.
Example: For ORDER BY SUM DESC, filter for positive values to guarantee monotonicity:
INSERT INTO print_test
SELECT
cate_id,
seller_id,
stat_date,
pay_ord_amt
FROM (
SELECT
*,
ROW_NUMBER() OVER (
-- The PARTITION BY columns must appear in the subquery's GROUP BY.
-- Include a time field to prevent disorder when state expires.
PARTITION BY cate_id, stat_date
ORDER BY pay_ord_amt DESC
) AS rownum
FROM (
SELECT
cate_id,
seller_id,
stat_date,
-- SUM is monotonically increasing because only positive values are included.
SUM(total_fee) FILTER (WHERE total_fee >= 0) AS pay_ord_amt
FROM random_test
WHERE total_fee >= 0
GROUP BY seller_id, stat_date, cate_id
) a
)
WHERE rownum <= 100;

In this example, the random_test table contains static data streams. The aggregation result does not contain DELETE or UPDATE_BEFORE messages, so monotonicity is preserved.
Reduce output volume
Do not include rownum in the final SELECT output. Sort results in the frontend instead. This significantly reduces the volume of data written to the sink. For details, see Top-N.
Increase the TopN cache size
TopN maintains a state cache to reduce disk reads. Calculate the cache hit ratio with this formula:
cache_hit = cache_size * parallelism / top_n / partition_key_num

Example: With Top100, a default cache size of 10,000 records, parallelism of 50, and 100,000 partition keys:
10,000 * 50 / 100 / 100,000 = 5% hit ratio

A 5% hit ratio means most reads go to disk, causing unstable state seek metrics and degraded performance. Increase the cache size:
table.exec.rank.topn-cache-size: 200000

With 200,000 cache entries:
200,000 * 50 / 100 / 100,000 = 100% hit ratio

If the number of partition keys is large, increase the heap memory along with the TopN cache size. For details, see Configure a deployment.
Include a time field in PARTITION BY
Add a time field such as day to the PARTITION BY clause. Without it, TopN results become disordered when state data expires due to TTL.
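A minimal sketch of this pattern (the item_sales table and its columns are hypothetical), deriving a day string and adding it to PARTITION BY:

```sql
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (
      -- stat_day scopes the ranking state to a single day, so TTL expiry
      -- of older days cannot disorder the current day's results.
      PARTITION BY cate_id, stat_day
      ORDER BY sales DESC
    ) AS rownum
  FROM (
    SELECT
      cate_id,
      item_id,
      DATE_FORMAT(order_time, 'yyyy-MM-dd') AS stat_day,
      SUM(fee) AS sales
    FROM item_sales
    GROUP BY cate_id, item_id, DATE_FORMAT(order_time, 'yyyy-MM-dd')
  )
)
WHERE rownum <= 10;
```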
Deduplicate efficiently
Input streams often contain duplicate records. Realtime Compute for Apache Flink provides two deduplication policies:
Deduplicate Keep FirstRow: Keeps the earliest record per key.
Deduplicate Keep LastRow: Keeps the latest record per key.
Syntax
Deduplication is a special case of TopN. Use ROW_NUMBER() with an OVER clause to assign a row number per partition, then keep only rownum = 1:
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY col1[, col2..]
ORDER BY timeAttributeCol [ASC|DESC]) AS rownum
FROM table_name)
WHERE rownum = 1

| Element | Description |
|---|---|
| ROW_NUMBER() | Assigns a row number starting from 1 within each partition. |
| PARTITION BY col1[, col2..] | Columns that define the deduplication key. |
| ORDER BY timeAttributeCol | A time attribute column (proctime or rowtime). ASC keeps the first row (Keep FirstRow). DESC keeps the last row (Keep LastRow). |
| rownum = 1 | Retains only the first row per partition. Also supports rownum <= 1. |
Time attribute behavior:
proctime (processing time): Deduplication is based on when records are processed. Results may vary between runs.
rowtime (event time): Deduplication is based on when records were produced. Results are deterministic.
Keep FirstRow
Keeps the first record per deduplication key. State stores only primary key data, so state access is highly efficient.
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) AS rowNum
FROM T
)
WHERE rowNum = 1

This deduplicates table T by column b, keeping the earliest record by processing time. If the table does not declare a proctime attribute, you can call the PROCTIME() function instead.
Keep LastRow
Keeps the most recent record per deduplication key. This slightly outperforms the LAST_VALUE function.
SELECT *
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) AS rowNum
FROM T
)
WHERE rowNum = 1

This deduplicates table T by columns b and d, keeping the latest record by event time.
Use built-in functions efficiently
Prefer built-in functions over UDFs
Built-in functions are continuously optimized with better serialization, deserialization, and byte-level operations. Replace user-defined functions (UDFs) with built-in equivalents whenever possible.
Use single-character delimiters in KEYVALUE
The KEYVALUE(content, keyValueSplit, keySplit, keyName) function performs about 30% faster when keyValueSplit and keySplit are single characters such as : or ,. With single-character delimiters, the engine searches for the target key directly in binary data without parsing the entire input.
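For instance (literal input for illustration; per the signature above, keyValueSplit separates the key-value pairs and keySplit separates a key from its value):

```sql
-- ',' and ':' are single characters, so the engine can locate 'k2'
-- directly in the binary data without fully parsing the string.
SELECT KEYVALUE('k1:v1,k2:v2,k3:v3', ',', ':', 'k2');
-- expected to return 'v2'
```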
LIKE operator patterns
| Pattern | Matches | Example |
|---|---|---|
LIKE 'xxx%' | Starts with xxx | LIKE 'order%' |
LIKE '%xxx' | Ends with xxx | LIKE '%_id' |
LIKE '%xxx%' | Contains xxx | LIKE '%error%' |
LIKE 'xxx' | Exact match (equivalent to = 'xxx') | LIKE 'active' |
Escape underscores: The underscore (_) is a single-character wildcard in SQL. To match a literal underscore, use an escape character:
LIKE '%seller/_id%' ESCAPE '/'

Without escaping, LIKE '%seller_id%' also matches seller#id, sellerxid, seller1id, and other unintended strings.
Avoid regular expressions
Regular expressions can be 100x slower than arithmetic operations and may enter infinite loops in edge cases, blocking deployments. Use LIKE instead when possible.
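As a sketch, a containment or prefix check written with a regular expression can often be expressed with LIKE instead (the logs table and patterns are hypothetical):

```sql
-- Slower: regular expression evaluated per record
SELECT * FROM logs WHERE REGEXP(url, 'https://[^/]+/item/');
-- Faster: similar intent expressed with LIKE wildcards
SELECT * FROM logs WHERE url LIKE 'https://%/item/%';
```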
For cases that require regular expressions, see REGEXP and REGEXP_REPLACE.
SQL hints
SQL hints let you influence the optimizer's execution plan, attach metadata or statistics, and configure Dynamic Table Options on a per-table basis. For more information about SQL hints, see SQL hints.
Syntax
The syntax follows Apache Calcite SQL:
SELECT /*+ hint [, hint ] */ ...
-- Where:
-- hint: hintName(hintOption [, hintOption]*)
-- hintOption: simpleIdentifier | numericLiteral | stringLiteral

Join hints
Query hints are a type of SQL hints that modify the execution plan within the current query block. Flink query hints currently support only join hints for both dimension table joins and regular joins.
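As one example, newer Flink versions support the LOOKUP hint for dimension table (lookup) joins; the table names and retry options below are illustrative:

```sql
-- Hypothetical lookup join: ask the planner to retry missed lookups
-- against the dimension table instead of emitting an empty result.
SELECT /*+ LOOKUP('table'='dim_users',
                  'retry-predicate'='lookup_miss',
                  'retry-strategy'='fixed_delay',
                  'fixed-delay'='10s',
                  'max-attempts'='3') */
  o.order_id,
  u.user_name
FROM orders AS o
JOIN dim_users FOR SYSTEM_TIME AS OF o.proctime AS u
  ON o.user_id = u.id;
```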