All Products
Search
Document Center

Realtime Compute for Apache Flink:Job performance FAQ

Last Updated:Mar 26, 2026

This topic answers common questions about Flink SQL job performance, including optimization strategies, backpressure troubleshooting, and deduplication.

How do I split operators of a deployment?

On the O&M > Deployments page, click the deployment name. On the Configuration tab of the deployment details page, click Edit in the upper-right corner of the Parameters section, add the following configuration to the Other Configuration field, and then click Save.

pipeline.operator-chaining: 'false'

How do I optimize a deployment that uses an aggregate function with GROUP BY?

Enable miniBatch to improve throughput

miniBatch reduces how often Flink reads state data by buffering records and processing them in micro-batches. This improves throughput and reduces data output, at the cost of slightly higher latency. Micro-batch processing does not apply to scenarios that require extremely low latency, but in data aggregation scenarios it is strongly recommended to improve system performance.

miniBatch triggers micro-batch processing based on event messages inserted at the source at a specified interval.

To enable miniBatch, add the following configuration to the Other Configuration field in the Parameters section on the Configuration tab of the deployment details page:

table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5s
ParameterDescription
table.exec.mini-batch.enabledEnables miniBatch.
table.exec.mini-batch.allow-latencyThe interval at which records are flushed in batches.

Enable LocalGlobal to resolve common data hotspot issues

LocalGlobal splits aggregation into two phases — local and global — similar to the combine and reduce phases in MapReduce. In the local phase, Flink aggregates a micro-batch of data cached at each upstream node and produces an accumulator value. In the global phase, the accumulators are merged into the final result. By filtering skewed data early, LocalGlobal reduces data hotspot issues in the global aggregation phase.

LocalGlobal is suitable for deployments that use common aggregate functions such as SUM, COUNT, MAX, MIN, and AVG.

LocalGlobal is enabled by default, but it requires miniBatch to be enabled first. In addition, AggregateFunction must be used to merge data.

To verify that LocalGlobal is active, check whether a GlobalGroupAggregate or LocalGroupAggregate node appears in the final topology.

Enable PartialFinal to resolve data hotspot issues with COUNT DISTINCT

LocalGlobal is less effective for COUNT DISTINCT because local aggregation cannot remove duplicate distinct keys, which causes large amounts of data to accumulate in the global aggregation phase. PartialFinal addresses this by automatically scattering data and dividing the aggregation into two phases.

PartialFinal is suitable when COUNT DISTINCT aggregation cannot meet your performance requirements.

Important

PartialFinal does not work in Flink SQL code that contains user-defined aggregate functions (UDAFs). Enable PartialFinal only for large datasets, as the automatic data scattering introduces additional network shuffling.

PartialFinal is disabled by default. To enable it, add the following configuration to the Other Configuration field:

table.optimizer.distinct-agg.split.enabled: true

To verify that PartialFinal is active, check whether one-layer aggregation changes to two-layer aggregation in the final topology.

Use AGG WITH FILTER instead of AGG WITH CASE WHEN for COUNT DISTINCT

When computing COUNT DISTINCT across multiple conditions on the same field, use AGG WITH FILTER instead of AGG WITH CASE WHEN. The SQL optimizer recognizes that all COUNT DISTINCT expressions operate on the same field and uses a single shared state instance instead of one per expression — reducing both state read/write operations and overall resource usage. In performance tests, the AGG WITH FILTER syntax doubles the deployment performance compared to AGG WITH CASE WHEN.

Original statement:

COUNT(distinct visitor_id) as UV1, COUNT(distinct case when is_wireless='y' then visitor_id else null end) as UV2

Optimized statement:

COUNT(distinct visitor_id) as UV1, COUNT(distinct visitor_id) filter (where is_wireless='y') as UV2

How do I optimize a deployment by using TopN practices?

TopN algorithms

Flink selects the TopN algorithm based on the input stream type:

  • AppendRank: Used for static input streams, such as Log Service data sources.

  • UpdateFastRank: The optimal algorithm for dynamic input streams (aggregated or joined data). Use this algorithm when possible.

  • RetractRank: The fallback algorithm for dynamic input streams. If it doesn't meet your performance requirements, switch to UpdateFastRank.

The algorithm name appears in the corresponding node name in the job topology.

Switching from RetractRank to UpdateFastRank

To use UpdateFastRank, the following conditions must be met:

  • The input stream is a dynamic stream.

  • The input stream contains primary key information — for example, the GROUP BY clause aggregates on the primary key.

  • The ORDER BY field or function (such as COUNT, COUNT DISTINCT, or SUM with positive values) updates monotonically in the opposite direction of sorting.

For ORDER BY SUM DESC, add a filter to guarantee positive SUM values:

insert into print_test
SELECT
  cate_id,
  seller_id,
  stat_date,
  pay_ord_amt -- Exclude rownum from output to reduce the volume of data written to the result table.
FROM (
    SELECT
      *,
      ROW_NUMBER() OVER (
        PARTITION BY cate_id,
        stat_date -- Include stat_date to prevent out-of-order data when state TTL expires.
        ORDER BY pay_ord_amt DESC
      ) as rownum -- Sort by the aggregated SUM value.
    FROM (
        SELECT
          cate_id,
          seller_id,
          stat_date,
          -- SUM returns positive values, so the result is monotonically increasing.
          -- This allows TopN to use the optimized algorithm for top 100 records.
          sum(total_fee) filter (where total_fee >= 0) as pay_ord_amt
        FROM random_test
        WHERE total_fee >= 0
        GROUP BY cate_name, seller_id, stat_date, cate_id
      ) a
    ) WHERE rownum <= 100;

TopN optimization methods

Omit rownum from TopN output

Don't include rownum in the final SELECT of a TopN query. We recommend that you sort the results when they are finally displayed in the frontend. This significantly reduces the amount of data that is to be written to the result table.

For more details, see Top-N.

Increase the TopN cache size

TopN uses an in-state cache to reduce disk access. The cache hit ratio is calculated as:

cache_hit = cache_size * parallelism / top_n / partition_key_num

For example, with Top100, a cache of 10,000 records, parallelism of 50, and 100,000 PARTITION BY keys: 10,000 × 50 / 100 / 100,000 = 5%. A low cache hit ratio means frequent disk state access, which significantly degrades performance.

If the number of PARTITION BY keys is large, increase the cache size and heap memory of TopN:

table.exec.rank.topn-cache-size: 200000

With a cache size of 200,000, the hit ratio for the example above becomes 200,000 × 50 / 100 / 100,000 = 100%.

For instructions on adjusting heap memory, see Configure job deployments.

Include a time field in PARTITION BY

Add a time field such as Day to the PARTITION BY clause for daily rankings. Without it, state data expiry (time-to-live, TTL) can cause TopN results to go out of order.

How do I perform deduplication in an efficient manner?

Realtime Compute for Apache Flink provides two deduplication policies: Deduplicate Keep FirstRow and Deduplicate Keep LastRow.

Syntax

Flink SQL does not have native deduplication syntax. Use ROW_NUMBER() with an OVER clause to keep the first or last record per key. Deduplication is implemented as a special case of TopN.

SELECT *
FROM (
   SELECT *,
    ROW_NUMBER() OVER (PARTITION BY col1[, col2..]
     ORDER BY timeAttributeCol [asc|desc]) AS rownum
   FROM table_name)
WHERE rownum = 1
ParameterDescription
ROW_NUMBER()Assigns a sequential row number starting from 1, used with an OVER clause.
PARTITION BY col1[, col2..]Defines the deduplication key (primary key columns).
ORDER BY timeAttributeCol [asc|desc]Sorts by a time attribute — either proctime (processing time) or rowtime (event time). Ascending order keeps the first record; descending order keeps the last.
rownumFilter condition: rownum = 1 or rownum <= 1.

The query has two levels:

  1. Use ROW_NUMBER() to rank records by the time attribute:

    • proctime: Flink deduplicates based on processing time. Results may vary across reruns.

    • rowtime: Flink deduplicates based on event time. Results are deterministic.

  2. Keep only the record where rownum = 1 for each key.

Deduplicate Keep FirstRow

Keep FirstRow retains the first record per key and discards subsequent duplicates. State data stores only the primary key, making state access very efficient.

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
  FROM T
)
WHERE rowNum = 1

This example deduplicates table T on column b, keeping the first record based on processing time. Call PROCTIME() directly to use processing time without declaring a proctime attribute.

Deduplicate Keep LastRow

Keep LastRow retains the most recent record per key. It slightly outperforms the LAST_VALUE function.

SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) as rowNum
  FROM T
)
WHERE rowNum = 1

This example deduplicates table T on columns b and d, keeping the latest record based on event time.

What do I need to focus on when I use built-in functions?

Use built-in functions instead of user-defined functions (UDFs)

Built-in functions in Realtime Compute for Apache Flink are continuously optimized for serialization/deserialization efficiency and byte-level operations. Replace UDFs with built-in functions where possible.

Use single-character delimiters in the KEYVALUE function

The signature is KEYVALUE(content, keyValueSplit, keySplit, keyName). When keyValueSplit and keySplit are single characters (such as : or ,), Flink uses an optimized algorithm that searches binary data directly for the target key without splitting the entire content string. This can improve deployment performance by about 30%.

Use the LIKE operator correctly

PatternBehavior
LIKE 'xxx%'Matches values that start with xxx
LIKE '%xxx'Matches values that end with xxx
LIKE '%xxx%'Matches values that contain xxx
LIKE 'xxx'Exact match — equivalent to str = 'xxx'
LIKE '%seller/_id%' ESCAPE '/'Matches literal underscore _ (must escape, as _ is a single-character wildcard)

If you use LIKE '%seller_id%' without escaping, it matches seller_id, seller#id, sellerxid, seller1id, and other unintended patterns.

Avoid regular expressions

Data processing by using regular expressions is time-consuming, which may cause performance overhead a hundred times higher than the addition, subtraction, multiplication, or division operation. In addition, regular expressions may enter an infinite loop in some extreme cases, causing jobs to block. Use the LIKE operator instead of regex wherever possible.

If you must use regex, refer to:

What do I do if the data reading efficiency is low and backpressure exists when full data is read from tables?

Low downstream processing speed causes backpressure. Check whether the sink has backpressure first, and then use one of the following methods:

  • Increase the degree of parallelism.

  • Enable aggregation optimizations such as miniBatch.

What are the meanings of different colors for the legends in the Status Durations column on the Subtask Metrics tab?

image.png

The Status Durations column shows how long each vertex subtask spent in each phase. Each color represents a subtask state:

  • image.png: CREATED

  • image.png: SCHEDULED

  • image.png: DEPLOYING

  • image.png: INITIALIZING

  • image.png: RUNNING

What is the RMI TCP Connection thread? Why does it consume significantly more CPU than other threads?

image

The RMI TCP Connection thread is a Java Remote Method Invocation (RMI) framework thread used to call remote methods. CPU usage for any thread fluctuates in real time, so a short spike does not indicate sustained high load.

To evaluate actual CPU impact, observe the thread over a period of time and analyze its flame graph. The following figure shows a typical case where the RMI TCP Connection thread consumes almost no CPU.

image

Why does a time difference exist between the current time and the watermark values on the Watermarks tab and the Task InputWatermark metric?

There are two possible causes.

Cause 1: The watermark is declared using a TIMESTAMP_LTZ field

When the source table declares the watermark using a field of the TIMESTAMP_LTZ (TIMESTAMP(p) WITH LOCAL TIME ZONE) type, a time difference exists between the current time and the values of the watermark-related parameters.

The following example uses CURRENT_TIMESTAMP, which generates TIMESTAMP_LTZ data:

CREATE TEMPORARY TABLE s1 (
  a INT,
  b INT,
  ts as CURRENT_TIMESTAMP,-- Generates TIMESTAMP_LTZ data using the built-in CURRENT_TIMESTAMP function.
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector'='datagen',
  'rows-per-second'='1',
  'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10'
);

CREATE TEMPORARY TABLE t1 (
  k INT,
  ts_ltz timestamp_ltz(3),
  cnt BIGINT
) WITH ('connector' = 'print');

-- Get the calculation result.
INSERT INTO t1
SELECT b, window_start, COUNT(*) FROM
TABLE(
    TUMBLE(TABLE s1, DESCRIPTOR(ts), INTERVAL '5' SECOND))
GROUP BY b, window_start, window_end;
Note

The table-valued function (TVF) window syntax produces the same result as the legacy window syntax. The equivalent legacy syntax is:

SELECT b, TUMBLE_END(ts, INTERVAL '5' SECOND), COUNT(*) FROM s1 GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), b;

When deployed with UTC+8 data, an 8-hour gap appears between the current time and the watermark values on the Watermarks tab and the Task InputWatermark metric:

  • Watermark & Low Watermark

    image

  • Task InputWatermark

    image

The following example uses TIMESTAMP (TIMESTAMP(p) WITHOUT TIME ZONE) instead, which avoids the timezone offset:

CREATE TEMPORARY TABLE s1 (
  a INT,
  b INT,
  -- No timezone information. Timestamps increment by one second from 2024-01-31 01:00:00.
  ts as TIMESTAMPADD(SECOND, a, TIMESTAMP '2024-01-31 01:00:00'),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector'='datagen',
  'rows-per-second'='1',
  'fields.a.kind'='sequence','fields.a.start'='0','fields.a.end'='100000',
  'fields.b.kind'='random','fields.b.min'='0','fields.b.max'='10'
);

CREATE TEMPORARY TABLE t1 (
  k INT,
  ts_ltz timestamp_ltz(3),
  cnt BIGINT
) WITH ('connector' = 'print');

-- Get the calculation result.
INSERT INTO t1
SELECT b, window_start, COUNT(*) FROM
TABLE(
    TUMBLE(TABLE s1, DESCRIPTOR(ts), INTERVAL '5' SECOND))
GROUP BY b, window_start, window_end;

After deployment, the watermark values align with the current time — no time difference:

  • Watermark & Low Watermark

    image

  • Task InputWatermark

    image

Cause 2: Timezone difference between the Realtime Compute console and the Apache Flink UI

The Realtime Compute for Apache Flink development console displays time in UTC+0. The Apache Flink UI displays local time based on the browser's timezone. If your browser is set to UTC+8, the Apache Flink UI shows times 8 hours ahead of the console.

  • Development console (UTC+0):

    image

  • Apache Flink UI (local timezone from browser):

    image

How do I troubleshoot backpressure issues?

  1. On the Deployments page, click the deployment name. On the deployment details page, click the Status tab.

  2. Check the Busy and Backpressured indicators to locate the operator with backpressure. A redder Busy indicator light indicates a heavier load. A darker Backpressured indicator indicates a more severe backpressure issue.

    image.png

  3. Click the operator with backpressure.

  4. On the BackPressure tab, review the backpressure status of individual subtasks.

    image.png

How do I troubleshoot high latency issues?

On the Alarm or Metrics tab of the Deployments page, check the currentEmitEventTimeLag and currentFetchEventTimeLag metrics:

  • `currentEmitEventTimeLag` is high: Latency exists in data processing within the deployment. Check operator performance.

  • `currentFetchEventTimeLag` is high: Latency exists in the upstream system. Investigate network I/O issues and the upstream system.

Note

If the issue originates in the upstream system, both metrics increase simultaneously.

image.png

What do I do if backpressure occurs due to data hotspot issues of Flink SQL deployments?

Check the subtask status to confirm the issue is caused by data hotspots, then apply one of the following optimizations.

Enable LocalGlobal to resolve common data hotspot issues

LocalGlobal splits aggregation into two phases — local and global — similar to the combine and reduce phases in MapReduce. In the local phase, Flink aggregates a micro-batch of data cached at each upstream node and produces an accumulator value. In the global phase, the accumulators are merged into the final result. By filtering skewed data early, LocalGlobal reduces data hotspot pressure in the global aggregation phase.

LocalGlobal is suitable for deployments using common aggregate functions such as SUM, COUNT, MAX, MIN, and AVG.

LocalGlobal is enabled by default, but requires miniBatch to be enabled first. AggregateFunction must also be used to merge data.

To verify that LocalGlobal is active, check whether a GlobalGroupAggregate or LocalGroupAggregate node appears in the final topology.

Enable PartialFinal to resolve data hotspot issues with COUNT DISTINCT

LocalGlobal is less effective for COUNT DISTINCT because local aggregation cannot eliminate duplicate distinct keys. PartialFinal addresses this by scattering data automatically and splitting aggregation into two phases.

PartialFinal is suitable when COUNT DISTINCT aggregation performance doesn't meet requirements.

Important

PartialFinal does not work in Flink SQL code containing UDAFs. Enable it only for large datasets, as it introduces additional network shuffling.

PartialFinal is disabled by default. Enable it with:

table.optimizer.distinct-agg.split.enabled: true

To verify, check whether one-layer aggregation changes to two-layer aggregation in the final topology.

What do I do if the consumption of upstream data is unstable?

Check each of the following causes:

  • Mismatch between data generation speed and processing speed: Analyze upstream data generation patterns and align them with the processing capacity.

  • Backpressure in the deployment: Check for backpressure on job vertices. If the deployment has only one node, add pipeline.operator-chaining: 'false' and restart to break down the operator chain — this helps identify which node is causing the backpressure.

  • Abnormal I/O rate: Check data input metrics and consumption rate at the time the issue occurs to determine whether an abnormal I/O rate is the cause.

  • Abnormal data consumption rate: Check whether consumption rate fluctuations coincide with garbage collection (GC) events. If they do, check the memory usage of the affected TaskManager node.

    image.png