Improve throughput and reduce latency of Flink SQL deployments by tuning resources, enabling aggregation optimizations, and rewriting SQL patterns.
Set baseline parameters for throughput
Add the following parameters to the Other Configuration field in the Parameters section of the Configuration tab. These parameters improve throughput and reduce hotspot issues. For more information, see Configure custom deployment parameters.
```yaml
execution.checkpointing.interval: 180s
table.exec.state.ttl: 129600000
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5s
table.optimizer.distinct-agg.split.enabled: true
```
| Parameter | Description |
|---|---|
| execution.checkpointing.interval | Checkpoint interval. The value 180s means 180 seconds. |
| state.backend | State backend type. |
| table.exec.state.ttl | Time-to-live (TTL) of state data, in milliseconds. The value 129600000 is 36 hours. |
| table.exec.mini-batch.enabled | Enables miniBatch aggregation. |
| table.exec.mini-batch.allow-latency | Maximum latency before a mini-batch is triggered. |
| table.exec.mini-batch.size | Maximum number of records per mini-batch. VVR optimizes this value automatically, so leave it unconfigured in most cases. For more information, see Key parameters. |
| table.optimizer.distinct-agg.split.enabled | Enables PartialFinal optimization for COUNT DISTINCT. |
Scale deployment resources
Ververica Platform (VVP) caps JobManager and TaskManager CPU at configured values. Increase them when deployments are resource-constrained.
Scale JobManager resources
For deployments with high parallelism, increase JobManager resources in the Resources section of the Configuration tab. For example:
- Job Manager CPU: 4
- Job Manager Memory: 8 GiB
Scale TaskManager resources
For complex topologies, increase TaskManager resources in the Resources section of the Configuration tab. For example:
- Task Manager CPU: 2
- Task Manager Memory: 4 GiB
Keep taskmanager.numberOfTaskSlots at its default value of 1.
Optimize group aggregation
By default, the group aggregation operator processes records one by one: read the accumulator from state, update it, write it back, then repeat. This per-record pattern increases state backend overhead — especially with RocksDB — and worsens under data hotspots.
Enable miniBatch
MiniBatch buffers incoming records and processes them together, so the state of each key is read and written once per batch instead of once per record. This improves throughput at the cost of slightly higher latency. MiniBatch triggers micro-batch processing based on event messages that the source inserts at a specified interval.
When to use: Data aggregation scenarios where ultra-low latency is not required.
How to enable: Add the following to the Other Configuration field in the Parameters section of the Configuration tab. For more information, see Configure custom deployment parameters.
```yaml
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5s
```
| Parameter | Description |
|---|---|
| table.exec.mini-batch.enabled | Set to true to enable miniBatch. |
| table.exec.mini-batch.allow-latency | Maximum latency before a mini-batch is triggered. |
| table.exec.mini-batch.size | Maximum number of records per mini-batch. Leave it unconfigured, because VVR optimizes this value automatically. For more information, see Key parameters. |
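The following Python sketch is a simplified model, not VVR's implementation, of why miniBatch reduces state accesses for a SUM aggregation with a hot key. It counts one read and one write per state access and, as an assumption for illustration, cuts batches by record count (`batch_size`) instead of by `table.exec.mini-batch.allow-latency`. The function names are hypothetical.

```python
from collections import Counter


def per_record_state_accesses(records):
    """Per-record aggregation: every record reads and writes its key's state."""
    state = {}
    accesses = 0
    for key, value in records:
        acc = state.get(key, 0)   # read the accumulator from state
        state[key] = acc + value  # write the updated accumulator back
        accesses += 2
    return state, accesses


def mini_batch_state_accesses(records, batch_size):
    """Mini-batch aggregation: combine a buffered batch in memory, then read and
    write each distinct key's state once per batch.

    Assumption: batches are cut by record count for simplicity; the real
    trigger is time-based (allow-latency).
    """
    state = {}
    accesses = 0
    for start in range(0, len(records), batch_size):
        buffered = Counter()
        for key, value in records[start:start + batch_size]:
            buffered[key] += value  # in-memory combine, no state access
        for key, partial in buffered.items():
            state[key] = state.get(key, 0) + partial  # one read + one write
            accesses += 2
    return state, accesses


records = [("hot", 1)] * 1000 + [("cold", 1)] * 10
print(per_record_state_accesses(records)[1])       # 2020
print(mini_batch_state_accesses(records, 100)[1])  # 22
```

Both functions produce the same result; the hot key's 1,000 records cost 2,000 state accesses per record but only 20 when combined in batches of 100.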
Enable LocalGlobal
LocalGlobal splits aggregation into two phases, local pre-aggregation at each upstream node followed by global aggregation, similar to the combine and reduce phases in MapReduce. Local aggregation collapses records that share a key before the shuffle, so a hot key sends one partial result per batch to the global phase instead of every record.
When to use: Common aggregate functions such as SUM, COUNT, MAX, MIN, and AVG with data hotspot issues.
Prerequisites:
- MiniBatch must be enabled. LocalGlobal uses the mini-batch interval to determine how many records to accumulate before local aggregation.
- The aggregate function must implement the merge method (see AggregateFunction).
Status: Enabled by default when miniBatch is active.
Verify: Check the final topology for a GlobalGroupAggregate or LocalGroupAggregate node.
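A minimal Python sketch of the two LocalGlobal phases for AVG, assuming each upstream subtask has already buffered one mini-batch. The hypothetical `AvgAcc` class plays the role of an aggregate function accumulator; its `merge` method is what lets the global phase combine partial results, which is why LocalGlobal requires it.

```python
from dataclasses import dataclass


@dataclass
class AvgAcc:
    """Hypothetical AVG accumulator: a running total and a row count."""
    total: float = 0.0
    count: int = 0

    def accumulate(self, value):
        self.total += value
        self.count += 1

    def merge(self, other):
        # Combining two partial accumulators is what the global phase needs.
        self.total += other.total
        self.count += other.count

    def result(self):
        return self.total / self.count if self.count else None


def local_phase(batch):
    """Pre-aggregate one upstream subtask's mini-batch into one accumulator per key."""
    partials = {}
    for key, value in batch:
        partials.setdefault(key, AvgAcc()).accumulate(value)
    return partials


def global_phase(all_partials):
    """Merge the partial accumulators shuffled in from every upstream subtask."""
    merged = {}
    for partials in all_partials:
        for key, acc in partials.items():
            merged.setdefault(key, AvgAcc()).merge(acc)
    return {key: acc.result() for key, acc in merged.items()}


p1 = local_phase([("hot", 1), ("hot", 3), ("cold", 10)])
p2 = local_phase([("hot", 5), ("hot", 7)])
print(global_phase([p1, p2]))  # {'hot': 4.0, 'cold': 10.0}
```

Five input records become three partial accumulators before the shuffle, and the hot key reaches the global phase twice instead of four times.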
Enable PartialFinal for COUNT DISTINCT
LocalGlobal is less effective for COUNT DISTINCT because local aggregation cannot remove duplicate distinct keys. PartialFinal addresses this by splitting aggregation into two layers: the first scatters data by distinct key hash, the second performs final aggregation.
When to use: COUNT DISTINCT queries with large data volumes where aggregation performance is a bottleneck.
How to enable: Add the following to the Other Configuration field. For more information, see Configure custom deployment parameters.
```yaml
table.optimizer.distinct-agg.split.enabled: true
```
Limitations:
- PartialFinal cannot be used in SQL that contains user-defined aggregate functions (UDAFs).
- Enable it only for large data volumes, because PartialFinal introduces an extra network shuffle.
Verify: Check whether one-layer aggregation changes to two-layer aggregation in the final topology.
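The following Python sketch models the PartialFinal split for COUNT(DISTINCT ...). The bucket count and the use of `zlib.crc32` as the hash are illustrative assumptions, not VVR internals. Because a given distinct key always hashes to the same bucket, the per-bucket distinct counts never overlap, so the final layer can simply sum them.

```python
import zlib


def partial_layer(rows, buckets):
    """First layer: route each (group_key, distinct_key) row by a hash of the
    distinct key, so one hot group is spread across `buckets` subtasks.
    Each (group_key, bucket) pair keeps its own set and emits a partial count."""
    seen = {}
    for group_key, distinct_key in rows:
        # Assumption: crc32 stands in for the engine's hash function.
        bucket = zlib.crc32(str(distinct_key).encode()) % buckets
        seen.setdefault((group_key, bucket), set()).add(distinct_key)
    return {slot: len(keys) for slot, keys in seen.items()}


def final_layer(partial_counts):
    """Second layer: a distinct key always lands in one bucket, so the
    per-bucket counts are disjoint and can be summed per group."""
    totals = {}
    for (group_key, _bucket), count in partial_counts.items():
        totals[group_key] = totals.get(group_key, 0) + count
    return totals


rows = [("d1", f"u{i}") for i in range(100)] * 2 + [("d2", "u1")]
print(final_layer(partial_layer(rows, buckets=4)))  # {'d1': 100, 'd2': 1}
```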
Use AGG WITH FILTER instead of CASE WHEN
When computing COUNT DISTINCT on the same field under different conditions, use the FILTER clause instead of CASE WHEN. The optimizer recognizes different filter arguments on the same distinct key and uses a single shared state instance, reducing state access and size. Performance tests show 2x improvement over CASE WHEN.
Before (less efficient):
```sql
COUNT(DISTINCT visitor_id) AS uv_total,
COUNT(DISTINCT CASE WHEN is_wireless = 'y' THEN visitor_id ELSE NULL END) AS uv_wireless
```
After (2x performance):
```sql
COUNT(DISTINCT visitor_id) AS uv_total,
COUNT(DISTINCT visitor_id) FILTER (WHERE is_wireless = 'y') AS uv_wireless
```
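To illustrate the shared state, the following Python sketch keeps a single map from `visitor_id` to a bitmask in which bit *i* records whether the visitor matched the filter of aggregate *i*. This is a conceptual model; the exact encoding the optimizer uses is an assumption here. The CASE WHEN form would instead keep one separate set of visitor IDs per aggregate.

```python
def shared_distinct_state(rows, filters):
    """Model COUNT(DISTINCT visitor_id) FILTER (...) for several filters that
    share one state map: visitor_id -> bitmask of the filters it has matched.
    Assumption: the bitmask encoding is illustrative only."""
    state = {}
    for row in rows:
        mask = 0
        for i, predicate in enumerate(filters):
            if predicate(row):
                mask |= 1 << i
        if mask:
            vid = row["visitor_id"]
            state[vid] = state.get(vid, 0) | mask  # one lookup per row
    # Result of aggregate i = number of distinct keys whose bit i is set.
    counts = [sum(1 for m in state.values() if (m >> i) & 1)
              for i in range(len(filters))]
    return state, counts


rows = [
    {"visitor_id": 1, "is_wireless": "y"},
    {"visitor_id": 1, "is_wireless": "n"},
    {"visitor_id": 2, "is_wireless": "n"},
    {"visitor_id": 3, "is_wireless": "y"},
]
filters = [
    lambda r: True,                     # uv_total
    lambda r: r["is_wireless"] == "y",  # uv_wireless
]
state, counts = shared_distinct_state(rows, filters)
print(counts)      # [3, 2]
print(len(state))  # 3
```

With these sample rows, the shared map holds three entries, whereas two separate sets would hold five ({1, 2, 3} and {1, 3}).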
Optimize joins
Key-value separation for two-stream joins
In VVR 6.0.1 and later, the engine automatically infers whether to enable key-value separation for two-stream JOIN operators, improving typical join performance by more than 40%.
Configure the table.exec.join.kv-separate parameter to control this behavior:
| Value | Behavior |
|---|---|
| AUTO | The engine enables key-value separation automatically based on the join operator state. This is the default. |
| FORCE | Force-enables key-value separation. |
| NONE | Force-disables key-value separation. |
Key-value separation only takes effect on GeminiStateBackend.
Enable miniBatch for joins
By default, the regular join operator processes records one by one: look up counterpart state by join key, update state, then output results. This per-record pattern increases state backend overhead — especially with RocksDB — and can cause severe record amplification in cascading joins.
MiniBatch for joins addresses this with two core optimizations:
- Record folding: Folds records in the buffer to reduce the data volume before the join process.
- Output suppression: Suppresses redundant intermediate results during batch processing.
MiniBatch triggers processing when the maximum allowed latency is reached, the batch size limit is reached, or a checkpoint occurs.
VVR version: 8.0.4 or later.
How to enable: Add the following to the Other Configuration field. For more information, see Configure custom deployment parameters.
```yaml
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5s
table.exec.stream.join.mini-batch-enabled: true
```
Best for: Cascading outer joins with message amplification. In this pattern, the upstream join folds records, which suppresses amplification in the downstream join. For example:
```sql
SELECT a.id AS a_id, a.a_content, B.id AS b_id, B.b_content
FROM a LEFT JOIN
  (SELECT * FROM b
   LEFT JOIN c ON b.prd_id = c.id) B
ON a.id = B.id
```
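Record folding can be modeled with a short Python sketch. It assumes that every buffered changelog record carries a unique key and that only the net change per key needs to reach the join; the function name and record layout are hypothetical, and VVR's actual folding rules may differ in detail.

```python
_MISSING = object()  # sentinel: "no record of this kind in the batch"


def fold_changelog(buffer):
    """Fold buffered changelog records into the net change per unique key.

    Each record is (kind, key, value), kind in {"+I", "-U", "+U", "-D"}.
    Assumption: records carry a unique key, and only the net effect per key
    needs to reach the join.
    """
    net = {}  # key -> [retraction of a pre-existing row, latest accumulate]
    for kind, key, value in buffer:
        entry = net.setdefault(key, [_MISSING, _MISSING])
        if kind in ("+I", "+U"):
            entry[1] = value
        elif entry[1] is not _MISSING:
            entry[1] = _MISSING  # retracts a row added earlier in this batch
        elif entry[0] is _MISSING:
            entry[0] = value     # retracts a row that existed before the batch

    out = []
    for key, (retracted, latest) in net.items():
        if retracted is not _MISSING and latest is not _MISSING:
            out += [("-U", key, retracted), ("+U", key, latest)]
        elif retracted is not _MISSING:
            out.append(("-D", key, retracted))
        elif latest is not _MISSING:
            out.append(("+I", key, latest))
    return out


buffer = [
    ("+I", "k1", "a"), ("-U", "k1", "a"), ("+U", "k1", "b"),
    ("-U", "k1", "b"), ("+U", "k1", "c"),
    ("+I", "k2", "x"), ("-D", "k2", "x"),
    ("-U", "k3", "old"), ("+U", "k3", "new"),
]
print(fold_changelog(buffer))
# [('+I', 'k1', 'c'), ('-U', 'k3', 'old'), ('+U', 'k3', 'new')]
```

Nine buffered records fold to three: the intermediate updates of k1 collapse into a single insert, the insert and delete of k2 cancel out, and k3 keeps one update pair.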
Optimize TopN queries
TopN algorithms
The algorithm used depends on whether the input data stream is static or dynamic:
| Input type | Available algorithms | Notes |
|---|---|---|
| Static data streams (e.g., from Simple Log Service) | AppendRank | Only option for static streams. |
| Dynamic data streams (e.g., from aggregate or join) | UpdateFastRank, RetractRank | UpdateFastRank is optimal. RetractRank is the fallback. |
The algorithm name appears in the topology node name.
Switch from RetractRank to UpdateFastRank
UpdateFastRank requires all of the following conditions:
- The input stream does not contain DELETE or UPDATE_BEFORE messages. Verify with EXPLAIN:
  ```sql
  EXPLAIN CHANGELOG_MODE <query_statement_or_insert_statement_or_statement_set>
  ```
- The input stream contains primary key information (e.g., columns used in a GROUP BY clause).
- The ORDER BY fields are updated monotonically in the direction opposite to the sort order, that is, for a DESC sort the values only increase. For example, ORDER BY COUNT DESC, ORDER BY COUNT_DISTINCT DESC, or ORDER BY SUM(positive value) DESC.
Example: For ORDER BY SUM DESC, include only non-negative values so that the SUM can only increase:
```sql
INSERT INTO print_test
SELECT
  cate_id,
  seller_id,
  stat_date,
  pay_ord_amt
FROM (
  SELECT
    *,
    ROW_NUMBER() OVER (
      -- The PARTITION BY columns must appear in the subquery's GROUP BY.
      -- Include a time field to prevent disorder when state expires.
      PARTITION BY cate_id, stat_date
      ORDER BY pay_ord_amt DESC
    ) AS rownum
  FROM (
    SELECT
      cate_id,
      seller_id,
      stat_date,
      -- SUM is monotonically increasing because only non-negative values are included.
      SUM(total_fee) FILTER (WHERE total_fee >= 0) AS pay_ord_amt
    FROM random_test
    WHERE total_fee >= 0
    GROUP BY seller_id, stat_date, cate_id
  ) a
)
WHERE rownum <= 100;
```
In this example, the random_test table contains static data streams. The aggregation result does not contain DELETE or UPDATE_BEFORE messages, so monotonicity is preserved.
Reduce output volume
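Do not include rownum in the final SELECT output; sort the results in the frontend instead. When rownum is part of the output, a rank change for one record forces updates for every record whose rank shifts, which multiplies sink writes. For more information, see Top-N.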
Increase the TopN cache size
TopN maintains a state cache to reduce disk reads. Calculate the cache hit ratio with this formula:
```
cache_hit = cache_size * parallelism / top_n / partition_key_num
```
Example: With Top100, a default cache size of 10,000 records, a parallelism of 50, and 100,000 partition keys:
```
10,000 * 50 / 100 / 100,000 = 5% hit ratio
```
A 5% hit ratio means most reads go to disk, which causes unstable state seek metrics and degraded performance. Increase the cache size:
```yaml
table.exec.rank.topn-cache-size: 200000
```
With 200,000 cache entries:
```
200,000 * 50 / 100 / 100,000 = 100% hit ratio
```
When you increase the TopN cache size for a large number of partition keys, also increase the heap memory of the deployment. For more information, see Configure a deployment.
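The formula can be wrapped in a small Python helper that estimates the hit ratio and the cache size needed for a full hit. It treats `cache_size` as a per-subtask value (which is why the formula multiplies it by parallelism) and caps the estimate at 100%; the function names are hypothetical.

```python
import math


def topn_cache_hit_ratio(cache_size, parallelism, top_n, partition_key_num):
    """Estimated hit ratio from the formula above, capped at 1.0 (100%).
    cache_size is per subtask, hence the multiplication by parallelism."""
    ratio = cache_size * parallelism / top_n / partition_key_num
    return min(ratio, 1.0)


def min_cache_size(parallelism, top_n, partition_key_num):
    """Smallest per-subtask cache size whose estimated hit ratio reaches 100%."""
    return math.ceil(top_n * partition_key_num / parallelism)


print(topn_cache_hit_ratio(10_000, 50, 100, 100_000))   # 0.05
print(topn_cache_hit_ratio(200_000, 50, 100, 100_000))  # 1.0
print(min_cache_size(50, 100, 100_000))                 # 200000
```

`min_cache_size` inverts the formula, which gives the 200,000 value used in the example.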
Include a time field in PARTITION BY
Add a time field such as day to the PARTITION BY clause. Without it, TopN results become disordered when state data expires due to TTL.
Deduplicate efficiently
Input streams often contain duplicates. Realtime Compute for Apache Flink provides two deduplication policies:
- Deduplicate Keep FirstRow: Keeps the earliest record per key.
- Deduplicate Keep LastRow: Keeps the latest record per key.
Syntax
Deduplication is a special case of TopN. Use ROW_NUMBER() with an OVER clause to assign a row number per partition, then keep only rownum = 1:
```sql
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY col1[, col2..]
      ORDER BY timeAttributeCol [ASC|DESC]) AS rownum
  FROM table_name)
WHERE rownum = 1
```
| Element | Description |
|---|---|
| ROW_NUMBER() | Assigns a row number, starting from 1, within each partition. |
| PARTITION BY col1[, col2..] | Columns that define the deduplication key. |
| ORDER BY timeAttributeCol | A time attribute column (proctime or rowtime). ASC keeps the first row (Keep FirstRow). DESC keeps the last row (Keep LastRow). |
| rownum = 1 | Retains only the first row per partition. rownum <= 1 is also supported. |
Time attribute behavior:
- proctime (processing time): Deduplication is based on when records are processed. Results may vary between runs.
- rowtime (event time): Deduplication is based on when records were produced. Results are deterministic.
Keep FirstRow
Keeps the first record per deduplication key. State stores only primary key data, so state access is highly efficient.
```sql
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) AS rowNum
  FROM T
)
WHERE rowNum = 1
```
Deduplicates table T by column b, keeping the earliest record by processing time. Instead of declaring a proctime attribute, call the PROCTIME() function.
Keep LastRow
Keeps the most recent record per deduplication key. This slightly outperforms the LAST_VALUE function.
```sql
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) AS rowNum
  FROM T
)
WHERE rowNum = 1
```
Deduplicates table T by columns b and d, keeping the latest record by event time.
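The behavior of the two policies can be sketched in Python, assuming processing-time semantics where arrival order equals time order. Keep FirstRow only remembers the keys it has seen and emits an append-only stream, while Keep LastRow stores the full latest row and emits updates. The function names and the `+I`/`-U`/`+U` tuples are illustrative, not a Flink API.

```python
def keep_first_row(stream, key_fn):
    """Keep FirstRow: state holds only the keys seen so far; output is append-only."""
    seen_keys = set()
    for row in stream:
        key = key_fn(row)
        if key not in seen_keys:
            seen_keys.add(key)
            yield row


def keep_last_row(stream, key_fn):
    """Keep LastRow: state holds the full latest row per key; a newer row
    retracts the previous one (-U) and emits itself (+U)."""
    latest = {}
    for row in stream:
        key = key_fn(row)
        if key in latest:
            yield ("-U", latest[key])
            yield ("+U", row)
        else:
            yield ("+I", row)
        latest[key] = row


# Assumption: list order is processing-time (arrival) order.
stream = [{"b": 1, "v": "a"}, {"b": 2, "v": "b"}, {"b": 1, "v": "c"}]
print(list(keep_first_row(stream, lambda r: r["b"])))
# [{'b': 1, 'v': 'a'}, {'b': 2, 'v': 'b'}]
print(list(keep_last_row(stream, lambda r: r["b"])))
```

The difference in what each function stores mirrors why Keep FirstRow has cheaper state access.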
Use built-in functions efficiently
Prefer built-in functions over UDFs
Built-in functions are optimized for serialization, deserialization, and byte-level operations. Replace UDFs with built-in equivalents whenever possible.
Use single-character delimiters in KEYVALUE
The KEYVALUE(content, keyValueSplit, keySplit, keyName) function runs about 30% faster when keyValueSplit and keySplit are single characters such as : or ,. With single-character delimiters, the engine searches for the target key directly in binary data without parsing the entire input.
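The following Python sketch shows why single-character delimiters help. It assumes these KEYVALUE semantics: pairs separated by keyValueSplit, key and value separated by keySplit, and NULL when the key is absent. It walks the raw bytes once, compares each key in place, and copies out only the matching value instead of splitting the whole string. It is a hypothetical illustration, not VVR's implementation.

```python
def keyvalue_scan(content: bytes, kv_split: int, key_split: int, key_name: bytes):
    """Return the value for key_name, or None when the key is absent.

    Assumption: pairs are separated by kv_split and each key is separated from
    its value by key_split; both are single bytes, so bytes.find can locate
    them without decoding or splitting the whole input.
    """
    pos, n = 0, len(content)
    while pos <= n:
        end = content.find(kv_split, pos)
        if end == -1:
            end = n
        sep = content.find(key_split, pos, end)
        # Compare the key in place; only the matching value is copied out.
        if (sep != -1 and sep - pos == len(key_name)
                and content.startswith(key_name, pos, sep)):
            return content[sep + 1:end]
        pos = end + 1
    return None


print(keyvalue_scan(b"k1=v1;k2=v2", ord(";"), ord("="), b"k2"))  # b'v2'
print(keyvalue_scan(b"k1=v1;k2=v2", ord(";"), ord("="), b"k3"))  # None
```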
LIKE operator patterns
| Pattern | Matches | Example |
|---|---|---|
| LIKE 'xxx%' | Starts with xxx | LIKE 'order%' |
| LIKE '%xxx' | Ends with xxx | LIKE '%.log' |
| LIKE '%xxx%' | Contains xxx | LIKE '%error%' |
| LIKE 'xxx' | Exact match (equivalent to = 'xxx') | LIKE 'active' |
Escape underscores: The underscore (_) is a single-character wildcard in SQL. To match a literal underscore, use an escape character:
```sql
LIKE '%seller/_id%' ESCAPE '/'
```
Without escaping, LIKE '%seller_id%' also matches seller#id, sellerxid, seller1id, and other unintended strings.
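A small Python model of LIKE matching makes the wildcard and escape rules concrete. It translates a pattern into an anchored regular expression (`%` to any sequence, `_` to any single character, and the escape character to a literal next character). Python's `re` module is used only to express the semantics compactly, and the helper names are hypothetical.

```python
import re


def like_to_regex(pattern, escape=None):
    """Translate a SQL LIKE pattern into an anchored Python regex:
    % -> any sequence, _ -> any single character, escape + c -> literal c."""
    parts, i = [], 0
    while i < len(pattern):
        ch = pattern[i]
        if escape is not None and ch == escape and i + 1 < len(pattern):
            parts.append(re.escape(pattern[i + 1]))  # escaped char is literal
            i += 2
            continue
        if ch == "%":
            parts.append(".*")
        elif ch == "_":
            parts.append(".")
        else:
            parts.append(re.escape(ch))
        i += 1
    return re.compile("".join(parts), re.DOTALL)


def like(value, pattern, escape=None):
    return like_to_regex(pattern, escape).fullmatch(value) is not None


print(like("seller#id", "%seller_id%"))                    # True
print(like("seller#id", "%seller/_id%", escape="/"))       # False
print(like("shop_seller_id", "%seller/_id%", escape="/"))  # True
```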
Avoid regular expressions
Regular expressions can be 100x slower than arithmetic operations, and in edge cases certain patterns can backtrack so long that they effectively enter an infinite loop and block the deployment. Use LIKE instead when possible.
For cases that require regular expressions, see REGEXP and REGEXP_REPLACE.
SQL hints
SQL hints influence the optimizer's execution plan, attach metadata or statistics, and configure Dynamic Table Options on a per-table basis. For more information, see SQL hints.
Syntax
The syntax follows Apache Calcite SQL:
```sql
SELECT /*+ hint [, hint ] */ ...
-- Where:
-- hint: hintName(hintOption [, hintOption]*)
-- hintOption: simpleIdentifier | numericLiteral | stringLiteral
```
Join hints
Query hints modify the execution plan within the current query block. Flink currently supports only join hints, for both dimension table joins and regular joins.