You can optimize the performance of a Realtime Compute for Apache Flink job by adjusting job parameters, resource parameters, and the parameters of the upstream and downstream data stores.
Overview
- Upstream and downstream data storage parameters
- Job parameters, such as miniBatch
- Resource parameters, such as parallelism, core, and heap_memory
This topic describes how to configure the preceding three types of parameters. After you reconfigure the parameters of a job, you must suspend and then resume the job, or terminate and then start the job, for the new settings to take effect. For more information, see Apply new configurations.
Upstream and downstream data storage parameters
Table | Parameter | Description | Value |
---|---|---|---|
DataHub source table | batchReadSize | The number of data records that are read at a time. | Optional. Default value: 10. |
DataHub result table | batchSize | The number of data records that are written at a time. | Optional. Default value: 300. |
Log Service source table | batchGetSize | The number of log groups that are read at a time. | Optional. Default value: 10. |
AnalyticDB for MySQL V2.0 result table | batchSize | The number of data records that are written at a time. | Optional. Default value: 1000. |
ApsaraDB for RDS result table | batchSize | The number of data records that are written at a time. | Optional. Default value: 4096. |
HybridDB for MySQL result table | batchSize | The number of data records that are written at a time. | Optional. Default value: 1000. The recommended maximum value is 4096. |
HybridDB for MySQL result table | bufferSize | The buffer size after duplicates are removed. You can use this parameter only after a primary key is defined. | Optional. This parameter is required when the batchSize parameter is configured. The recommended value for the bufferSize parameter is 4096. |
To configure these parameters, add them to the WITH clause in the DDL statement of the table. For example, for a DataHub source table, add batchReadSize='<number>' to the WITH clause.
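The following DDL statement is a minimal sketch of a DataHub source table that sets batchReadSize in the WITH clause. The schema, endpoint, project, topic, and credential values are placeholders that you must replace with your own, and the exact set of connector options depends on the data store that you use.

```sql
CREATE TABLE datahub_source (
  `name`  VARCHAR,
  `age`   BIGINT,
  `score` DOUBLE
) WITH (
  type = 'datahub',
  endPoint = '<yourEndpoint>',    -- placeholder: your DataHub endpoint
  project = '<yourProjectName>',  -- placeholder: your DataHub project
  topic = '<yourTopicName>',      -- placeholder: your DataHub topic
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessKey>',
  batchReadSize = '500'           -- read 500 records at a time instead of the default 10
);
```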
Job parameters
- After you add parameters for a job, terminate and then start the job to apply the new settings.
- After you change parameter settings for a job, suspend and then resume the job to apply the new settings.
# Enable window miniBatch in Realtime Compute for Apache Flink V3.2 and later. By default, window miniBatch is disabled in Realtime Compute for Apache Flink V3.2 and later.
sql.exec.mini-batch.window.enabled=true
# Exactly-once semantics.
blink.checkpoint.mode=EXACTLY_ONCE
# The checkpoint interval. Unit: milliseconds.
blink.checkpoint.interval.ms=180000
blink.checkpoint.timeout.ms=600000
# Use Niagara as the state backend to configure the lifecycle of the state backend in Realtime Compute for Apache Flink V2.0 and later. The lifecycle unit is milliseconds.
state.backend.type=niagara
state.backend.niagara.ttl.ms=129600000
# In Realtime Compute for Apache Flink V2.0 and later, enable micro-batch processing that is performed at an interval of 5 seconds. You cannot configure this parameter when you use a window function.
blink.microBatch.allowLatencyMs=5000
# The latency that is allowed for a job.
blink.miniBatch.allowLatencyMs=5000
# Enable miniBatch for the node that joins two streams.
blink.miniBatch.join.enabled=true
# The size of a batch.
blink.miniBatch.size=20000
# Enable local aggregation. By default, this feature is enabled in Realtime Compute for Apache Flink V2.0 and later. You must manually enable it in Realtime Compute for Apache Flink V1.6.4.
blink.localAgg.enabled=true
# Enable partial aggregation to improve efficiency when you run the CountDistinct function in Realtime Compute for Apache Flink V2.0 and later.
blink.partialAgg.enabled=true
# Enable UNION ALL for optimization.
blink.forbid.unionall.as.breakpoint.in.subsection.optimization=true
# Configure garbage collection for optimization. You cannot configure this parameter when you use a Log Service source table.
blink.job.option=-yD heartbeat.timeout=180000 -yD env.java.opts='-verbose:gc -XX:NewRatio=3 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:ParallelGCThreads=4'
# Specify the time zone.
blink.job.timeZone=Asia/Shanghai
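For reference, the miniBatch, local aggregation, and partial aggregation parameters above mainly benefit aggregation and join queries. The following statement is a minimal sketch, with hypothetical table and column names, of a COUNT DISTINCT aggregation that these parameters are designed to accelerate.

```sql
-- Hypothetical tables: shop_visits (source table) and shop_uv (result table).
INSERT INTO shop_uv
SELECT
  shop_id,
  COUNT(DISTINCT user_id) AS uv  -- partial aggregation (blink.partialAgg.enabled) targets COUNT DISTINCT
FROM shop_visits
GROUP BY shop_id;
```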
Resource parameters
- Issue analysis
- In the following topology, the percentage of the input queues at Task node 2 reaches 100%. The data at Task node 2 is stacked up and causes back pressure on Task node 1. At Task node 1, the percentage of the output queues has reached 100%.
- Click Task node 2.
- Click SubTask List and find the subtask in which the value of In Queue is 100%.
- Click View Logs in the Actions column.
- Click Link to TaskExecutor.
- On the TaskExecutor tab, click Metrics Graph to check the CPU utilization and the memory usage.
- Performance optimization
- On the right side of the job editing page, click Configurations to view the details about resource configurations.
- In the window that appears, change the parameter values of one or more operators in
a group.
- To change the parameter values of one operator, perform the following steps:
- In the GROUP box, click the plus sign (+) in the upper-right corner.
- Move the pointer over the operator box.
- Click the icon next to the operator name.
- In the Modify Operator Data dialog box, change parameter values and click OK.
- To change the parameter values of multiple operators in a group at a time, perform
the following steps:
- Move the pointer over the GROUP box.
- Click the icon next to Group.
- In the Modify Operator Data dialog box, change the parameter values based on your business requirements and click OK.
- Then, in the upper-right corner of the Configurations page, choose .
- If the job performance is not significantly improved after you change the values of
the resource parameters for the group, perform the following steps to troubleshoot
the issue:
- Check whether data skew exists on the operator.
- Check whether subtasks of complex operators, such as GROUP BY, WINDOW, and JOIN, are running in a proper way.
- To remove an operator from a chain, perform the following steps:
- Click the operator that you want to remove.
- In the Modify Operator Data dialog box, set the chainingStrategy parameter to HEAD. If the chainingStrategy parameter of this operator is already set to HEAD, you must also set the chainingStrategy parameter to HEAD for the next operator. The following table lists the valid values of the chainingStrategy parameter.
Value | Description |
---|---|
ALWAYS | Operators are combined to increase parallelism and optimize performance. |
NEVER | Operators are not combined with the upstream and downstream operators. |
HEAD | Operators are combined with only the downstream operators. |
- Rules and suggestions
- We recommend that you set core:heap_memory to 1:4, which means that each CPU core is assigned 4 GB of memory. The following examples describe this rule:
  - If the core parameter of an operator is set to 1 and the heap_memory parameter is set to 3, the system assigns 1 Compute Unit (CU) and 4 GB of memory to the chain.
  - If the core parameter of an operator is set to 1 and the heap_memory parameter is set to 5, the system assigns 1.25 CUs and 5 GB of memory to the chain.
Note:
- The total number of cores for an operator is calculated based on the following formula: Value of the parallelism parameter × Value of the core parameter.
- The total heap_memory size for an operator is calculated based on the following formula: Value of the parallelism parameter × Value of the heap_memory parameter.
- The core value for a chain is the maximum core value among the operators in the group. The heap_memory size for a chain is the total heap_memory size of all the operators in the chain.
- For example, an operator whose parallelism is 4, core is 0.25, and heap_memory is 1024 consumes a total of 4 × 0.25 = 1 core and 4 × 1024 MB = 4 GB of heap memory.
- parallelism
- Source node
The parallelism value of a source node must be a divisor of the number of upstream partitions. For example, if the number of upstream partitions is 16, you can set the parallelism value to 16, 8, or 4. Note: The value of the parallelism parameter for the source nodes cannot exceed the number of shards of the source.
- Operator node
Specify the parallelism parameter of the operator nodes based on the estimated queries per second (QPS).
- If the QPS is low, you can set the parallelism value of the operator nodes to the same value as the parallelism value of the source nodes.
- If the QPS is high, set the parallelism value of the operator nodes to a value that is greater than the parallelism value of the source nodes. For example, if the parallelism value of the source nodes is 16, set the parallelism value of the operator nodes to a greater value, such as 64, 128, or 256.
- Sink node
Set the parallelism value of the sink nodes to a value that is two to three times the number of downstream partitions. Note: Do not set the parallelism parameter of the sink nodes to a value that is more than three times the number of downstream partitions. Otherwise, write timeouts or failures may occur. For example, if the number of downstream partitions is 16, do not set the parallelism parameter of the sink nodes to a value that is greater than 48.
- core
This parameter specifies the number of CPU cores. You can specify this parameter based on the actual CPU utilization. The recommended value of this parameter is 0.25. The default value is 0.1.
- heap_memory
The heap memory size. You can configure the value based on the actual memory usage. The default value is 256. The unit is MB.
- state_size
You must set the state_size parameter to 1 for task nodes where GROUP BY, JOIN, OVER, or WINDOW operators are used. This way, the system assigns extra memory for the operators to access state data. The default value of the state_size parameter is 0. Note: If you do not set the state_size parameter to 1 for these operators, the job may fail.
Apply new configurations
- You can suspend and then resume a job after you change values of the resource parameters, parameters in the WITH clause, or job parameters.
- You can terminate and then start a job after you modify the SQL logic, change the job version, add parameters to the WITH clause, or add job parameters.
After you restart or resume the job, you can navigate to . Then, check whether the new configurations have taken effect.
- To suspend and then resume a job, perform the following steps:
- Publish a job. For more information, see Publish a job. Set Resource Configuration to Use Latest Manually Configured Resources.
- On the Administration page, find the job, and click Suspend in the Actions column.
- On the Administration page, find the job, and click Resume in the Actions column.
- In the Resume dialog box, click Resume with Latest Configuration.
- To terminate and then start a job, perform the following steps:
- Terminate a job.
- Log on to the Realtime Compute development platform.
- In the top navigation bar, click Administration.
- On the Administration page, find the job, and click Terminate in the Actions column.
- Start the job.
- Log on to the Realtime Compute development platform.
- In the top navigation bar, click Administration.
- On the Administration page, find the job, and click Start in the Actions column.
- In the Start dialog box, configure the Start Time for Reading Data parameter.
- Click OK. The job is started.
Note Start Time for Reading Data specifies the time when Realtime Compute for Apache Flink starts to read data from the source table. This time refers to the time when the data was generated.
- If you specify the current time, Realtime Compute for Apache Flink reads data that is generated from the current time.
- If you select a previous time point, Realtime Compute for Apache Flink reads data that is generated from this time point. This is used to trace historical data.
Parameters
- Global
isChainingEnabled specifies whether chaining is enabled. The default value is true. Use the default value for this parameter.
- Nodes
Parameter | Description | Allow modification |
---|---|---|
id | The unique ID of the node. The node ID is generated by the system. | No |
uid | The unique user identifier (UID) of the node. The UID is used to generate the operator ID. If you do not specify this parameter, the UID is the same as the node ID. | No |
pact | The node type, such as data source, operator, or data sink. | No |
name | The name of the node. You can customize this parameter. | Yes |
slotSharingGroup | Specifies whether subtasks can share the same slot. Use the default value for this parameter. | No |
chainingStrategy | Defines the operator chaining strategy. If an operator is combined with an upstream operator, they run in the same thread and are combined into an operator chain that has multiple running steps. Valid values: ALWAYS (operators are combined to increase parallelism and optimize performance), NEVER (operators are not combined with the upstream or downstream operators), and HEAD (operators are combined with only the downstream operators). | Yes |
parallelism | The number of concurrent jobs on the node. You can increase the value based on your requirements. Default value: 1. | Yes |
core | The number of CPU cores. You can specify this parameter based on the actual CPU utilization. Default value: 0.1. Recommended value: 0.25. | Yes |
heap_memory | The heap memory size. You can specify this parameter based on the actual memory usage. Unit: MB. Default value: 256. | Yes |
direct_memory | The non-heap memory of a Java Virtual Machine (JVM). Unit: MB. Default value: 0. | You can change the value of this parameter, but we recommend that you use the default value. |
native_memory | The JVM non-heap memory that is used for the Java Native Interface (JNI). Unit: MB. Default value: 0. Recommended value: 10. | You can change the value of this parameter, but we recommend that you use the default value. |
- Chain
A Flink SQL task is a directed acyclic graph (DAG) that contains multiple nodes, or operators. Some upstream and downstream operators can be combined into a new operator when they run in the same thread. This process is known as a chain. The total number of CPU cores for the new operator is the maximum core value among all the operators in the chain, and the memory size for the chain equals the total memory size of all the operators in the chain. An operator chain can significantly reduce data transmission costs. Note:
- Only operators that have the same parallelism value can be combined to form a chain.
- You cannot add a GROUP BY operator to a chain.