Realtime Compute for Apache Flink: Optimize Flink SQL

Last Updated: Jun 08, 2026

Improve throughput and reduce latency of Flink SQL deployments by tuning resources, enabling aggregation optimizations, and rewriting SQL patterns.

Set baseline parameters for throughput

Add the following parameters to the Other Configuration field in the Parameters section of the Configuration tab. These settings improve throughput and reduce data hotspot issues. For details, see Configure custom deployment parameters.

execution.checkpointing.interval: 180s
table.exec.state.ttl: 129600000
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5s
table.optimizer.distinct-agg.split.enabled: true

Parameter                                    Description
execution.checkpointing.interval             Checkpoint interval. The value 180s means 180 seconds.
state.backend                                State backend type.
table.exec.state.ttl                         Time-to-live (TTL) of state data, in milliseconds. The value 129600000 equals 36 hours.
table.exec.mini-batch.enabled                Enables miniBatch aggregation.
table.exec.mini-batch.allow-latency          Maximum latency before a mini-batch is triggered.
table.exec.mini-batch.size                   Maximum records per mini-batch. VVR optimizes this automatically; leave it unconfigured in most cases. For details, see Key parameters.
table.optimizer.distinct-agg.split.enabled   Enables PartialFinal optimization for COUNT DISTINCT.
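
Because table.exec.state.ttl is given in milliseconds, it can help to derive the number from a readable duration. The following is a minimal Python sketch for illustration only; the helper name ttl_millis is hypothetical and not part of Flink or VVR.

```python
from datetime import timedelta


def ttl_millis(duration: timedelta) -> int:
    """Convert a duration to a millisecond value for table.exec.state.ttl."""
    return int(duration.total_seconds() * 1000)


# 36 hours in milliseconds matches the baseline value shown above.
baseline_ttl = ttl_millis(timedelta(hours=36))
print(f"table.exec.state.ttl: {baseline_ttl}")
```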

Scale deployment resources

Ververica Platform (VVP) caps JobManager and TaskManager CPU at configured values. Increase them when deployments are resource-constrained.

Scale JobManager resources

For deployments with high parallelism, increase JobManager resources in the Resources section of the Configuration tab. For example:

  • Job Manager CPU: 4

  • Job Manager Memory: 8 GiB

Scale TaskManager resources

For complex topologies, increase TaskManager resources in the Resources section of the Configuration tab. For example:

  • Task Manager CPU: 2

  • Task Manager Memory: 4 GiB

Note

Keep taskmanager.numberOfTaskSlots at its default value of 1.

Optimize group aggregation

By default, the group aggregation operator processes records one by one: read the accumulator from state, update it, write it back, then repeat. This per-record pattern increases state backend overhead — especially with RocksDB — and worsens under data hotspots.

Enable miniBatch

MiniBatch buffers incoming records and processes them together, reducing state accesses per batch. This improves throughput at the cost of slightly higher latency. The miniBatch feature triggers micro-batch processing based on event messages inserted at the source at a specified interval.
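
The effect on state access can be pictured with a small simulation. This is a simplified Python sketch, not Flink code: it triggers a batch by record count only (the real feature also triggers on latency), and the function names per_record and mini_batch are hypothetical.

```python
from collections import defaultdict


def per_record(events):
    """Process records one by one: one state read and one write per record."""
    state, accesses = defaultdict(int), 0
    for key, value in events:
        acc = state[key]          # read the accumulator from state
        state[key] = acc + value  # write it back
        accesses += 2
    return dict(state), accesses


def mini_batch(events, batch_size):
    """Fold each batch in memory, then touch state once per key per batch."""
    state, accesses = defaultdict(int), 0
    for start in range(0, len(events), batch_size):
        buffer = defaultdict(int)
        for key, value in events[start:start + batch_size]:
            buffer[key] += value   # in-memory buffer, no state access
        for key, partial in buffer.items():
            state[key] += partial  # one read and one write per distinct key
            accesses += 2
    return dict(state), accesses


events = [("a", 1), ("a", 2), ("b", 5), ("a", 3), ("b", 1), ("a", 4)]
print(per_record(events))
print(mini_batch(events, batch_size=3))
```

Both functions produce the same aggregates; only the number of state accesses differs, and the gap widens as more records share a key within a batch.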

When to use: Data aggregation scenarios where ultra-low latency is not required.

How to enable: Add the following to the Other Configuration field in the Parameters section of the Configuration tab. For details, see Configure custom deployment parameters.

table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5s

Parameter                             Description
table.exec.mini-batch.enabled         Set to true to enable miniBatch.
table.exec.mini-batch.allow-latency   Maximum latency before a mini-batch is triggered.
table.exec.mini-batch.size            Maximum records per mini-batch. Leave unconfigured; VVR optimizes this automatically. For details, see Key parameters.

Enable LocalGlobal

LocalGlobal splits aggregation into two phases, local pre-aggregation at each upstream node followed by global aggregation, similar to the combine and reduce stages in MapReduce. The local phase collapses records for hot keys into partial accumulators, which reduces the volume of data reaching the global phase.
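
The two phases can be sketched in a few lines of Python. This is an illustrative model under simplifying assumptions (a SUM aggregate, two upstream partitions); the function names are hypothetical and do not correspond to Flink operators.

```python
from collections import Counter


def local_aggregate(partition):
    """Local phase: each upstream subtask pre-aggregates its own records."""
    partial = Counter()
    for key, value in partition:
        partial[key] += value
    return partial


def global_aggregate(partials):
    """Global phase: merge partial accumulators, analogous to merge()."""
    total = Counter()
    for partial in partials:
        total.update(partial)  # Counter.update adds counts per key
    return total


# Two upstream partitions, both dominated by the same hot key.
partitions = [
    [("hot", 1)] * 1000 + [("cold", 1)],
    [("hot", 1)] * 1000,
]
partials = [local_aggregate(p) for p in partitions]
shuffled_records = sum(len(p) for p in partials)
result = global_aggregate(partials)
print(shuffled_records, dict(result))
```

In this model, 2,001 input records shrink to 3 partial accumulators before the shuffle, so the subtask that owns the hot key receives far less data.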

When to use: Common aggregate functions such as SUM, COUNT, MAX, MIN, and AVG with data hotspot issues.

Prerequisites:

  • MiniBatch must be enabled. LocalGlobal uses the mini-batch interval to determine how many records to accumulate before local aggregation.

  • The aggregate function must implement the merge method (AggregateFunction).

Status: Enabled by default when miniBatch is active.

Verify: Check the final topology for a GlobalGroupAggregate or LocalGroupAggregate node.

Enable PartialFinal for COUNT DISTINCT

LocalGlobal is less effective for COUNT DISTINCT because local aggregation cannot remove duplicate distinct keys. PartialFinal addresses this by splitting aggregation into two layers: the first scatters data by distinct key hash, the second performs final aggregation.
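
The two-layer idea can be illustrated with a short Python sketch. It assumes a small fixed bucket count and a CRC32-based hash purely for demonstration; BUCKETS, bucket_of, and partial_final_count_distinct are hypothetical names, not Flink internals.

```python
import zlib
from collections import defaultdict

BUCKETS = 4  # hypothetical bucket count, chosen small for illustration


def bucket_of(distinct_key: str) -> int:
    """Map a distinct key to a bucket with a deterministic hash."""
    return zlib.crc32(distinct_key.encode()) % BUCKETS


def partial_final_count_distinct(rows):
    """rows: (group_key, distinct_key) pairs. Returns COUNT DISTINCT per group."""
    # Layer 1: scatter by (group key, bucket) and deduplicate within each bucket.
    partial = defaultdict(set)
    for group_key, distinct_key in rows:
        partial[(group_key, bucket_of(distinct_key))].add(distinct_key)
    # Layer 2: sum the per-bucket distinct counts for each group key.
    final = defaultdict(int)
    for (group_key, _bucket), keys in partial.items():
        final[group_key] += len(keys)
    return dict(final)


rows = [("d1", "u1"), ("d1", "u2"), ("d1", "u1"), ("d1", "u3"), ("d2", "u1")]
print(partial_final_count_distinct(rows))
```

Each distinct key always lands in the same bucket, so summing the per-bucket counts gives the exact distinct count while the work for a hot group key is spread across several buckets.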

When to use: COUNT DISTINCT queries with large data volumes where aggregation performance is a bottleneck.

How to enable: Add the following to the Other Configuration field. For details, see Configure custom deployment parameters.

table.optimizer.distinct-agg.split.enabled: true

Limitations:

  • PartialFinal cannot be used in SQL that contains user-defined aggregate functions (UDAFs).

  • Enable only for large data volumes. PartialFinal introduces an extra network shuffle.

Verify: Check whether one-layer aggregation changes to two-layer aggregation in the final topology.

Use AGG WITH FILTER instead of CASE WHEN

When computing COUNT DISTINCT on the same field under different conditions, use the FILTER clause instead of CASE WHEN. The optimizer recognizes different filter arguments on the same distinct key and uses a single shared state instance, reducing state access and size. Performance tests show 2x improvement over CASE WHEN.

Before (less efficient):

COUNT(DISTINCT visitor_id) AS uv_total,
COUNT(DISTINCT CASE WHEN is_wireless = 'y' THEN visitor_id ELSE NULL END) AS uv_wireless

After (2x performance):

COUNT(DISTINCT visitor_id) AS uv_total,
COUNT(DISTINCT visitor_id) FILTER (WHERE is_wireless = 'y') AS uv_wireless
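
One way to picture a shared state instance is a single map from distinct key to a bitmask that records which aggregates have already counted that key. The Python sketch below is an illustrative model only and an assumption about the general approach, not a description of VVR's internal implementation; all names are hypothetical.

```python
def count_distinct_with_filters(rows, filters):
    """Count distinct visitor_id values per filter using one shared map."""
    seen = {}  # visitor_id -> bitmask of aggregates that already counted it
    counts = [0] * len(filters)
    for row in rows:
        key = row["visitor_id"]
        mask = seen.get(key, 0)
        for i, predicate in enumerate(filters):
            bit = 1 << i
            if predicate(row) and not mask & bit:
                counts[i] += 1
                mask |= bit
        seen[key] = mask
    return counts, len(seen)


rows = [
    {"visitor_id": "v1", "is_wireless": "y"},
    {"visitor_id": "v1", "is_wireless": "n"},
    {"visitor_id": "v2", "is_wireless": "n"},
    {"visitor_id": "v3", "is_wireless": "y"},
    {"visitor_id": "v3", "is_wireless": "y"},
]
filters = [
    lambda r: True,                     # uv_total
    lambda r: r["is_wireless"] == "y",  # uv_wireless
]
counts, state_entries = count_distinct_with_filters(rows, filters)
print(counts, state_entries)
```

In this model both aggregates share three state entries, one per visitor, rather than each aggregate keeping its own copy of the distinct keys.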

Optimize joins

Key-value separation for two-stream joins

In VVR 6.0.1 and later, the engine automatically infers whether to enable key-value separation for two-stream JOIN operators, improving typical join performance by more than 40%.

Configure the table.exec.join.kv-separate parameter to control this behavior:

Value   Behavior
AUTO    The engine enables key-value separation automatically based on join operator state. Default.
FORCE   Force-enable key-value separation.
NONE    Force-disable key-value separation.

Note

Key-value separation only takes effect on GeminiStateBackend.

Enable miniBatch for joins

By default, the regular join operator processes records one by one: look up counterpart state by join key, update state, then output results. This per-record pattern increases state backend overhead — especially with RocksDB — and can cause severe record amplification in cascading joins.

MiniBatch for joins addresses this with two core optimizations:

  1. Record folding: Folds records in the buffer to reduce the data volume before the join process.

  2. Output suppression: Suppresses redundant intermediate results during batch processing.

MiniBatch triggers processing when the maximum allowed latency is reached, the batch size limit is reached, or a checkpoint occurs.
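
Record folding can be illustrated with a deliberately simplified Python model in which an insert that is later retracted within the same buffer cancels out before the join runs. The sketch handles only +I and -D records and is an assumption about the general idea, not the operator's actual logic; the fold function is hypothetical.

```python
def fold(buffer):
    """Drop an insert that a later delete of the same row cancels out."""
    folded = []
    for op, row in buffer:
        if op == "-D" and ("+I", row) in folded:
            folded.remove(("+I", row))  # the pair cancels; emit neither
        else:
            folded.append((op, row))
    return folded


buffer = [
    ("+I", ("k1", "a")),
    ("-D", ("k1", "a")),  # retracts the previous insert
    ("+I", ("k1", "b")),
    ("+I", ("k2", "c")),
]
print(fold(buffer))
```

Four buffered records fold down to two, so the join performs fewer state lookups and emits fewer intermediate results downstream.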

VVR version: 8.0.4 or later.

How to enable: Add the following to the Other Configuration field. For details, see Configure custom deployment parameters.

table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5s
table.exec.stream.join.mini-batch-enabled: true

Best for: Cascading outer joins with message amplification. With miniBatch enabled, upstream join operators fold and merge buffered records, which suppresses the amplification seen by downstream operators. For example:

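SELECT a.id AS a_id, a.a_content, bc.id AS b_id, bc.b_content
FROM a
LEFT JOIN (
  -- Select explicit columns: SELECT * would expose both b.id and c.id,
  -- which makes the outer reference to id ambiguous.
  SELECT b.id, b.b_content
  FROM b
  LEFT JOIN c ON b.prd_id = c.id
) bc
ON a.id = bc.id;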

Optimize TopN queries

TopN algorithms

The algorithm used depends on whether the input data stream is static or dynamic:

Input type                                            Available algorithms          Notes
Static data streams (e.g., from Simple Log Service)   AppendRank                    Only option for static streams.
Dynamic data streams (e.g., from aggregate or join)   UpdateFastRank, RetractRank   UpdateFastRank is optimal. RetractRank is the fallback.

The algorithm name appears in the topology node name.

Switch from RetractRank to UpdateFastRank

UpdateFastRank requires all of the following conditions:

  1. The input stream does not contain DELETE or UPDATE_BEFORE messages. Verify with EXPLAIN:

       EXPLAIN CHANGELOG_MODE <query_statement_or_insert_statement_or_statement_set>

  2. The input stream contains primary key information (e.g., columns used in a GROUP BY clause).

  3. The ORDER BY field changes monotonically in the direction opposite to the sort order. For a DESC sort, the value must only increase. For example, ORDER BY COUNT DESC, ORDER BY COUNT_DISTINCT DESC, or ORDER BY SUM(positive value) DESC.

Example: For ORDER BY SUM DESC, filter out negative values to guarantee monotonicity:

INSERT INTO print_test
SELECT
  cate_id,
  seller_id,
  stat_date,
  pay_ord_amt
FROM (
    SELECT
      *,
      ROW_NUMBER() OVER (
        -- The PARTITION BY columns must appear in the subquery's GROUP BY.
        -- Include a time field to prevent disorder when state expires.
        PARTITION BY cate_id, stat_date
        ORDER BY pay_ord_amt DESC
      ) AS rownum
    FROM (
        SELECT
          cate_id,
          seller_id,
          stat_date,
          -- SUM never decreases because only non-negative values are included.
          SUM(total_fee) FILTER (WHERE total_fee >= 0) AS pay_ord_amt
        FROM random_test
        WHERE total_fee >= 0
        GROUP BY seller_id, stat_date, cate_id
      ) a
  )
WHERE rownum <= 100;

Note

In this example, the random_test table contains static data streams. The aggregation result does not contain DELETE or UPDATE_BEFORE messages, so monotonicity is preserved.
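
The monotonicity requirement can be checked on a toy input. The Python sketch below uses made-up fee values and a hypothetical helper, is_non_decreasing, to show that a running SUM can drop when negative values are present and cannot once they are filtered out.

```python
from itertools import accumulate


def is_non_decreasing(values):
    """True if every value is greater than or equal to its predecessor."""
    return all(a <= b for a, b in zip(values, values[1:]))


fees = [30, -10, 25, 0, 5]  # made-up total_fee values, one negative

# Running SUM as each record arrives, without and with the filter.
unfiltered = list(accumulate(fees))
filtered = list(accumulate(f for f in fees if f >= 0))

print(unfiltered, is_non_decreasing(unfiltered))
print(filtered, is_non_decreasing(filtered))
```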

Reduce output volume

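Do not include rownum in the final SELECT output; sort results in the frontend instead. When rownum is part of the output, a single rank change forces every row whose rank shifted to be rewritten to the sink, so omitting it reduces sink write volume. For details, see Top-N.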

Increase the TopN cache size

TopN maintains a state cache to reduce disk reads. Calculate the cache hit ratio with this formula:

cache_hit = cache_size * parallelism / top_n / partition_key_num

Example: With Top100, a default cache size of 10,000 records, parallelism of 50, and 100,000 partition keys:

10,000 * 50 / 100 / 100,000 = 5% hit ratio

A 5% hit ratio means most reads go to disk, causing unstable state seek metrics and degraded performance. Increase the cache size:

table.exec.rank.topn-cache-size: 200000

With 200,000 cache entries:

200,000 * 50 / 100 / 100,000 = 100% hit ratio
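
The same arithmetic can be scripted to size the cache for other workloads. This Python sketch applies the formula above; capping the ratio at 100% and the helper cache_size_for_full_hit are additions made here for illustration, not part of the product.

```python
import math


def cache_hit_ratio(cache_size, parallelism, top_n, partition_key_num):
    """Apply the formula above; the cap at 1.0 is an assumption of this sketch."""
    ratio = cache_size * parallelism / top_n / partition_key_num
    return min(ratio, 1.0)


def cache_size_for_full_hit(parallelism, top_n, partition_key_num):
    """Smallest cache size whose ratio reaches 100% (ceiling division)."""
    return -(-top_n * partition_key_num // parallelism)


default_ratio = cache_hit_ratio(10_000, 50, 100, 100_000)
tuned_ratio = cache_hit_ratio(200_000, 50, 100, 100_000)
needed = cache_size_for_full_hit(50, 100, 100_000)
print(f"{default_ratio:.0%} {tuned_ratio:.0%} {needed}")
```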

If the number of partition keys is large, increase the TopN cache size and increase heap memory accordingly. For details, see Configure a deployment.

Include a time field in PARTITION BY

Add a time field such as day to the PARTITION BY clause. Without it, TopN results become disordered when state data expires due to TTL.

Deduplicate efficiently

Input streams often contain duplicates. Realtime Compute for Apache Flink provides two deduplication policies:

  • Deduplicate Keep FirstRow: Keeps the earliest record per key.

  • Deduplicate Keep LastRow: Keeps the latest record per key.

Syntax

Deduplication is a special case of TopN. Use ROW_NUMBER() with an OVER clause to assign a row number per partition, then keep only rownum = 1:

SELECT *
FROM (
   SELECT *,
    ROW_NUMBER() OVER (PARTITION BY col1[, col2..]
     ORDER BY timeAttributeCol [ASC|DESC]) AS rownum
   FROM table_name)
WHERE rownum = 1

Element                       Description
ROW_NUMBER()                  Assigns a row number starting from 1 within each partition.
PARTITION BY col1[, col2..]   Columns that define the deduplication key.
ORDER BY timeAttributeCol     A time attribute column (proctime or rowtime). ASC keeps the first row (Keep FirstRow). DESC keeps the last row (Keep LastRow).
rownum = 1                    Retains only the first row per partition. Also supports rownum <= 1.

Time attribute behavior:

  • proctime (processing time): Deduplication is based on when records are processed. Results may vary between runs.

  • rowtime (event time): Deduplication is based on when records were produced. Results are deterministic.

Keep FirstRow

Keeps the first record per deduplication key. State stores only primary key data, so state access is highly efficient.

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) AS rowNum
  FROM T
)
WHERE rowNum = 1

This query deduplicates table T by column b, keeping the earliest record by processing time. Here proctime is a processing-time attribute declared on T. Alternatively, call the PROCTIME() function instead of declaring a proctime attribute.

Keep LastRow

Keeps the most recent record per deduplication key. This slightly outperforms the LAST_VALUE function.

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) AS rowNum
  FROM T
)
WHERE rowNum = 1

Deduplicates table T by columns b and d, keeping the latest record by event time.
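
The two policies differ only in which record survives per key. The Python sketch below models that selection on an in-memory list; it is an illustration of the semantics, not of how the operator manages state, and the deduplicate function and the sample rows are hypothetical.

```python
def deduplicate(rows, key, time_col, keep="first"):
    """Keep the earliest (keep='first') or latest (keep='last') row per key."""
    kept = {}
    for row in rows:
        k = tuple(row[c] for c in key)
        current = kept.get(k)
        if current is None:
            kept[k] = row
        elif keep == "first" and row[time_col] < current[time_col]:
            kept[k] = row
        elif keep == "last" and row[time_col] >= current[time_col]:
            kept[k] = row
    return list(kept.values())


rows = [
    {"b": "x", "ts": 1, "v": "a"},
    {"b": "x", "ts": 3, "v": "b"},
    {"b": "y", "ts": 2, "v": "c"},
    {"b": "x", "ts": 2, "v": "d"},
]
first_rows = deduplicate(rows, key=["b"], time_col="ts", keep="first")
last_rows = deduplicate(rows, key=["b"], time_col="ts", keep="last")
print([r["v"] for r in first_rows], [r["v"] for r in last_rows])
```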

Use built-in functions efficiently

Prefer built-in functions over UDFs

Built-in functions are optimized for serialization, deserialization, and byte-level operations. Replace UDFs with built-in equivalents whenever possible.

Use single-character delimiters in KEYVALUE

The KEYVALUE(content, keyValueSplit, keySplit, keyName) function runs about 30% faster when keyValueSplit and keySplit are single characters such as : or ,. With single-character delimiters, the engine searches for the target key directly in binary data without parsing the entire input.
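
For readers unfamiliar with the function, the lookup can be modeled in Python. The sketch assumes that keyValueSplit separates the pairs and keySplit separates each key from its value; it shows the semantics only and does not reproduce the engine's binary search. The keyvalue function here is a hypothetical stand-in, not the built-in.

```python
def keyvalue(content, key_value_split, key_split, key_name):
    """Return the value for key_name, or None if the key is absent."""
    if content is None:
        return None
    for pair in content.split(key_value_split):
        key, sep, value = pair.partition(key_split)
        if sep and key == key_name:
            return value
    return None


# Single-character delimiters, the case the text above describes as faster.
print(keyvalue("k1=v1;k2=v2", ";", "=", "k2"))
print(keyvalue("k1=v1;k2=v2", ";", "=", "k3"))
```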

LIKE operator patterns

Pattern        Matches                               Example
LIKE 'xxx%'    Starts with xxx                       LIKE 'order%'
LIKE '%xxx'    Ends with xxx                         LIKE '%/_id' ESCAPE '/'
LIKE '%xxx%'   Contains xxx                          LIKE '%error%'
LIKE 'xxx'     Exact match (equivalent to = 'xxx')   LIKE 'active'

Escape underscores: The underscore (_) is a single-character wildcard in SQL. To match a literal underscore, use an escape character:

LIKE '%seller/_id%' ESCAPE '/'

Without escaping, LIKE '%seller_id%' also matches seller#id, sellerxid, seller1id, and other unintended strings.
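
The wildcard behavior can be reproduced outside the engine by translating a LIKE pattern to a regular expression. The Python sketch below is for checking patterns by hand under standard SQL LIKE semantics; the like and like_to_regex helpers are hypothetical and not part of Flink.

```python
import re


def like_to_regex(pattern, escape=None):
    """Translate a SQL LIKE pattern into a compiled regular expression."""
    out, i = [], 0
    while i < len(pattern):
        ch = pattern[i]
        if escape is not None and ch == escape and i + 1 < len(pattern):
            out.append(re.escape(pattern[i + 1]))  # escaped char is literal
            i += 2
            continue
        if ch == "%":
            out.append(".*")  # any sequence of characters
        elif ch == "_":
            out.append(".")   # exactly one character
        else:
            out.append(re.escape(ch))
        i += 1
    return re.compile("".join(out), re.DOTALL)


def like(value, pattern, escape=None):
    """Return True if value matches the LIKE pattern in full."""
    return like_to_regex(pattern, escape).fullmatch(value) is not None


print(like("seller#id", "%seller_id%"))        # unescaped underscore
print(like("seller#id", "%seller/_id%", "/"))  # escaped underscore
print(like("seller_id", "%seller/_id%", "/"))
```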

Avoid regular expressions

Regular expressions can be 100x slower than arithmetic operations and may enter infinite loops in edge cases, blocking deployments. Use LIKE instead when possible.

For cases that require regular expressions, see REGEXP and REGEXP_REPLACE.

SQL hints

SQL hints influence the optimizer's execution plan, attach metadata or statistics, and configure Dynamic Table Options on a per-table basis. For details, see SQL hints.

Syntax

The syntax follows Apache Calcite SQL:

SELECT /*+ hint [, hint ] */ ...

-- Where:
-- hint:       hintName(hintOption [, hintOption]*)
-- hintOption:  simpleIdentifier | numericLiteral | stringLiteral

Join hints

Query hints modify the execution plan within the current query block. Flink currently supports only join hints, for both dimension table joins and regular joins.