Realtime Compute for Apache Flink: Optimize Flink SQL

Last Updated: Dec 22, 2023

This topic describes how to improve the performance of a Flink SQL deployment by optimizing deployment configurations and Flink SQL logic.

Deployment configuration optimization

  • Optimize resource configurations

    Ververica Platform (VVP) limits the number of CPU cores that can be used by the JobManager and by each TaskManager: neither component can use more CPU cores than you configure for it. When you optimize resource configurations, we recommend that you perform the following operations:

    • If a large number of deployments are running in parallel, you can configure parameters to increase the number of CPU cores and memory size used by the JobManager in the Resources section of the Configuration tab. Example:

      • Set Job Manager CPU to 4.

      • Set Job Manager Memory to 8 GiB.

    • If the deployment topology is complex, you can configure parameters to increase the number of CPU cores and memory size used by a TaskManager in the Resources section of the Configuration tab. Example:

      • Set Task Manager CPU to 2.

      • Set Task Manager Memory to 4 GiB.

    • We recommend that you retain the default value of taskmanager.numberOfTaskSlots. The default value is 1.
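
    If you prefer to express comparable settings as raw Flink configuration, the following minimal sketch uses open source Flink keys. Whether these keys are honored in your environment depends on how VVP manages resources, so treat the keys and values below as illustrative assumptions rather than required settings.

    # Hypothetical raw-configuration equivalents of the UI settings above.
    jobmanager.memory.process.size: 8g
    taskmanager.memory.process.size: 4g
    taskmanager.numberOfTaskSlots: 1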

  • Improve throughput and resolve data hotspot issues

    Add the following code to the Other Configuration field in the Parameters section of the Configuration tab. For more information, see How do I configure parameters for deployment running? and Optimize group aggregate.

    execution.checkpointing.interval: 180s
    table.exec.state.ttl: 129600000
    table.exec.mini-batch.enabled: true
    table.exec.mini-batch.allow-latency: 5s
    table.optimizer.distinct-agg.split.enabled: true

    The following table describes the parameters.

    Parameter                                    Description
    execution.checkpointing.interval             The checkpoint interval, specified as a duration (180s in the preceding example).
    state.backend                                The configuration of the state backend.
    table.exec.state.ttl                         The time-to-live (TTL) of state data, in milliseconds. The preceding example uses 129600000 ms (36 hours).
    table.exec.mini-batch.enabled                Specifies whether to enable miniBatch.
    table.exec.mini-batch.allow-latency          The maximum time for which data is buffered before a mini-batch is emitted.
    table.optimizer.distinct-agg.split.enabled   Specifies whether to enable PartialFinal optimization to resolve data hotspot issues when you use the COUNT DISTINCT function.

  • Improve the performance of deployments in which JOIN operations for two data streams are performed

    In Realtime Compute for Apache Flink that uses Ververica Runtime (VVR) 6.0.1 or later, the Flink engine automatically infers whether to enable the key-value separation feature for JOIN operators that join two data streams, based on the characteristics of the SQL streaming deployment. No additional configuration is required. After the key-value separation feature is enabled, the performance of a deployment in which two data streams are joined is significantly improved. Performance test results in typical scenarios show an improvement of more than 40%.

    You can configure the table.exec.join.kv-separate parameter to specify whether to enable the key-value separation feature. Valid values:

    • AUTO: The Flink engine automatically enables the key-value separation feature based on the state of JOIN operators that are used to join two data streams. This is the default value.

    • FORCE: The Flink engine forcefully enables the key-value separation feature.

    • NONE: The Flink engine forcefully disables the key-value separation feature.

    Note

    The key-value separation feature takes effect only on GeminiStateBackend.
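
    For example, to force the feature on regardless of what the engine infers, you can add the following line to the Other Configuration field. This is only a minimal illustration; the default value AUTO is usually sufficient.

    table.exec.join.kv-separate: FORCE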

Recommended Flink SQL practices

Optimize group aggregate

  • Enable miniBatch to improve throughput

    If miniBatch is enabled, Realtime Compute for Apache Flink buffers input data and processes it when the cached data meets the trigger condition. This reduces the number of times that Realtime Compute for Apache Flink accesses the state data, which improves throughput and reduces the amount of output data.

    The miniBatch feature triggers micro-batch processing based on event messages. The event messages are inserted at the source at a specified interval.

    • Scenarios

      Micro-batch processing achieves higher throughput at the expense of higher latency. Therefore, micro-batch processing does not apply to scenarios that require extremely low latency. However, in data aggregation scenarios, we recommend that you enable micro-batch processing to improve system performance.

    • Method to enable miniBatch

      The miniBatch feature is disabled by default. To enable this feature, you must add the following code to the Other Configuration field in the Parameters section of the Configuration tab. For more information, see How do I configure parameters for deployment running?

      table.exec.mini-batch.enabled: true
      table.exec.mini-batch.allow-latency: 5s

      The following table describes the parameters.

      Parameter                             Description
      table.exec.mini-batch.enabled         Specifies whether to enable miniBatch.
      table.exec.mini-batch.allow-latency   The maximum time for which data is buffered before a mini-batch is emitted.

  • Enable LocalGlobal to resolve common data hotspot issues

    The LocalGlobal policy can filter out some skewed data by using local aggregation and resolve the data hotspot issues in global aggregation. This improves deployment performance.

    The LocalGlobal policy divides the aggregation process into two phases: local aggregation and global aggregation. These phases are equivalent to the combine and reduce phases in MapReduce. In the local aggregation phase, Realtime Compute for Apache Flink aggregates a micro-batch of data that is locally cached at each upstream node and generates an accumulator value for each micro-batch. In the global aggregation phase, the accumulators are merged to produce the final result.

    • Scenarios

      The LocalGlobal policy is suitable for scenarios in which you want to improve deployment performance and resolve data hotspot issues by using common aggregate functions, such as SUM, COUNT, MAX, MIN, and AVG.

    • Limits

      LocalGlobal is enabled by default. The policy has the following limits:

      • It can take effect only when miniBatch is enabled.

      • The aggregate functions must support merging, that is, the merge method of AggregateFunction must be implemented.

    • Verification

      To determine whether LocalGlobal is enabled, check whether the GlobalGroupAggregate or LocalGroupAggregate node exists in the final topology.
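
      As a minimal sketch (the table and field names are hypothetical), the following aggregation benefits from LocalGlobal once miniBatch is enabled. Running EXPLAIN on the statement should show LocalGroupAggregate and GlobalGroupAggregate nodes in the plan.

      -- With miniBatch enabled, this aggregation is split into a local
      -- combine step on each upstream node and a global reduce step.
      SELECT item_id, SUM(pay_amount) AS total_amount, COUNT(*) AS order_cnt
      FROM orders
      GROUP BY item_id;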

  • Enable PartialFinal to resolve data hotspot issues when you use the COUNT DISTINCT function

    In normal cases, you need to add a layer that scatters data by distinct key when you use the COUNT DISTINCT function. This way, you can divide the aggregation process into two phases to resolve data hotspot issues. Realtime Compute for Apache Flink now provides the PartialFinal policy to automatically scatter data and divide the aggregation process.

    The LocalGlobal policy improves the performance of common aggregate functions, such as SUM, COUNT, MAX, MIN, and AVG. However, the LocalGlobal policy is less effective for improving the performance of the COUNT DISTINCT function. This is because local aggregation cannot remove duplicate distinct keys. As a result, a large amount of data is stacked up in the global aggregation phase.

    • Scenarios

      The PartialFinal policy is suitable for scenarios in which the aggregation performance cannot meet your requirements when you use the COUNT DISTINCT function.

      Note
      • You cannot enable PartialFinal in the Flink SQL code that contains user-defined aggregate functions (UDAFs).

      • To prevent resources from being wasted, we recommend that you enable PartialFinal only when the amount of data is large. PartialFinal automatically scatters data and divides the aggregation process into two phases. This causes additional network shuffling.

    • Method to enable PartialFinal

      PartialFinal is disabled by default. To enable this feature, you must add the following code to the Other Configuration field in the Parameters section of the Configuration tab. For more information, see How do I configure parameters for deployment running?

      table.optimizer.distinct-agg.split.enabled: true
    • Verification

      Check whether one-layer aggregation changes to two-layer aggregation in the final topology.
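
      For intuition, PartialFinal rewrites a query into roughly the two-phase form that you would otherwise write by hand. The following sketch (hypothetical table and field names, and assuming a numeric visitor_id) scatters the distinct key into 1,024 buckets in the first phase and merges the partial results in the second phase.

      -- Manual equivalent of what PartialFinal generates automatically.
      -- Each bucket holds a disjoint subset of visitor_id values, so the
      -- per-bucket distinct counts can be summed without double counting.
      SELECT stat_date, SUM(partial_uv) AS uv
      FROM (
        SELECT
          stat_date,
          MOD(visitor_id, 1024) AS bucket_id,
          COUNT(DISTINCT visitor_id) AS partial_uv
        FROM visits
        GROUP BY stat_date, MOD(visitor_id, 1024)
      )
      GROUP BY stat_date;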

  • Replace AGG WITH CASE WHEN with AGG WITH FILTER to improve performance when the COUNT DISTINCT function is used on a large amount of data

    Statistical deployments record unique visitors (UVs) in different dimensions, such as UVs of all channels, UVs of mobile terminals, and UVs of PCs. We recommend that you use the standard AGG WITH FILTER syntax instead of the AGG WITH CASE WHEN syntax to implement multidimensional statistical analysis. The SQL optimizer of Realtime Compute for Apache Flink can analyze the filter conditions, which allows COUNT DISTINCT on the same field under different filter conditions to share state data. This reduces read and write operations on the state data. In performance tests, the AGG WITH FILTER syntax delivered about twice the performance of the AGG WITH CASE WHEN syntax.

    • Scenarios

      If you use AGG WITH FILTER instead of AGG WITH CASE WHEN when you use the COUNT DISTINCT function to calculate the results under different conditions on the same field, deployment performance significantly improves.

    • Original statement

      COUNT(DISTINCT visitor_id) AS UV1,
      COUNT(DISTINCT CASE WHEN is_wireless = 'y' THEN visitor_id ELSE NULL END) AS UV2
    • Optimized statement

      COUNT(DISTINCT visitor_id) AS UV1,
      COUNT(DISTINCT visitor_id) FILTER (WHERE is_wireless = 'y') AS UV2
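
      In context, a complete multidimensional UV query using the optimized syntax might look like the following sketch (the visits table is hypothetical):

      SELECT
        stat_date,
        COUNT(DISTINCT visitor_id) AS total_uv,
        COUNT(DISTINCT visitor_id) FILTER (WHERE is_wireless = 'y') AS wireless_uv,
        COUNT(DISTINCT visitor_id) FILTER (WHERE is_wireless = 'n') AS pc_uv
      FROM visits
      GROUP BY stat_date;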

TopN practices

  • TopN algorithms

    If the input data streams of TopN are static (append-only) data streams, such as data streams from Log Service data sources, TopN supports only the AppendRank algorithm. If the input data streams of TopN are dynamic data streams, such as data streams that have been processed by aggregation or join operations, TopN supports the UpdateFastRank and RetractRank algorithms. The performance of UpdateFastRank is better than that of RetractRank. The name of the algorithm in use is included in the name of the corresponding node in the topology.

    • AppendRank: Only this algorithm is supported for static data streams.

    • UpdateFastRank: This algorithm is optimal for dynamic data streams.

    • RetractRank: This algorithm is a basic algorithm for dynamic data streams. If the performance of this algorithm does not meet your business requirements, you can change this algorithm to UpdateFastRank in specific scenarios.

    The following section describes how to change RetractRank to UpdateFastRank. If you want to use the UpdateFastRank algorithm, make sure that the following conditions are met:

    • The input data streams are dynamic data streams but do not contain a DELETE or UPDATE_BEFORE message. If the input data streams contain a DELETE or UPDATE_BEFORE message, the monotonicity of the sorting fields is affected. You can execute the EXPLAIN CHANGELOG_MODE <query_statement_or_insert_statement_or_statement_set> statement to query the types of messages that are contained in the input data streams of the related node. For more information about the syntax of the EXPLAIN statement, see EXPLAIN.

    • The input data streams contain the primary key information. For example, the GROUP BY clause is used to aggregate columns based on the primary key in the source.

    • The fields or function results in the ORDER BY clause, such as ORDER BY COUNT DESC, COUNT_DISTINCT DESC, or SUM (of positive values) DESC, update monotonically in the direction opposite to the sort order.

    If you use ORDER BY SUM(...) DESC and want the UpdateFastRank optimization plan, you must add a condition that ensures the summed values are positive. In the following sample code, the total_fee >= 0 filter serves this purpose.

    Note

    In the following sample code, the random_test table produces a static (append-only) data stream. The aggregation result of each group therefore contains no DELETE or UPDATE_BEFORE messages, and the monotonicity of the aggregated field is retained.

    Sample code of changing RetractRank to UpdateFastRank:

    INSERT INTO print_test
    SELECT
      cate_id,
      seller_id,
      stat_date,
      pay_ord_amt -- Exclude the rownum field to reduce the amount of data written to the result table.
    FROM (
        SELECT
          *,
          ROW_NUMBER() OVER (
            -- Note: The PARTITION BY columns must be included in the GROUP BY clause
            -- of the subquery, including the time field. Otherwise, the data becomes
            -- disordered when the state expires.
            PARTITION BY cate_id, stat_date
            ORDER BY pay_ord_amt DESC
          ) AS rownum -- Sort data based on the sum of the input data.
        FROM (
            SELECT
              cate_id,
              seller_id,
              stat_date,
              -- Note: The result of the SUM function monotonically increases because the
              -- summed values are positive. Therefore, the UpdateFastRank algorithm of
              -- TopN can be used to obtain the top 100 records.
              SUM(total_fee) FILTER (WHERE total_fee >= 0) AS pay_ord_amt
            FROM random_test
            WHERE total_fee >= 0
            GROUP BY seller_id, stat_date, cate_id
          ) a
      )
    WHERE rownum <= 100;

  • TopN optimization methods

    • Perform no-ranking optimization

      Do not include rownum in the output of TopN. We recommend that you sort the results when they are finally displayed in the frontend. This significantly reduces the amount of data that is to be written to the result table. For more information about no-ranking optimization methods, see Top-N.

    • Increase the cache size of TopN

      TopN provides a state cache to improve the efficiency of accessing the state data. The following formula is used to calculate the cache hit ratio of TopN:

      cache_hit = cache_size*parallelism/top_n/partition_key_num

      For example, assume that Top100 is used, the cache contains 10,000 records, and the parallelism is 50. If the number of keys for the PARTITION BY function is 100,000, the cache hit ratio is 10,000 × 50 / 100 / 100,000 = 5%. This low hit ratio means that a large number of requests access the disk state data, and the values of the state seek metric may become unstable. In this case, performance decreases significantly.

      Therefore, if the number of keys for the PARTITION BY function is large, you can increase the cache size and heap memory of TopN. For more information, see Configure a deployment.

      table.exec.rank.topn-cache-size: 200000

      In this example, if you increase the cache size of TopN from the default value 10,000 to 200,000, the cache hit ratio may reach 100%, as calculated by the formula: 200,000 × 50 / 100 / 100,000 = 100%.

    • Include a time field in the PARTITION BY function

      For example, to compute a daily ranking, include the day field in the PARTITION BY clause, as shown in the sketch below. Otherwise, the TopN results become disordered when state data expires because of the time-to-live (TTL).
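
      A minimal sketch (hypothetical field names; DATE_FORMAT converts the event timestamp to a day string):

      ROW_NUMBER() OVER (
        -- Including the day in the partition key gives each day its own ranking,
        -- so expired state from previous days cannot disorder current results.
        PARTITION BY cate_id, DATE_FORMAT(stat_time, 'yyyy-MM-dd')
        ORDER BY pay_ord_amt DESC
      ) AS rownum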

Efficient deduplication

Input streams of Realtime Compute for Apache Flink may contain duplicate data, which makes efficient deduplication a frequent demand. Realtime Compute for Apache Flink offers two policies to remove duplicates: Deduplicate Keep FirstRow and Deduplicate Keep LastRow.

  • Syntax

    Flink SQL does not provide a dedicated syntax for deduplication. To reserve the record in the first or last row of duplicate rows under the specified primary key and discard the other duplicates, you use the ROW_NUMBER() function together with an OVER clause. In this sense, deduplication is a special TopN query.

    SELECT *
    FROM (
       SELECT *,
        ROW_NUMBER() OVER (PARTITION BY col1[, col2..]
         ORDER BY timeAttributeCol [asc|desc]) AS rownum
       FROM table_name)
    WHERE rownum = 1

    Parameter                              Description
    ROW_NUMBER()                           A window function used with an OVER clause that calculates the row number. The value starts from 1.
    PARTITION BY col1[, col2..]            Optional. The partition columns, that is, the primary keys by which duplicates are identified.
    ORDER BY timeAttributeCol [asc|desc]   The column by which data is sorted. You must specify a time attribute, which can be proctime or rowtime. You can sort rows in ascending or descending order based on the time attribute. In ascending order, the record in the first row of duplicate rows is reserved; in descending order, the record in the last row is reserved.
    rownum                                 The row number filter. You can specify rownum = 1 or rownum <= 1.

    The preceding syntax shows that deduplication involves two-level queries:

    1. Use the ROW_NUMBER() window function to sort data based on the specified time attribute and use rankings to mark the data.

      • If the time attribute is proctime, Realtime Compute for Apache Flink removes duplicates based on the time when the records are processed. In this case, the ranking result may vary each time.

      • If the time attribute is rowtime, Realtime Compute for Apache Flink removes duplicates based on the event time of the records. In this case, the ranking result remains the same each time.

    2. Reserve only the record in the first row under the specified primary key and remove the other duplicates.

      You can sort records in ascending or descending order based on the time attribute.

      • Deduplicate Keep FirstRow: Realtime Compute for Apache Flink sorts records in rows in ascending order based on the time attribute and reserves the record in the first row of duplicate rows under the specified primary key.

      • Deduplicate Keep LastRow: Realtime Compute for Apache Flink sorts records in descending order based on the time attribute and reserves the record in the first row of duplicate rows, which is the latest record under the specified primary key.

  • Deduplicate Keep FirstRow

    If you select the Deduplicate Keep FirstRow policy, Realtime Compute for Apache Flink reserves the record in the first row of duplicate rows under the specified primary key and discards the other duplicates. In this case, the state data stores only the primary key information, and the efficiency of accessing the state data is significantly increased. The following example is used to explain the policy.

    SELECT *
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
      FROM T
    )
    WHERE rowNum = 1

    The preceding code removes duplicates from table T based on the b field, and reserves the record in the first row of duplicate rows under the specified primary key based on the system time. In this example, the proctime attribute indicates the system time when the records are processed in Realtime Compute for Apache Flink. Realtime Compute for Apache Flink sorts records in table T based on this attribute. To remove duplicates based on the system time, you can also call the PROCTIME() function instead of declaring the proctime attribute.
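
    For example, if table T does not declare a proctime attribute, the following sketch calls PROCTIME() directly in the ORDER BY clause; the query is otherwise unchanged:

    SELECT *
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY b ORDER BY PROCTIME()) as rowNum
      FROM T
    )
    WHERE rowNum = 1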

  • Deduplicate Keep LastRow

    If you select the Deduplicate Keep LastRow policy, Realtime Compute for Apache Flink reserves the record in the last row of duplicate rows under the specified primary key and discards the other duplicates. This policy slightly outperforms the LAST_VALUE function in terms of performance. The following example is used to explain the policy.

    SELECT *
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) as rowNum
      FROM T
    )
    WHERE rowNum = 1

    The preceding code removes duplicates from table T based on the b and d fields, and reserves the record in the last row under the specified primary key based on the event time. In this example, the rowtime attribute indicates the event time of the records. Realtime Compute for Apache Flink sorts records in table T based on this attribute.

Efficient built-in functions

If you use built-in functions, take note of the following points:

  • Use built-in functions to replace user-defined functions (UDFs)

    Built-in functions of Realtime Compute for Apache Flink are under continual optimization. We recommend that you replace UDFs with built-in functions if possible. Realtime Compute for Apache Flink performs the following operations to optimize built-in functions:

    • Improves the serialization and deserialization efficiency.

    • Allows operations to be performed directly on binary data, without full deserialization.

  • Use single-character delimiters in the KEYVALUE function

    The signature of the KEYVALUE function is KEYVALUE(content, keyValueSplit, keySplit, keyName). If keyValueSplit and keySplit are single-character delimiters, such as colons (:) or commas (,), Realtime Compute for Apache Flink uses an optimized algorithm that searches for the required keyName values directly in the binary data instead of segmenting the entire content. The deployment performance increases by about 30%.
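
    For example, with single-character delimiters (the string literal below is illustrative):

    -- Returns '10': pairs are separated by ';' and keys from values by '='.
    SELECT KEYVALUE('k1=v1;k2=10', ';', '=', 'k2');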

  • Take note of the following points when you use the LIKE operator:

    • To match records that start with the specified content, use LIKE 'xxx%'.

    • To match records that end with the specified content, use LIKE '%xxx'.

    • To match records that contain the specified content, use LIKE '%xxx%'.

    • To match records that are the same as the specified content, use LIKE 'xxx', which is equivalent to str = 'xxx'.

    • To match an underscore (_), use LIKE '%seller/_id%' ESCAPE '/'. The underscore must be escaped because it is a single-character wildcard in SQL that matches any character. If you use LIKE '%seller_id%', records such as seller_id, iseller#id, sellerxid, and seller1id are all matched, which may cause unexpected results.

  • Avoid using regular expressions

    Processing data with regular expressions is time-consuming and may incur performance overhead a hundred times higher than addition, subtraction, multiplication, or division operations. In addition, regular expressions may enter an infinite loop in some extreme cases. As a result, jobs may be blocked. For more information, see Regex execution is too slow. To prevent the deployment blocking issue, we recommend that you use the LIKE operator where possible.

SQL hints

Flink supports SQL hints to improve the optimization capability of the engine. SQL hints are suitable for the following scenarios:

  • Modification of the execution plan: You can use SQL hints to better manage the execution plan.

  • Addition of metadata or statistical data: Specific statistical data, such as scanned table indexes and skew information about specific shuffle keys, is dynamic for queries. The planning metadata that is obtained from the planner may be inaccurate. You can use SQL hints to configure the metadata or statistical data.

  • Configuration of dynamic table options: Dynamic table options allow you to dynamically specify or override table options. These options can be specified per table within each query, as shown in the following sketch.
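
    A minimal sketch of a dynamic table options hint. The table name and the connector option are assumptions; the OPTIONS hint itself is standard Flink SQL:

    -- Override the startup mode of a Kafka source table for this query only.
    SELECT id, item_name
    FROM kafka_orders /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;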

Query hints are a type of SQL hints that suggest that the optimizer modify the execution plan. The modification takes effect only in the query block in which the query hint is located. Flink query hints support only join hints.

  • Syntax

    The syntax of Flink query hints is the same as the syntax of Apache Calcite SQL.

    # Query hints:
    SELECT /*+ hint [, hint ] */ ...
    
    hint:
            hintName '(' hintOption [, hintOption ]* ')'
    
    hintOption:
            simpleIdentifier
        |   numericLiteral
        |   stringLiteral
  • Join hints

    Join hints are a type of query hints that allow you to dynamically optimize joins. Join hints for dimension tables and hints for regular joins are supported.
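
    As a sketch of a dimension table join hint, the following query uses the LOOKUP hint from open source Flink; the table and column names are hypothetical:

    -- Ask the optimizer to perform the dimension table lookup asynchronously.
    SELECT /*+ LOOKUP('table' = 'Customers', 'async' = 'true') */
      o.order_id, c.customer_name
    FROM Orders AS o
    JOIN Customers FOR SYSTEM_TIME AS OF o.proctime AS c
      ON o.customer_id = c.customer_id;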