Realtime Compute for Apache Flink: Optimize Flink SQL

Last Updated: Mar 09, 2026

Improve throughput and reduce latency of your Realtime Compute for Apache Flink SQL deployments by tuning resource configurations, enabling aggregation optimizations, and rewriting SQL patterns.

Set baseline parameters for throughput

Add the following parameters to the Other Configuration field in the Parameters section of the Configuration tab. These settings improve throughput and reduce data hotspot issues. For details, see Configure custom deployment parameters.

execution.checkpointing.interval: 180s
table.exec.state.ttl: 129600000
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5s
table.optimizer.distinct-agg.split.enabled: true

| Parameter | Description |
| --- | --- |
| execution.checkpointing.interval | Checkpoint interval. The value 180s means 180 seconds. |
| state.backend | State backend type. |
| table.exec.state.ttl | Time-to-live (TTL) of state data, in milliseconds. The value 129600000 is 36 hours. |
| table.exec.mini-batch.enabled | Whether to enable the miniBatch feature. |
| table.exec.mini-batch.allow-latency | Maximum latency before a mini-batch is triggered. |
| table.exec.mini-batch.size | Maximum number of records cached per mini-batch. VVR already optimizes this parameter, so leave it unconfigured in most cases. For details, see Key parameters. |
| table.optimizer.distinct-agg.split.enabled | Whether to enable PartialFinal optimization for COUNT DISTINCT. |

Scale deployment resources

Ververica Platform (VVP) caps the CPU cores available to each JobManager and TaskManager at the values you configure. Increase these values when deployments are resource-constrained.

Scale JobManager resources

If many deployments run in parallel, increase JobManager resources in the Resources section of the Configuration tab. For example:

  • Job Manager CPU: 4

  • Job Manager Memory: 8 GiB

Scale TaskManager resources

If the deployment topology is complex, increase TaskManager resources in the Resources section of the Configuration tab. For example:

  • Task Manager CPU: 2

  • Task Manager Memory: 4 GiB

Note

Keep taskmanager.numberOfTaskSlots at its default value of 1.

Optimize group aggregation

By default, the group aggregation operator processes input records one by one: read the accumulator from state, update it, write it back, then repeat for the next record. This per-record pattern increases state backend overhead -- especially with RocksDB -- and worsens under data hotspot conditions. The following optimizations address these bottlenecks.

Enable miniBatch

MiniBatch buffers incoming records and processes them together, reducing the number of state accesses per batch. This improves throughput at the cost of slightly higher latency. The miniBatch feature triggers micro-batch processing based on event messages inserted at the source at a specified interval.

When to use: Data aggregation scenarios where ultra-low latency is not required.

How to enable: Add the following to the Other Configuration field in the Parameters section of the Configuration tab. For details, see Configure custom deployment parameters.

table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5s

| Parameter | Description |
| --- | --- |
| table.exec.mini-batch.enabled | Set to true to enable miniBatch. |
| table.exec.mini-batch.allow-latency | Maximum latency before a mini-batch is triggered. |
| table.exec.mini-batch.size | Maximum records per mini-batch. Leave unconfigured -- VVR already optimizes this parameter. For details, see Key parameters. |

Enable LocalGlobal

LocalGlobal splits the aggregation into two phases -- local aggregation at each upstream node, followed by global aggregation -- similar to the combine and reduce phases in MapReduce. Local aggregation pre-aggregates hotspot data before the shuffle, reducing the volume that reaches the global aggregation phase.

When to use: Common aggregate functions such as SUM, COUNT, MAX, MIN, and AVG with data hotspot issues.

Prerequisites:

  • MiniBatch must be enabled. LocalGlobal uses the mini-batch interval to determine how many records to accumulate before local aggregation.

  • The aggregate function must implement the merge method (AggregateFunction).

Status: Enabled by default when miniBatch is active.

Verify: Check the final topology for a GlobalGroupAggregate or LocalGroupAggregate node.
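
As a complement to inspecting the topology in the console, the plan can also be printed with EXPLAIN. The following is a minimal sketch, assuming a hypothetical source table orders with columns seller_id and amount. With miniBatch enabled, the optimized plan would be expected to show LocalGroupAggregate and GlobalGroupAggregate nodes in place of a single GroupAggregate node.

```sql
-- Hypothetical table and column names, for illustration only.
-- Print the plan of a simple group aggregation and look for
-- LocalGroupAggregate / GlobalGroupAggregate in the output.
EXPLAIN
SELECT
  seller_id,
  COUNT(*)    AS order_cnt,
  SUM(amount) AS total_amount
FROM orders
GROUP BY seller_id;
```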

Enable PartialFinal for COUNT DISTINCT

LocalGlobal is less effective for COUNT DISTINCT because local aggregation cannot remove duplicate distinct keys -- duplicates still accumulate in the global phase. PartialFinal addresses this by automatically splitting the aggregation into two layers: the first layer scatters data by the distinct key hash, and the second layer performs the final aggregation.

When to use: COUNT DISTINCT queries with large data volumes where aggregation performance is a bottleneck.

How to enable: Add the following to the Other Configuration field. For details, see Configure custom deployment parameters.

table.optimizer.distinct-agg.split.enabled: true

Limitations:

  • PartialFinal cannot be used in SQL that contains user-defined aggregate functions (UDAFs).

  • Enable only for large data volumes. PartialFinal introduces an extra network shuffle because it splits aggregation into two phases.

Verify: Check whether one-layer aggregation changes to two-layer aggregation in the final topology.
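
To make the two-layer plan concrete, the following sketch shows a manual rewrite that is roughly equivalent to what the optimizer produces. The table T and the columns stat_date and user_id are hypothetical, and the bucket count of 1024 is an illustrative choice, not a value taken from this document.

```sql
-- Original single-layer aggregation (hypothetical table T):
--   SELECT stat_date, COUNT(DISTINCT user_id) FROM T GROUP BY stat_date;

-- Manual two-layer equivalent:
SELECT stat_date, SUM(cnt) AS uv
FROM (
    -- Layer 1: spread each stat_date over buckets derived from the hash
    -- of the distinct key. Every user_id lands in exactly one bucket.
    SELECT
      stat_date,
      COUNT(DISTINCT user_id) AS cnt
    FROM T
    GROUP BY stat_date, MOD(HASH_CODE(user_id), 1024)
)
-- Layer 2: add up the per-bucket distinct counts.
GROUP BY stat_date;
```

Because each distinct key falls into a single bucket, summing the per-bucket counts gives the same result as the original query. With table.optimizer.distinct-agg.split.enabled set to true, this rewrite should not need to be written by hand.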

Use AGG WITH FILTER instead of CASE WHEN

When computing COUNT DISTINCT on the same field under different conditions, use the SQL FILTER clause instead of CASE WHEN. The Flink SQL optimizer recognizes different filter arguments on the same distinct key and uses a single shared state instance instead of separate state instances for each computation. This reduces state access and state size; in performance tests, it doubled deployment performance compared with AGG WITH CASE WHEN.

Before (less efficient):

COUNT(DISTINCT visitor_id) AS uv_total,
COUNT(DISTINCT CASE WHEN is_wireless = 'y' THEN visitor_id ELSE NULL END) AS uv_wireless

After (2x performance):

COUNT(DISTINCT visitor_id) AS uv_total,
COUNT(DISTINCT visitor_id) FILTER (WHERE is_wireless = 'y') AS uv_wireless
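
For context, the fragments above might sit in a complete statement like the following sketch. The table page_views and the columns stat_date and is_app are assumptions added for illustration.

```sql
-- Hypothetical table page_views(stat_date, visitor_id, is_wireless, is_app).
-- All three aggregates use the same distinct key, so they can share state.
SELECT
  stat_date,
  COUNT(DISTINCT visitor_id)                                  AS uv_total,
  COUNT(DISTINCT visitor_id) FILTER (WHERE is_wireless = 'y') AS uv_wireless,
  COUNT(DISTINCT visitor_id) FILTER (WHERE is_app = 'y')      AS uv_app
FROM page_views
GROUP BY stat_date;
```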

Optimize joins

Key-value separation for two-stream joins

In VVR 6.0.1 and later, the Realtime Compute for Apache Flink engine automatically infers whether to enable key-value separation for JOIN operators on two data streams. After key-value separation is enabled, typical join performance improves by more than 40%.

Configure the table.exec.join.kv-separate parameter to control this behavior:

| Value | Behavior |
| --- | --- |
| AUTO | The engine enables key-value separation automatically based on the join operator state. This is the default. |
| FORCE | Force-enable key-value separation. |
| NONE | Force-disable key-value separation. |

Note

Key-value separation only takes effect on GeminiStateBackend.

Enable miniBatch for joins

By default, the regular join operator processes records one by one: look up associated records from the counterpart state based on the join key, update state with the current record, then output join results. This per-record pattern increases state backend overhead -- especially with RocksDB -- and can lead to severe record amplification in cascading join scenarios.

MiniBatch for joins addresses this with two core optimizations:

  1. Record folding: Folds records in the buffer to reduce the data volume before the join process.

  2. Output suppression: Suppresses redundant intermediate results during batch processing.

MiniBatch triggers micro-batch processing when one of the following conditions is met: the configured maximum allowed latency is reached, the cached batch size is reached, or another event such as a checkpoint occurs.

VVR version: 8.0.4 or later.

How to enable: Add the following to the Other Configuration field. For details, see Configure custom deployment parameters.

table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5s
table.exec.stream.join.mini-batch-enabled: true

Best for: Cascading outer joins where message amplification is a concern. Upstream operators use a message collapse or merge mechanism to suppress the message amplification effect on downstream operators. For example:

SELECT a.id AS a_id, a.a_content, B.id AS b_id, B.b_content
FROM a LEFT JOIN
  (SELECT * FROM b
      LEFT JOIN c ON b.prd_id = c.id) B
ON a.id = B.id

Optimize TopN queries

TopN algorithms

The algorithm used depends on whether the input data stream is static or dynamic:

| Input type | Available algorithms | Notes |
| --- | --- | --- |
| Static data streams (e.g., from Simple Log Service) | AppendRank | Only option for static streams. |
| Dynamic data streams (e.g., from aggregate or join) | UpdateFastRank, RetractRank | UpdateFastRank is optimal. RetractRank is the fallback. |

The algorithm name appears in the topology node name.

Switch from RetractRank to UpdateFastRank

UpdateFastRank requires all of the following conditions:

  1. The input stream does not contain DELETE or UPDATE_BEFORE messages. Verify with the following statement. For details on the EXPLAIN syntax, see EXPLAIN.

       EXPLAIN CHANGELOG_MODE <query_statement_or_insert_statement_or_statement_set>

  2. The input stream contains primary key information (e.g., columns used in a GROUP BY clause).

  3. The ORDER BY fields are monotonically updated in the opposite order of sorting. For example, ORDER BY COUNT DESC, ORDER BY COUNT_DISTINCT DESC, or ORDER BY SUM(positive value) DESC.
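
As an illustration of the first condition in the list above, the sketch below applies EXPLAIN CHANGELOG_MODE to a TopN query over the random_test table (the same query used as the example in this section). In open-source Flink plans, each operator carries a changelogMode annotation in which I, UB, UA, and D stand for INSERT, UPDATE_BEFORE, UPDATE_AFTER, and DELETE, and the Rank node names its strategy. The output on VVR is assumed to be similar.

```sql
-- Check the changelogMode of the operator that feeds the Rank node:
-- UB or D there means the first condition is not met.
-- The Rank node itself shows which strategy was selected.
EXPLAIN CHANGELOG_MODE
SELECT cate_id, seller_id, stat_date, pay_ord_amt
FROM (
    SELECT *,
      ROW_NUMBER() OVER (
        PARTITION BY cate_id, stat_date
        ORDER BY pay_ord_amt DESC
      ) AS rownum
    FROM (
        SELECT cate_id, seller_id, stat_date,
          SUM(total_fee) FILTER (WHERE total_fee >= 0) AS pay_ord_amt
        FROM random_test
        WHERE total_fee >= 0
        GROUP BY seller_id, stat_date, cate_id
    ) a
)
WHERE rownum <= 100;
```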

Example: For ORDER BY SUM DESC, filter out negative values so that the sum can only grow, which guarantees monotonicity:

INSERT INTO print_test
SELECT
  cate_id,
  seller_id,
  stat_date,
  pay_ord_amt
FROM (
    SELECT
      *,
      ROW_NUMBER() OVER (
        -- The PARTITION BY columns must appear in the subquery's GROUP BY.
        -- Include a time field to prevent disorder when state expires.
        PARTITION BY cate_id, stat_date
        ORDER BY pay_ord_amt DESC
      ) AS rownum
    FROM (
        SELECT
          cate_id,
          seller_id,
          stat_date,
          -- SUM never decreases because negative values are filtered out.
          SUM(total_fee) FILTER (WHERE total_fee >= 0) AS pay_ord_amt
        FROM random_test
        WHERE total_fee >= 0
        GROUP BY seller_id, stat_date, cate_id
      ) a
  )
WHERE rownum <= 100;

Note

In this example, the random_test table contains static data streams. The aggregation result does not contain DELETE or UPDATE_BEFORE messages, so monotonicity is preserved.

Reduce output volume

Do not include rownum in the final SELECT output. Sort results in the frontend instead. This significantly reduces the volume of data written to the sink. For details, see Top-N.

Increase the TopN cache size

TopN maintains a state cache to reduce disk reads. Calculate the cache hit ratio with this formula:

cache_hit = cache_size * parallelism / top_n / partition_key_num

Example: With Top100, a default cache size of 10,000 records, parallelism of 50, and 100,000 partition keys:

10,000 * 50 / 100 / 100,000 = 5% hit ratio

A 5% hit ratio means most reads go to disk, causing unstable state seek metrics and degraded performance. Increase the cache size:

table.exec.rank.topn-cache-size: 200000

A cache size of 200,000 is the smallest value that reaches a 100% hit ratio under this formula (100 * 100,000 / 50 = 200,000):

200,000 * 50 / 100 / 100,000 = 100% hit ratio

If the number of partition keys is large, also increase the cache size and heap memory of TopN. For details, see Configure a deployment.

Include a time field in PARTITION BY

Add a time field such as day to the PARTITION BY clause. Without it, TopN results become disordered when state data expires due to TTL.

Deduplicate efficiently

Input streams often contain duplicate records. Realtime Compute for Apache Flink provides two deduplication policies:

  • Deduplicate Keep FirstRow: Keeps the earliest record per key.

  • Deduplicate Keep LastRow: Keeps the latest record per key.

Syntax

Deduplication is a special case of TopN. Use ROW_NUMBER() with an OVER clause to assign a row number per partition, then keep only rownum = 1:

SELECT *
FROM (
   SELECT *,
    ROW_NUMBER() OVER (PARTITION BY col1[, col2..]
     ORDER BY timeAttributeCol [ASC|DESC]) AS rownum
   FROM table_name)
WHERE rownum = 1

| Element | Description |
| --- | --- |
| ROW_NUMBER() | Assigns a row number starting from 1 within each partition. |
| PARTITION BY col1[, col2..] | Columns that define the deduplication key. |
| ORDER BY timeAttributeCol | A time attribute column (proctime or rowtime). ASC keeps the first row (Keep FirstRow). DESC keeps the last row (Keep LastRow). |
| rownum = 1 | Retains only the first row per partition. Also supports rownum <= 1. |

Time attribute behavior:

  • proctime (processing time): Deduplication is based on when records are processed. Results may vary between runs.

  • rowtime (event time): Deduplication is based on when records were produced. Results are deterministic.

Keep FirstRow

Keeps the first record per deduplication key. State stores only primary key data, so state access is highly efficient.

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) AS rowNum
  FROM T
)
WHERE rowNum = 1

This deduplicates table T by column b, keeping the earliest record by processing time. Here, proctime is a processing-time attribute declared on T. If the table does not declare one, call the PROCTIME() function in the ORDER BY clause instead.
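
A minimal sketch of the PROCTIME() variant, assuming the same table T with deduplication key b but no declared processing-time attribute:

```sql
-- Keep FirstRow without a declared proctime column:
-- PROCTIME() supplies the processing-time attribute directly in ORDER BY.
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b ORDER BY PROCTIME()) AS rowNum
  FROM T
)
WHERE rowNum = 1
```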

Keep LastRow

Keeps the most recent record per deduplication key. This slightly outperforms the LAST_VALUE function.

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) AS rowNum
  FROM T
)
WHERE rowNum = 1

This deduplicates table T by columns b and d, keeping the latest record by event time.

Use built-in functions efficiently

Prefer built-in functions over UDFs

Built-in functions are continuously optimized with better serialization, deserialization, and byte-level operations. Replace user-defined functions (UDFs) with built-in equivalents whenever possible.

Use single-character delimiters in KEYVALUE

The KEYVALUE(content, keyValueSplit, keySplit, keyName) function performs about 30% faster when keyValueSplit and keySplit are single characters such as : or ,. With single-character delimiters, the engine searches for the target key directly in binary data without parsing the entire input.
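
For illustration, a call with single-character delimiters might look like the sketch below. The table logs and its content column are hypothetical. The argument order follows the signature given above, with ; separating key-value pairs and = separating each key from its value.

```sql
-- Hypothetical table logs(content), where content looks like 'k1=v1;k2=v2'.
-- Both delimiters are single characters, which is the faster case described above.
SELECT
  KEYVALUE(content, ';', '=', 'k2') AS k2_value
FROM logs;
```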

LIKE operator patterns

| Pattern | Matches | Example |
| --- | --- | --- |
| LIKE 'xxx%' | Starts with xxx | LIKE 'order%' |
| LIKE '%xxx' | Ends with xxx | LIKE '%_id' |
| LIKE '%xxx%' | Contains xxx | LIKE '%error%' |
| LIKE 'xxx' | Exact match (equivalent to = 'xxx') | LIKE 'active' |

Escape underscores: The underscore (_) is a single-character wildcard in SQL. To match a literal underscore, use an escape character:

LIKE '%seller/_id%' ESCAPE '/'

Without escaping, LIKE '%seller_id%' also matches seller#id, sellerxid, seller1id, and other unintended strings.
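
The difference can be checked with string literals alone. The following sketch uses only standard LIKE semantics; the column aliases are illustrative names.

```sql
-- Literal-only checks; no table is required.
SELECT
  'seller#id' LIKE '%seller_id%'             AS unescaped_hash,   -- TRUE: _ matches '#'
  'seller#id' LIKE '%seller/_id%' ESCAPE '/' AS escaped_hash,     -- FALSE: _ must be a literal underscore
  'seller_id' LIKE '%seller/_id%' ESCAPE '/' AS escaped_literal;  -- TRUE
```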

Avoid regular expressions

Regular expressions can be 100x slower than arithmetic operations and may enter infinite loops in edge cases, blocking deployments. Use LIKE instead when possible.

For cases that require regular expressions, see REGEXP and REGEXP_REPLACE.
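
As a sketch of such a replacement, a prefix check written with REGEXP can usually be expressed with LIKE. The table orders and the column order_no are hypothetical, and the two forms are assumed to be equivalent only for simple prefix patterns like this one.

```sql
-- Regular-expression version (slower):
--   SELECT * FROM orders WHERE REGEXP(order_no, '^TB');

-- Roughly equivalent prefix match with LIKE:
SELECT *
FROM orders
WHERE order_no LIKE 'TB%';
```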

SQL hints

SQL hints let you influence the optimizer's execution plan, attach metadata or statistics, and configure Dynamic Table Options on a per-table basis. For more information about SQL hints, see SQL hints.
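
As one example of a Dynamic Table Options hint, the sketch below overrides a connector option for a single query. The table kafka_orders and its columns are hypothetical, and scan.startup.mode is a Kafka connector option used here only for illustration.

```sql
-- The OPTIONS hint is attached to the table reference and applies to this query only.
SELECT order_id, amount
FROM kafka_orders /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;
```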

Syntax

The syntax follows Apache Calcite SQL:

SELECT /*+ hint [, hint ] */ ...

-- Where:
-- hint:       hintName(hintOption [, hintOption]*)
-- hintOption:  simpleIdentifier | numericLiteral | stringLiteral

Join hints

Query hints are a type of SQL hint that modifies the execution plan within the current query block. Flink query hints currently support only join hints, which apply to both dimension table joins and regular joins.