This topic describes how to optimize the performance of the Flink SQL code in terms of the syntax, configurations, and functions.
Optimize the Group By functions
- Enable microBatch or miniBatch to improve the throughput
The microBatch and miniBatch policies are both used for micro-batch processing, that is, processing multiple records in one task instead of running a separate task for each record. When either policy is enabled, Realtime Compute processes data when the data cache meets the trigger condition. This reduces the frequency at which Realtime Compute accesses the state data, which improves the throughput and reduces the amount of output data.
The microBatch and miniBatch policies differ in how they trigger micro-batch processing. The miniBatch policy triggers micro-batch processing by using timer threads that are registered with each task, which involves some thread scheduling overhead. As an upgraded version of the miniBatch policy, the microBatch policy triggers micro-batch processing by using event messages that are inserted into the data sources at a specific interval. The microBatch policy outperforms the miniBatch policy: it provides higher data serialization efficiency, reduces back pressure, and achieves higher throughput and lower latency.
Micro-batch processing achieves higher throughput at the expense of higher latency, so it does not apply to scenarios that require extremely low latency. However, in data aggregation scenarios, we recommend that you enable micro-batch processing to improve the job performance.
Note: Enabling microBatch also resolves the data jitter issue that occurs when data is aggregated in two phases.
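The trade-off described above can be illustrated with a small sketch. The following Python snippet is not Realtime Compute code: a plain dict stands in for the keyed state backend, and the function names are hypothetical. It shows why processing a batch of records per state access yields the same result with fewer accesses.

```python
def count_per_key(records, batch_size=1):
    """Count records per key; return (counts, number of state accesses)."""
    state = {}            # stands in for the keyed state backend
    state_accesses = 0
    # Process records in batches; within a batch, each distinct key is
    # pre-aggregated in memory and hits the "state backend" only once.
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        local = {}
        for key in batch:                  # in-memory pre-aggregation
            local[key] = local.get(key, 0) + 1
        for key, cnt in local.items():     # one state access per key
            state_accesses += 1
            state[key] = state.get(key, 0) + cnt
    return state, state_accesses

records = ["a", "a", "b", "a", "b", "a"]
no_batch = count_per_key(records, batch_size=1)  # 6 state accesses
batched = count_per_key(records, batch_size=3)   # 4 state accesses
```

Both variants produce identical counts; the batched variant simply touches state less often, which is the source of the throughput gain (and of the added latency, since records wait for their batch).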
- Enabling method
By default, microBatch and miniBatch are disabled. To enable them, set the following parameters accordingly:
# Enable window miniBatch in Realtime Compute V3.2 and later. (By default, window miniBatch is disabled in Realtime Compute V3.2 and later.)
sql.exec.mini-batch.window.enabled=true
# The interval for generating batch data. You must specify this parameter when you enable microBatch.
# We recommend that you set this parameter to the same value as blink.miniBatch.allowLatencyMs.
blink.microBatch.allowLatencyMs=5000
# When you enable microBatch, you must keep the following two miniBatch settings:
blink.miniBatch.allowLatencyMs=5000
# The maximum number of data records that can be cached for each batch. You must set this parameter to avoid out of memory (OOM) errors.
blink.miniBatch.size=20000
- Enable LocalGlobal to resolve common data hotspot issues
The LocalGlobal policy divides the aggregation process into two phases: local aggregation and global aggregation, similar to the combine and reduce phases in MapReduce. In the local aggregation phase, Realtime Compute aggregates a micro batch of data locally at each input node (localAgg) and generates an accumulator value for each batch (Accumulator). In the global aggregation phase, Realtime Compute merges the accumulator values (merge) to obtain the final result (globalAgg).
The LocalGlobal policy eliminates data skew through local aggregation and resolves data hotspot issues in global aggregation, which enhances the performance. The following figure shows how the LocalGlobal policy resolves the data skew issue.
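The two-phase split can be sketched as follows. This is conceptual Python with hypothetical helper names, not the Blink implementation: each parallel input first folds its own micro batch into an accumulator (the local/combine phase), and a downstream task then merges the accumulators (the global phase), so the hot key's task sees one accumulator per shard instead of every raw record.

```python
def local_agg(shard):
    """Combine phase: fold a shard into one accumulator per key."""
    acc = {}
    for key, value in shard:
        acc[key] = acc.get(key, 0) + value
    return acc

def global_agg(accumulators):
    """Merge phase: only pre-aggregated accumulators reach the hot key."""
    result = {}
    for acc in accumulators:
        for key, partial in acc.items():
            result[key] = result.get(key, 0) + partial
    return result

# The skewed key "hot" dominates both shards, but the global task
# receives just one (key, accumulator) pair per shard for it.
shard1 = [("hot", 1)] * 1000 + [("cold", 2)]
shard2 = [("hot", 1)] * 1000
result = global_agg([local_agg(shard1), local_agg(shard2)])
```

This works for SUM-like functions because their accumulators can be merged; that is also why the note below requires a UDAF to implement a merge step.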
Scenarios
You can enable LocalGlobal to improve the performance of general aggregation functions, such as SUM, COUNT, MAX, MIN, and AVG, and resolve data hotspot issues when you run these functions.
Note: For LocalGlobal to take effect on a user-defined aggregation function (UDAF), the UDAF must implement the merge method.
Enabling method
By default, LocalGlobal is enabled in Realtime Compute V2.0 and later, with the blink.localAgg.enabled parameter set to true. This parameter takes effect only when microBatch or miniBatch is enabled.
Verification
To verify whether LocalGlobal is enabled, check whether the GlobalGroupAggregate or LocalGroupAggregate node exists in the final topology.
- Enable PartialFinal to resolve data hotspot issues when you run the CountDistinct function
The LocalGlobal policy can effectively improve the performance of general aggregation functions, such as SUM, COUNT, MAX, MIN, and AVG. However, it is not effective for the CountDistinct function, because local aggregation cannot effectively remove duplicate distinct keys. As a result, a large amount of data still stacks up in the global aggregation phase.
In Realtime Compute earlier than V2.2.0, you had to manually add a layer that scatters data by distinct key when you ran the CountDistinct function, so that the aggregation process was divided into two phases to resolve data hotspot issues. Realtime Compute V2.2.0 and later provides the PartialFinal policy to automatically scatter data and divide the aggregation process. The following figure shows the difference between LocalGlobal and PartialFinal.
Scenarios
The PartialFinal policy applies to scenarios where the aggregation performance cannot meet your requirements when you run the CountDistinct function.
Note:
- You cannot enable PartialFinal in the Flink SQL code that contains UDAFs.
- We recommend that you enable PartialFinal only when the data volume is large, because PartialFinal automatically scatters data across two aggregation layers and introduces additional network shuffling. When the data volume is small, this wastes resources.
Enabling method
By default, PartialFinal is disabled. To enable PartialFinal, set the blink.partialAgg.enabled parameter to true.
Verification
To verify whether PartialFinal is enabled, check whether expand nodes exist in the final topology, or whether the number of aggregation layers changes from one to two.
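The scatter-then-merge idea behind PartialFinal can be sketched in a few lines. This is hypothetical Python, not the Blink implementation: rows are scattered by a hash of the distinct key so that no single task holds every distinct value, each bucket deduplicates its own share, and a final phase sums the partial distinct counts. The sum is exact because the hash partitions the distinct keys into disjoint buckets.

```python
BUCKETS = 4  # stands in for the added aggregation layer's parallelism

def partial_phase(rows):
    """First layer: deduplicate within hash(distinct_key) % BUCKETS."""
    buckets = [set() for _ in range(BUCKETS)]
    for group_key, distinct_key in rows:
        buckets[hash(distinct_key) % BUCKETS].add((group_key, distinct_key))
    return [len(bucket) for bucket in buckets]   # partial distinct counts

def final_phase(partial_counts):
    """Second layer: buckets are disjoint, so summing is exact."""
    return sum(partial_counts)

# 100 rows but only 10 distinct visitors for the group "shop1".
rows = [("shop1", f"user{i % 10}") for i in range(100)]
uv = final_phase(partial_phase(rows))
```

Note how the hot spot disappears: instead of one task holding all 10 distinct keys (and, at scale, all the deduplication state), each bucket holds only its own slice.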
- Use the agg with filter syntax to improve the job performance when you run the CountDistinct function
Note: This method is supported by Realtime Compute
Scenarios
We recommend that you replace the agg with case when syntax with the agg with filter syntax. This particularly improves the job performance when you run the CountDistinct function on the same field with different filter conditions.
- Original statement
COUNT(DISTINCT visitor_id) AS UV1,
COUNT(DISTINCT CASE WHEN is_wireless = 'y' THEN visitor_id ELSE NULL END) AS UV2
- Optimized statement
COUNT(DISTINCT visitor_id) AS UV1,
COUNT(DISTINCT visitor_id) FILTER (WHERE is_wireless = 'y') AS UV2
Optimize the TopN algorithm
- TopN algorithm
When the input streams of TopN are static streams (such as source streams), TopN supports only one algorithm: AppendRank. When the input streams are dynamic streams (such as streams processed by an aggregate or join function), TopN supports three algorithms: UpdateFastRank, UnaryUpdateRank, and RetractRank, in descending order of performance. The name of the adopted algorithm is contained in the node name in the topology.
- UpdateFastRank is the optimal algorithm. To use this algorithm, the following two conditions must be met: 1. The input streams contain the primary key information. 2. The values of the fields or functions in the ORDER BY clause are updated monotonically in the opposite order of the sorting. For example, you can define the ORDER BY clause as ORDER BY COUNT DESC, ORDER BY COUNT DISTINCT DESC, or ORDER BY SUM(positive) DESC.
Note: For ORDER BY SUM(positive) DESC, you must specify a filter condition that keeps only positive values. This filter condition informs the optimizer that the result of the SUM function is positive, so that the UpdateFastRank algorithm can be used. This algorithm is supported by Realtime Compute V2.2.2 and later. The sample code is as follows:
SELECT cate_id, seller_id, stat_date, pay_ord_amt  -- The rownum field is not included. This reduces the amount of output data to be written to the result table.
FROM (
  SELECT
    cate_id, seller_id, stat_date, pay_ord_amt,
    ROW_NUMBER() OVER (
      PARTITION BY cate_id, stat_date  -- Ensure that the stat_date field is included. Otherwise, the data may be disordered when the state data expires.
      ORDER BY pay_ord_amt DESC
    ) AS rownum  -- Sort data by the sum of the input data.
  FROM (
    SELECT
      cate_id, seller_id, stat_date,
      -- Note: The values passed to the SUM function are positive, so the result of the SUM function is monotonically increasing. This is why TopN can apply the optimized algorithm.
      SUM(total_fee) FILTER (WHERE total_fee >= 0) AS pay_ord_amt
    FROM ...  -- The source table name is omitted in the original.
    WHERE total_fee >= 0
    GROUP BY cate_id, seller_id, stat_date
  )
)
WHERE rownum <= 100;
- UnaryUpdateRank is second only to UpdateFastRank in terms of performance. To use this algorithm, make sure that the input streams contain the primary key information. For example, you can define the ORDER BY clause as follows: ORDER BY AVG.
- RetractRank ranks last in terms of performance. We do not recommend that you use this algorithm in production. Check input streams. If input streams contain the primary key information, use UnaryUpdateRank or UpdateFastRank to optimize the job performance.
- Optimization method
Exclude the rownum field
Do not include rownum in the output of TopN. We recommend that you sort the results when they are finally displayed in the front end. This can significantly reduce the amount of data to be written to the result table. For more information, see TopN statement.
Increase the cache size of TopN
TopN provides a state cache to improve the efficiency of accessing the state data, which improves the performance. The following formula is used to calculate the hit rate of the TopN cache:
cache_hit = cache_size × parallelism / top_n / partition_key_num
Take Top100 as an example. Assume that the cache contains 10,000 records and the parallelism is 50. If the number of keys for the PARTITION BY function is 100,000, the cache hit rate is 5% (10,000 × 50 / 100 / 100,000 = 5%). Such a low hit rate indicates that a large number of requests access the disk state data, which significantly deteriorates the performance. Therefore, if the number of keys for the PARTITION BY function is large, you can increase the cache size and heap memory of TopN. For more information, see Optimize performance by manual configuration.
# In this example, if you increase the cache size of TopN from the default value 10,000 to 200,000, the cache hit rate can reach 100% (200,000 × 50 / 100 / 100,000 = 100%).
blink.topn.cache.size=200000
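The cache-hit formula above is easy to check numerically. The following Python helper (a hypothetical name, used here only to reproduce the two worked examples from the text) makes the arithmetic explicit.

```python
def topn_cache_hit(cache_size, parallelism, top_n, partition_key_num):
    """cache_hit = cache_size * parallelism / top_n / partition_key_num"""
    return cache_size * parallelism / top_n / partition_key_num

# Example from the text: Top100, parallelism 50, 100,000 partition keys.
low = topn_cache_hit(10_000, 50, 100, 100_000)    # default cache: 5%
high = topn_cache_hit(200_000, 50, 100, 100_000)  # enlarged cache: 100%
```

Plugging in your own job's parallelism and key cardinality in this way tells you roughly how large blink.topn.cache.size must be before most lookups stay in memory.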
Include a time field in the PARTITION BY function
For example, you must include the day field in your statement for a daily ranking. Otherwise, the TopN result may become disordered when the state data expires.
Optimize the deduplication performance
Flink SQL does not provide dedicated syntax for deduplication. To keep the first or last record under the specified primary key and discard the remaining duplicate records, you must use the ROW_NUMBER() window function of SQL. In this sense, deduplication is a special TopN function.
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY col1[, col2..] ORDER BY timeAttributeCol [asc|desc]) AS rownum
  FROM table_name
)
WHERE rownum = 1
The elements of the statement are described as follows:
- ROW_NUMBER(): Calculates the row number. It is a window function that contains an OVER clause. The value starts from 1.
- PARTITION BY col1[, col2..]: Optional. Specifies the partition columns, which store the primary keys of duplicate records.
- ORDER BY timeAttributeCol [asc|desc]: Specifies the column for sorting records, which must be a time attribute (proctime or rowtime). You can sort records in the ascending order (Deduplicate Keep FirstRow) or descending order (Deduplicate Keep LastRow) of the time attribute.
- rownum: Specifies the number of rows. You can set this element to rownum = 1 or rownum <= 1.
- Use the ROW_NUMBER() window function to sort data by the specified time attribute and mark the data with rankings.
Note:
- If the time attribute is proctime, Realtime Compute removes duplicate records based on the time when the records are processed by Realtime Compute. In this case, the ranking may vary each time.
- If the time attribute is rowtime, Realtime Compute removes duplicate records based on the time when the records are written to Realtime Compute. In this case, the ranking remains the same each time.
- Reserve the first or last record under the specified primary key and remove the remaining duplicate records.
Note You can sort records in the ascending or descending order of the time attribute.
- Deduplicate Keep FirstRow: Realtime Compute sorts records in the ascending order of the time attribute and reserves the first record under the specified primary key.
- Deduplicate Keep LastRow: Realtime Compute sorts records in the descending order of the time attribute and reserves the first record under the specified primary key.
- Deduplicate Keep FirstRow
If you select the Deduplicate Keep FirstRow policy, Realtime Compute reserves the first record under the specified primary key and discards the rest duplicate records. In this case, the state data stores only the primary key information, and the efficiency of accessing the state data is significantly increased. The sample code is as follows:
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b ORDER BY proctime) AS rowNum
  FROM T
)
WHERE rowNum = 1
Note: The preceding code removes duplicate records in table T based on the b field, and reserves the first record under the specified primary key based on the system time. In this example, the proctime attribute indicates the system time when the records are processed in Realtime Compute. Realtime Compute sorts records in table T based on this attribute. To remove duplicate records based on the system time, you can also call the PROCTIME() function instead of declaring the proctime attribute.
- Deduplicate Keep LastRow
If you select the Deduplicate Keep LastRow policy, Realtime Compute reserves the last record under the specified primary key and discards the remaining duplicate records. This policy slightly outperforms the LAST_VALUE function in terms of performance. The sample code of Deduplicate Keep LastRow is as follows:
SELECT *
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY b, d ORDER BY rowtime DESC) AS rowNum
  FROM T
)
WHERE rowNum = 1
Note: The preceding code removes duplicate records in table T based on the b and d fields, and reserves the last record under the specified primary key based on the time when the records are written to Realtime Compute. In this example, the rowtime attribute indicates the event time when the records are written to Realtime Compute. Realtime Compute sorts records in table T based on this attribute.
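The semantics of the two deduplication policies can be sketched outside SQL. This is plain Python, not Flink: records arrive in order, the key function plays the role of the PARTITION BY columns, and a dict plays the role of the keyed state.

```python
def keep_first_row(records, key):
    """Deduplicate Keep FirstRow: keep the first record seen per key."""
    seen = {}
    for rec in records:
        seen.setdefault(key(rec), rec)   # later duplicates are ignored
    return list(seen.values())

def keep_last_row(records, key):
    """Deduplicate Keep LastRow: keep the last record seen per key."""
    seen = {}
    for rec in records:
        seen[key(rec)] = rec             # later duplicates overwrite
    return list(seen.values())

events = [("b1", 1), ("b2", 2), ("b1", 3)]
first = keep_first_row(events, key=lambda r: r[0])
last = keep_last_row(events, key=lambda r: r[0])
```

The sketch also hints at why Keep FirstRow is cheaper: once a key has been seen, its record never changes, so the state only needs to remember that the key exists, whereas Keep LastRow must keep the full latest record and emit retractions when it changes.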
Use efficient built-in functions
Use built-in functions to replace user-defined extensions (UDXs)
Built-in functions of Realtime Compute are under continual optimization. We recommend that you use built-in functions to replace UDXs whenever possible. Realtime Compute V2.0 optimizes built-in functions in the following aspects:
- Improves the serialization and deserialization efficiency.
- Performs operations directly at the byte level.
Use single-character delimiters in the KEYVALUE function
The signature of the KEYVALUE function is KEYVALUE(content, keyValueSplit, keySplit, keyName). When keyValueSplit and keySplit are single-character delimiters, such as a colon (:) or a comma (,), Realtime Compute uses an optimized algorithm: instead of segmenting the entire content, it directly searches for the required keyName values in the binary data. This improves the job performance by approximately 30%.
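To make the KEYVALUE(content, keyValueSplit, keySplit, keyName) signature concrete, here is a toy Python model of its lookup semantics. This is illustrative only (the real built-in works on binary data and, with single-character delimiters, scans for the key directly rather than splitting the whole string as done here).

```python
def keyvalue(content, kv_split, key_split, key_name):
    """Return the value for key_name in a delimited key-value string."""
    for pair in content.split(kv_split):
        key, sep, value = pair.partition(key_split)
        if sep and key == key_name:   # sep is empty if key_split is absent
            return value
    return None                       # key not found

value = keyvalue("k1:v1,k2:v2,k3:v3", ",", ":", "k2")
```

With single-character delimiters the engine can avoid materializing every pair as this sketch does, which is where the roughly 30% improvement cited above comes from.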
- Use the MULTI_KEYVALUE function when multiple key-value pairs exist
Note This method is supported by Realtime Compute
- Use the LIKE operator with caution
- To match records that start with the specified content, use LIKE 'xxx%'.
- To match records that end with the specified content, use LIKE '%xxx'.
- To match records that contain the specified content, use LIKE '%xxx%'.
- To match records that are the same as the specified content, use LIKE 'xxx', which is equivalent to str = 'xxx'.
- To match an underscore (_), use LIKE '%seller/_id%' ESCAPE '/'. The underscore (_) must be escaped because it is a single-character wildcard in SQL and can match any character. If you use LIKE '%seller_id%', many unexpected results, such as seller1id, may be returned.
Avoid using regular expressions
Running regular expressions can be time-consuming and may consume a hundred times more computing resources than basic operations such as addition, subtraction, multiplication, and division. In addition, a regular expression may run into an infinite loop in some cases and cause your job to be stuck. Therefore, use the LIKE operator whenever possible.
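The reason the LIKE forms listed above are cheap is that each one maps onto a simple string operation, with no pattern compilation or backtracking. The following Python helpers (hypothetical names, used only to illustrate the semantics) show the correspondence.

```python
def like_prefix(s, pattern):      # LIKE 'xxx%'
    return s.startswith(pattern)

def like_suffix(s, pattern):      # LIKE '%xxx'
    return s.endswith(pattern)

def like_contains(s, pattern):    # LIKE '%xxx%'
    return pattern in s

def like_equals(s, pattern):      # LIKE 'xxx', same as str = 'xxx'
    return s == pattern

hit = like_prefix("seller_id_col", "seller")   # prefix match
miss = like_suffix("seller_id_col", "seller")  # not a suffix
```

Each helper runs in a single linear scan at worst, which is exactly the behavior you give up when you reach for a regular expression instead.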
Optimize network transmission
Realtime Compute distributes data between upstream and downstream nodes by using the following policies:
- KeyGroup/Hash: distributes data based on specified keys.
- Rebalance: distributes data to each channel through round-robin scheduling.
- Dynamic-Rebalance: dynamically distributes data to channels with lower load based on the load status of output channels.
- Forward: similar to Rebalance when tasks are not chained. When tasks are chained, Realtime Compute forwards data within the corresponding local channel.
- Rescale: distributes data in a one-to-many or many-to-one mode between input and output channels.
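The two most common policies in the list above can be contrasted with a toy sketch. This is illustrative Python, not Flink internals: KeyGroup/Hash routes a given key to a fixed channel, while Rebalance round-robins over channels regardless of key.

```python
from itertools import count

def hash_channel(key, num_channels):
    """KeyGroup/Hash: the same key always lands on the same channel."""
    return hash(key) % num_channels

def rebalance(num_channels):
    """Rebalance: round-robin over channels, ignoring the key."""
    counter = count()
    return lambda _record: next(counter) % num_channels

route = rebalance(3)
rr = [route(rec) for rec in ["a", "a", "a", "a"]]       # spreads the load
sticky = hash_channel("a", 3) == hash_channel("a", 3)   # deterministic
```

The trade-off follows directly: hashing preserves key locality (required for keyed state) but can create hotspots for skewed keys, while rebalancing spreads load evenly but scatters each key across all channels.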
Use Dynamic-Rebalance to replace Rebalance
When you use Dynamic-Rebalance, Realtime Compute writes data to the subpartitions with a lower load based on the amount of buffered data in each subpartition, so that it achieves dynamic load balancing. Compared with the static Rebalance policy, Dynamic-Rebalance balances the load and improves the overall job performance when the computing capacities of the output nodes are unbalanced. If you find that the load of the output nodes is unbalanced when you use Rebalance, consider using Dynamic-Rebalance. To use Dynamic-Rebalance, set the task.dynamic.rebalance.enabled parameter to true. The default value is false.
Use Rescale to replace Rebalance
Note: Rescale is supported by Realtime Compute
To use Rescale, set the enable.rescale.shuffling parameter to true. The default value is false.
Recommended configuration
# Exactly-once semantics.
blink.checkpoint.mode=EXACTLY_ONCE
# The checkpoint interval, in milliseconds.
blink.checkpoint.interval.ms=180000
blink.checkpoint.timeout.ms=600000
# Realtime Compute V2.X uses Niagara as the state data backend. The following parameter sets the lifecycle (in milliseconds) of the state data.
state.backend.type=niagara
state.backend.niagara.ttl.ms=129600000
# Realtime Compute V2.X enables micro-batch processing with an interval of 5 seconds. (You cannot set this parameter when you use a window function.)
blink.microBatch.allowLatencyMs=5000
# The allowed latency for a job.
blink.miniBatch.allowLatencyMs=5000
# The size of a batch.
blink.miniBatch.size=20000
# Enable local aggregation. This feature is enabled by default in Realtime Compute V2.X, but you must manually enable it in Realtime Compute V1.6.4.
blink.localAgg.enabled=true
# Enable PartialFinal to resolve data hotspot issues when you run the CountDistinct function in Realtime Compute V2.X.
blink.partialAgg.enabled=true
# Enable UNION ALL for optimization.
blink.forbid.unionall.as.breakpoint.in.subsection.optimization=true
# Enable OBJECT REUSE for optimization.
#blink.object.reuse=true
# Configure garbage collection for optimization. (You cannot set this parameter when you use a Log Service source table.)
blink.job.option=-yD heartbeat.timeout=180000 -yD env.java.opts='-verbose:gc -XX:NewRatio=3 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:ParallelGCThreads=4'
# Set the time zone.
blink.job.timeZone=Asia/Shanghai