This topic describes how to optimize the performance of Flink SQL jobs in terms of syntax, configurations, and functions.

Optimize the Group By functions

  • Enable microBatch or miniBatch to improve the throughput

    The microBatch and miniBatch policies are both used for micro-batch processing, that is, processing multiple records at a time instead of processing each record separately. When either policy is enabled, Realtime Compute processes data when the data cache meets the trigger condition. This reduces the frequency at which Realtime Compute accesses the state data, and thus improves the throughput and reduces data output.

    The microBatch and miniBatch policies differ in how micro-batch processing is triggered. The miniBatch policy triggers micro-batch processing by using timer threads that are registered with each task, which involves some thread scheduling overhead. As an upgraded version of the miniBatch policy, the microBatch policy triggers micro-batch processing by using event messages, which are inserted into the data sources at a specific interval. The microBatch policy outperforms the miniBatch policy in that it provides higher data serialization efficiency, reduces back pressure, and achieves higher throughput and lower latency.

    • Scenarios
      Micro-batch processing achieves higher throughput at the expense of higher latency. It does not apply to scenarios that require extremely low latency. However, in data aggregation scenarios, we recommend that you enable micro-batch processing to improve the job performance.
      Note By enabling microBatch, you can also resolve the pain point of data jitter when data is aggregated in two phases.
    • Enabling method
      By default, microBatch and miniBatch are disabled. To enable them, set the following parameters accordingly:
      # Enable window miniBatch. By default, window miniBatch is disabled in Realtime Compute V3.2 and later.
      sql.exec.mini-batch.window.enabled=true
      # The interval at which a batch of data is generated. You must specify this parameter when you enable microBatch. We recommend that you set it to the same value as blink.miniBatch.allowLatencyMs.
      blink.microBatch.allowLatencyMs=5000
      # When you enable microBatch, you must also retain the following two miniBatch settings:
      blink.miniBatch.allowLatencyMs=5000
      # The maximum number of records that can be cached in each batch. You must set this parameter to avoid out of memory (OOM) errors.
      blink.miniBatch.size=20000
  • Enable LocalGlobal to resolve common data hotspot issues

    The LocalGlobal policy divides the aggregation process into two phases: local aggregation and global aggregation, which are similar to the combine and reduce phases in MapReduce. In the local aggregation phase, Realtime Compute aggregates a micro batch of data locally at each input node (localAgg) and generates an accumulator value for each batch (Accumulator). In the global aggregation phase, Realtime Compute merges the accumulator values (merge) to obtain the final result (globalAgg).

    The LocalGlobal policy eliminates data skew through local aggregation and resolves data hotspot issues in global aggregation, which improves performance. The following figure shows how the LocalGlobal policy resolves the data skew issue.
    • Scenarios

      You can enable LocalGlobal to improve the performance of general aggregate functions, such as SUM, COUNT, MAX, MIN, and AVG, and to resolve data hotspot issues when you run these functions. A sketch of such an aggregation is provided after this list.
      Note To make LocalGlobal take effect for a user-defined aggregation function (UDAF), the UDAF must implement the merge method.
    • Enabling method

      By default, LocalGlobal is enabled in Realtime Compute V2.0 and later. When the blink.localAgg.enabled parameter is set to true, LocalGlobal is enabled. This parameter takes effect only when microBatch or miniBatch is enabled.
    • Verification

      To verify whether LocalGlobal is enabled, check whether the GlobalGroupAggregate or LocalGroupAggregate node exists in the final topology.
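    The following minimal sketch shows the kind of aggregation that benefits from LocalGlobal: a GROUP BY on a skewed key with general aggregate functions. The table and column names are hypothetical.
    -- A hotspot-prone aggregation: many records share the same page_id.
    -- With LocalGlobal enabled (and miniBatch or microBatch on), each input node
    -- pre-aggregates its batch locally (LocalGroupAggregate) before the skewed
    -- keys are merged globally (GlobalGroupAggregate).
    SELECT page_id, COUNT(*) AS pv, SUM(fee) AS total_fee
    FROM visit_log
    GROUP BY page_id;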
  • Enable PartialFinal to resolve data hotspot issues when you run the CountDistinct function

    The LocalGlobal policy can effectively improve the performance of general aggregation functions, such as SUM, COUNT, MAX, MIN, and AVG. However, it is not effective for improving the performance of the CountDistinct function. This is because local aggregation cannot effectively remove duplicate distinct keys. As a result, a large amount of data is still stacked up in the global aggregation phase.

    In Realtime Compute earlier than V2.2.0, you must manually add a layer that scatters data by distinct key when you run the CountDistinct function, so that the aggregation process is divided into two phases and data hotspot issues are resolved. Realtime Compute V2.2.0 and later provides the PartialFinal policy to automatically scatter data and divide the aggregation process into two phases (a manual equivalent is sketched after this list). The following figure shows the difference between LocalGlobal and PartialFinal.
    • Scenarios

      The PartialFinal policy applies to scenarios where the aggregation performance cannot meet your requirements when you run the CountDistinct function.
      Note
      • You cannot enable PartialFinal in the Flink SQL code that contains UDAFs.
      • We recommend that you enable PartialFinal only when the data volume is large, because PartialFinal automatically scatters data across two aggregation layers and introduces additional network shuffling. When the data volume is small, this wastes resources.
    • Enabling method

      By default, PartialFinal is disabled. To enable PartialFinal, set the blink.partialAgg.enabled parameter to true.
    • Verification

      To verify whether PartialFinal is enabled, check whether an Expand node exists in the final topology, or whether the number of aggregation layers changes from one to two.
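    For reference, the following is a minimal sketch of the manual scatter-and-merge rewrite that PartialFinal automates. The table and column names, the HASH_CODE scattering expression, and the bucket count of 1024 are illustrative assumptions.
    -- Phase 1: scatter the hot distinct key into buckets so that each bucket
    -- deduplicates only a slice of the visitor_id values.
    -- Phase 2: sum the per-bucket distinct counts. Each visitor_id falls into
    -- exactly one bucket, so the sum equals the global distinct count.
    SELECT stat_date, SUM(bucket_uv) AS uv
    FROM (
      SELECT stat_date, COUNT(DISTINCT visitor_id) AS bucket_uv
      FROM visit_log
      GROUP BY stat_date, MOD(HASH_CODE(visitor_id), 1024)
    )
    GROUP BY stat_date;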
  • Use the agg with filter syntax to improve the job performance when you run the CountDistinct function
    Note This method is supported by Realtime Compute V2.2.2 and later.
    Statistical jobs often record unique visitors (UVs) in different dimensions, such as UVs of all channels, UVs of mobile clients, and UVs of PC clients. We recommend that you use the standard agg with filter syntax instead of the agg with case when syntax for multi-dimensional statistical analysis, because the SQL optimizer of Realtime Compute can analyze the filter parameter. This allows Realtime Compute to run the CountDistinct function on the same field with different filter conditions while sharing the state data, which reduces read and write operations on the state data. Performance tests indicate that the agg with filter syntax can double the job performance compared with the agg with case when syntax.
    • Scenarios

      We recommend that you replace the agg with case when syntax with the agg with filter syntax. This particularly improves the job performance when you run the CountDistinct function on the same field with different filter conditions.
    • Original statement
      COUNT(DISTINCT visitor_id) AS UV1,
      COUNT(DISTINCT CASE WHEN is_wireless = 'y' THEN visitor_id ELSE NULL END) AS UV2
    • Optimized statement
      COUNT(DISTINCT visitor_id) AS UV1,
      COUNT(DISTINCT visitor_id) FILTER (WHERE is_wireless = 'y') AS UV2

Optimize the TopN algorithm

  • TopN algorithm
    When the input streams of TopN are append-only streams (such as source streams), TopN supports only one algorithm: AppendRank. When the input streams of TopN are update streams (such as streams produced by aggregate or join operations), TopN supports three algorithms: UpdateFastRank, UnaryUpdateRank, and RetractRank, listed in descending order of performance. The name of the adopted algorithm is contained in the node name in the topology.
    • UpdateFastRank is the optimal algorithm. Two conditions must be met to use this algorithm: 1. The input streams contain primary key information. 2. The values of the fields or functions in the ORDER BY clause are updated monotonically in the direction opposite to the sort order. For example, you can define the ORDER BY clause as ORDER BY COUNT DESC, ORDER BY COUNT DISTINCT DESC, or ORDER BY SUM(positive) DESC.
      Note For ORDER BY SUM(positive) DESC, you must specify a condition for filtering positive values. This filter condition informs the optimizer that the value of the SUM function is positive. Then, you can use the UpdateFastRank algorithm. This algorithm is supported by Realtime Compute V2.2.2 and later. The sample code is as follows:
      SELECT cate_id, seller_id, stat_date, pay_ord_amt  -- Exclude the rownum field to reduce the amount of output data written to the result table.
      FROM (
        SELECT cate_id, seller_id, stat_date, pay_ord_amt,
          ROW_NUMBER() OVER (PARTITION BY cate_id, stat_date  -- Include the stat_date field. Otherwise, the data may be disordered when the state data expires.
            ORDER BY pay_ord_amt DESC) AS rownum  -- Sort data by the sum of the input data.
        FROM (
          SELECT cate_id, seller_id, stat_date,
            SUM(total_fee) FILTER (WHERE total_fee >= 0) AS pay_ord_amt  -- The summed values are positive, so the SUM result is monotonically increasing. This is why TopN can apply the optimized algorithm.
          FROM source_table  -- The source table name is omitted in the original sample; source_table is a placeholder.
          WHERE total_fee >= 0
          GROUP BY cate_id, seller_id, stat_date))
      WHERE rownum <= 100;
    • UnaryUpdateRank is second only to UpdateFastRank in terms of performance. To use this algorithm, make sure that the input streams contain the primary key information. For example, you can define the ORDER BY clause as follows: ORDER BY AVG.
    • RetractRank has the lowest performance, and we do not recommend that you use it in production. Check the input streams: if they contain primary key information, use UnaryUpdateRank or UpdateFastRank instead to optimize job performance.
  • Optimization method
    • Exclude the rownum field

      Do not include rownum in the output of TopN. We recommend that you sort the results when they are finally displayed in the front end. This can significantly reduce the amount of data that is to be written to the result table. For more information, see TopN statement.
    • Increase the cache size of TopN

      TopN provides a state cache to improve the efficiency of accessing state data, which improves performance. The following formula is used to calculate the cache hit rate of TopN:
      cache_hit = cache_size × parallelism / top_n / partition_key_num
      Take Top100 as an example. Assume that the cache contains 10,000 records and the parallelism is 50. If the number of keys for the PARTITION BY function is 100,000, the cache hit rate is only 5% (10000 × 50 / 100 / 100000 = 5%). A low hit rate means that a large number of requests access the disk state data, which significantly deteriorates performance. Therefore, if the number of keys for the PARTITION BY function is large, you can increase the cache size and heap memory of TopN. For more information, see Optimize performance by manual configuration.
      # In this example, increasing the cache size of TopN from the default value 10,000 to 200,000 raises the cache hit rate to up to 100% (200000 × 50 / 100 / 100000 = 100%).
      blink.topn.cache.size=200000
    • Include a time field in the PARTITION BY function

      For example, you must include the day field in the PARTITION BY clause for a daily ranking, as shown in the sketch below. Otherwise, the TopN result may become disordered when the state data expires.
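      A minimal sketch of a daily Top 10 follows; the table and column names are hypothetical.
      -- Include the day field (cur_day) in PARTITION BY so that each day is ranked
      -- independently and expiring state from earlier days cannot disorder the result.
      SELECT *
      FROM (
        SELECT *,
          ROW_NUMBER() OVER (PARTITION BY cur_day, cate_id ORDER BY sales DESC) AS rownum
        FROM daily_sales
      )
      WHERE rownum <= 10;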

Optimize the deduplication performance

Note Realtime Compute V3.2.1 and later supports efficient deduplication.
Input streams of Realtime Compute may contain duplicate data, making efficient deduplication a frequent demand. Realtime Compute offers two policies to efficiently remove duplicates: Deduplicate Keep FirstRow and Deduplicate Keep LastRow.
  • Syntax
    Flink SQL does not provide a dedicated syntax for deduplication. To reserve the first or last record under the specified primary key and discard the remaining duplicate records, you must use the ROW_NUMBER() window function of SQL. In this sense, deduplication is a special TopN function.
    SELECT *
    FROM (
       SELECT *,
        ROW_NUMBER() OVER ([PARTITION BY col1[, col2..]]
         ORDER BY timeAttributeCol [asc|desc]) AS rownum
       FROM table_name)
    WHERE rownum = 1
    The elements in the syntax are described as follows:
    • ROW_NUMBER(): a window function with an OVER clause that calculates the row number. Values start from 1.
    • PARTITION BY col1[, col2..]: optional. Specifies the partition columns, that is, the primary keys of the records to deduplicate.
    • ORDER BY timeAttributeCol [asc|desc]: specifies the column by which records are sorted. The column must be a time attribute (proctime or rowtime). You can sort records in ascending order (Deduplicate Keep FirstRow) or descending order (Deduplicate Keep LastRow) of the time attribute.
    • rownum: the row number filter. Set it to rownum = 1 or rownum <= 1.
    According to the preceding syntax, deduplication includes two steps:
    1. Use the ROW_NUMBER() window function to sort data by the specified time attribute and mark the data with rankings.
      Note
      • If the time attribute is proctime, Realtime Compute removes duplicate records based on the time when the records are processed by Realtime Compute. In this case, the ranking may vary each time.
      • If the time attribute is rowtime, Realtime Compute removes duplicate records based on the time when the records are written to Realtime Compute. In this case, the ranking remains the same each time.
    2. Reserve the first record under the specified primary key and remove the remaining duplicate records.
      Note You can sort records in the ascending or descending order of the time attribute.
      • Deduplicate Keep FirstRow: Realtime Compute sorts records in the ascending order of the time attribute and reserves the first record under the specified primary key.
      • Deduplicate Keep LastRow: Realtime Compute sorts records in the descending order of the time attribute and reserves the first record under the specified primary key.
  • Deduplicate Keep FirstRow
    If you select the Deduplicate Keep FirstRow policy, Realtime Compute reserves the first record under the specified primary key and discards the remaining duplicate records. In this case, the state data stores only the primary key information, and the efficiency of accessing the state data is significantly improved. The sample code is as follows:
    SELECT *
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) as rowNum
      FROM T
    )
    WHERE rowNum = 1
    Note The preceding code removes duplicate records in table T based on the b field, and reserves the first record under the specified primary key based on the system time. In this example, the proctime attribute indicates the system time when the records are processed in Realtime Compute. Realtime Compute sorts records in table T based on this attribute. To remove duplicate records based on the system time, you can also call the PROCTIME() function instead of declaring the proctime attribute.
  • Deduplicate Keep LastRow
    If you select the Deduplicate Keep LastRow policy, Realtime Compute reserves the last record under the specified primary key and discards the remaining duplicate records. This policy slightly outperforms the LAST_VALUE function in terms of performance. The sample code of Deduplicate Keep LastRow is as follows:
    SELECT *
    FROM (
      SELECT *,
        ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) as rowNum
      FROM T
    )
    WHERE rowNum = 1
    Note The preceding code removes duplicate records in table T based on the b and d fields, and reserves the last record under the specified primary key based on the time when the records are written to Realtime Compute. In this example, the rowtime attribute indicates the event time when the records are written to Realtime Compute. Realtime Compute sorts records in table T based on this attribute.

Use efficient built-in functions

  • Use built-in functions to replace user-defined extensions (UDXs)

    Built-in functions of Realtime Compute are under continual optimization. We recommend that you use built-in functions to replace UDXs whenever possible. Realtime Compute V2.0 optimizes built-in functions in the following aspects:
    • Improves the serialization and deserialization efficiency.
    • Allows operations at the byte level.
  • Use single-character delimiters in the KEYVALUE function

    The signature of the KEYVALUE function is KEYVALUE(content, keyValueSplit, keySplit, keyName). When keyValueSplit and keySplit are single-character delimiters, such as a colon (:) or a comma (,), Realtime Compute uses an optimized algorithm: instead of segmenting the entire content, it directly searches for the required keyName values in the binary data. This improves job performance by approximately 30%.
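    The following quick illustration uses a made-up sample string and a hypothetical table named src.
    -- Single-character delimiters (',' and ':') trigger the optimized search path.
    -- KEYVALUE(content, keyValueSplit, keySplit, keyName) returns the value that corresponds to keyName.
    SELECT KEYVALUE('k1:v1,k2:v2,k3:v3', ',', ':', 'k2') AS v2  -- Returns 'v2'.
    FROM src;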
  • Use the MULTI_KEYVALUE function when multiple key-value pairs exist
    Note This method is supported by Realtime Compute V2.2.2 and later.
    A query may apply multiple KEYVALUE functions to the same content. Assume that a content field contains 10 key-value pairs and you want to extract all 10 values as fields. With KEYVALUE, you must write 10 KEYVALUE functions that parse the content 10 times. In this case, we recommend that you use the table-valued function MULTI_KEYVALUE, which requires only one split parsing of the content. This improves job performance by 50% to 100%.
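    The following sketch assumes the LATERAL TABLE pattern and an argument order (content, key-value delimiter, key delimiter, key names) analogous to KEYVALUE; verify both against your Realtime Compute version. The table and column names are hypothetical.
    -- Extract two keys from the same content field in a single parse.
    SELECT t.k1, t.k2
    FROM src,
      LATERAL TABLE(MULTI_KEYVALUE(content, ',', ':', 'k1', 'k2')) AS t(k1, k2);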
  • Use the LIKE operator with caution
    • To match records that start with the specified content, use LIKE 'xxx%'.
    • To match records that end with the specified content, use LIKE '%xxx'.
    • To match records that contain the specified content, use LIKE '%xxx%'.
    • To match records that are exactly the same as the specified content, use LIKE 'xxx', which is equivalent to str = 'xxx'.
    • To match an underscore (_), use LIKE '%seller/_id%' ESCAPE '/'. The underscore must be escaped because it is a single-character wildcard in SQL that matches any single character. If you use LIKE '%seller_id%', many unintended results are returned, such as seller_id, seller#id, sellerxid, and seller1id.
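      For example, assuming a hypothetical table src with a content field:
      -- '/' escapes the '_' wildcard, so the pattern matches the literal string 'seller_id'.
      SELECT * FROM src WHERE content LIKE '%seller/_id%' ESCAPE '/';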
  • Avoid using regular expressions

    Running regular expressions can be time consuming and may require a hundred times more computing resources than simple operations such as addition, subtraction, multiplication, and division. In addition, running regular expressions may cause your job to be stuck in an infinite loop in some particular circumstances. Therefore, use the LIKE operator whenever possible.

Optimize network transmission

Common partitioner policies include:
  • KeyGroup/Hash: distributes data based on specified keys.
  • Rebalance: distributes data to each channel through round-robin scheduling.
  • Dynamic-Rebalance: dynamically distributes data to channels with lower load based on the load status of output channels.
  • Forward: similar to Rebalance when operators are not chained. When operators are chained, data under the specified keys is distributed to the corresponding channels.
  • Rescale: distributes data in a one-to-many or many-to-one mode between input and output channels.
  • Use Dynamic-Rebalance to replace Rebalance

    When you use Dynamic-Rebalance, Realtime Compute writes data to the subpartitions with lower load based on the amount of buffered data in each subpartition, so that dynamic load balancing is achieved. Compared with the static Rebalance policy, Dynamic-Rebalance can balance the load and improve overall job performance when the computing capacity of output nodes is unbalanced. If you find that the load of output nodes is unbalanced when you use Rebalance, consider using Dynamic-Rebalance. To use Dynamic-Rebalance, set the task.dynamic.rebalance.enabled parameter to true. The default value is false.
  • Use Rescale to replace Rebalance

    Note Rescale is supported by Realtime Compute V2.2.2 and later.
    Assume that you have five parallel input nodes and 10 parallel output nodes. If you use Rebalance, each input node distributes data to all 10 output nodes through round-robin scheduling. If you use Rescale, each input node only needs to distribute data to two output nodes. This reduces the number of channels, increases the buffering speed of each subpartition, and thus improves network efficiency. When input data is evenly distributed and the parallelism of the output nodes is a multiple (or a divisor) of the parallelism of the input nodes, you can use Rescale to replace Rebalance. To use Rescale, set the enable.rescale.shuffling parameter to true. The default value is false.
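    For reference, the two switches described above can be added to the job configuration as follows:
    # Enable Dynamic-Rebalance. The default value is false.
    task.dynamic.rebalance.enabled=true
    # Enable Rescale. The default value is false. Rescale is supported in Realtime Compute V2.2.2 and later.
    enable.rescale.shuffling=true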

Recommended configuration

To sum up, we recommend that you use the following job configuration:
# Exactly-once semantics.
blink.checkpoint.mode=EXACTLY_ONCE
# The checkpoint interval, in milliseconds.
blink.checkpoint.interval.ms=180000
blink.checkpoint.timeout.ms=600000
# Realtime Compute V2.X uses Niagara as the state backend. The following parameters set the backend type and the lifecycle (TTL, in milliseconds) of the state data.
state.backend.type=niagara
state.backend.niagara.ttl.ms=129600000
# Realtime Compute V2.X enables micro-batch processing with an interval of 5 seconds. (You cannot set this parameter when you use a window function.)
blink.microBatch.allowLatencyMs=5000
# The allowed latency for a job.
blink.miniBatch.allowLatencyMs=5000
# The size of a batch.
blink.miniBatch.size=20000
# Enable local aggregation. This feature is enabled by default in Realtime Compute V2.X, but you must manually enable it if you use Realtime Compute V1.6.4.
blink.localAgg.enabled=true
# Enable PartialFinal to resolve data hotspot issues when you run the CountDistinct function in Realtime Compute V2.X.
blink.partialAgg.enabled=true
# Enable UNION ALL for optimization.
blink.forbid.unionall.as.breakpoint.in.subsection.optimization=true
# Enable OBJECT REUSE for optimization.
#blink.object.reuse=true
# Configure garbage collection for optimization. (You cannot set this parameter when you use a Log Service source table.)
blink.job.option=-yD heartbeat.timeout=180000 -yD env.java.opts='-verbose:gc -XX:NewRatio=3 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:ParallelGCThreads=4'
# Set the time zone.
blink.job.timeZone=Asia/Shanghai