You can optimize the performance of a Realtime Compute for Apache Flink job by adjusting parameter settings, including job, resource, and upstream and downstream data storage parameters.

Overview

You can configure the following three types of parameters to optimize job performance:
  • Upstream and downstream data storage parameters
  • Job parameters such as miniBatch
  • Resource parameters such as parallelism, core, and heap_memory

This topic describes how to configure the preceding three types of parameters. After you modify or add parameter settings for a job, you must terminate and then start the job or suspend and then resume it to apply the new settings. For more information, see Apply new configuration.

Upstream and downstream data storage parameters

In Realtime Compute for Apache Flink, each data record can trigger read and write operations on the source and result tables. This affects the performance of both upstream and downstream data storage systems. To address this performance issue, you can configure batch size parameters to specify the number of data records that can be read from a source table or written to a result table at a time. The following table lists source and result tables that support batch size parameters.
  • DataHub source table (batchReadSize): the number of data records that are read at a time. Optional. Default value: 10.
  • DataHub result table (batchSize): the number of data records that are written at a time. Optional. Default value: 300.
  • Log Service source table (batchGetSize): the number of log groups that are read at a time. Optional. Default value: 10.
  • AnalyticDB for MySQL V2.0 result table (batchSize): the number of data records that are written at a time. Optional. Default value: 1000.
  • ApsaraDB for RDS result table (batchSize): the number of data records that are written at a time. Optional. Default value: 50.
  • HybridDB for MySQL result table:
    • batchSize: the number of data records that are written at a time. Optional. Default value: 1000. Recommended maximum value: 4096.
    • bufferSize: the buffer size after data deduplication. This parameter takes effect only when a primary key is defined, and is required when the batchSize parameter is configured. Recommended maximum value: 4096.
Note To configure the batch data read and write function, you can add the preceding parameters to the WITH clause in a data definition language (DDL) statement for a storage system. For example, add batchReadSize='<number>' to the WITH clause.
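For example, a DataHub source table DDL with the batchReadSize parameter might look like the following sketch. The table name, columns, and placeholder values are illustrative, not taken from an actual deployment:

```sql
-- Hypothetical DataHub source table; replace the placeholders with your own values.
CREATE TABLE datahub_source (
  `name` VARCHAR,
  `age`  BIGINT
) WITH (
  type = 'datahub',
  endPoint = '<yourEndpoint>',
  project = '<yourProject>',
  topic = '<yourTopic>',
  accessId = '<yourAccessId>',
  accessKey = '<yourAccessKey>',
  batchReadSize = '500'  -- read up to 500 records at a time instead of the default 10
);
```

A larger batchReadSize reduces the number of read requests sent to DataHub at the cost of slightly higher per-batch latency.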

Job parameters

The miniBatch parameter can only be used to optimize the GROUP BY operator. If you use Flink SQL to process streaming data, Realtime Compute for Apache Flink reads state data each time a data record arrives, which consumes a large amount of I/O resources. If miniBatch is enabled, Realtime Compute for Apache Flink reads the state data only once for all data records with the same key in a batch and emits only the latest data record for that key. This reduces the frequency at which state data is read and minimizes downstream data updates. Configure the miniBatch parameter based on the following rules:
  • After you add new parameters for a job, terminate and then start the job to apply the new settings.
  • After you modify parameters for a job, suspend and then resume the job to apply the new settings.
# Enable window miniBatch in Realtime Compute for Apache Flink V3.2.0 and later. Window miniBatch is disabled by default.
sql.exec.mini-batch.window.enabled=true
# Exactly-once semantics.
blink.checkpoint.mode=EXACTLY_ONCE
# The checkpoint interval in milliseconds.
blink.checkpoint.interval.ms=180000
blink.checkpoint.timeout.ms=600000
# Use Niagara as the state backend and set the time to live (TTL, in milliseconds) of state data in Realtime Compute for Apache Flink V2.0.0 and later.
state.backend.type=niagara
state.backend.niagara.ttl.ms=129600000
# Enable micro-batch processing with an interval of 5s in Realtime Compute for Apache Flink V2.0.0 and later. You cannot configure this parameter when you use a window function.
blink.microBatch.allowLatencyMs=5000
# The maximum latency allowed for miniBatch processing, in milliseconds.
blink.miniBatch.allowLatencyMs=5000
# Enable miniBatch for the node that joins two streams.
blink.miniBatch.join.enabled=true
# The size of a batch.
blink.miniBatch.size=20000
# Enable local aggregation. This feature is enabled by default in Realtime Compute for Apache Flink V2.0.0 and later. If you use Realtime Compute for Apache Flink V1.6.4, you must manually enable this feature.
blink.localAgg.enabled=true
# Enable partialAgg to resolve efficiency issues when you run the CountDistinct function in Realtime Compute for Apache Flink V2.0.0 and later.
blink.partialAgg.enabled=true
# Forbid the system from using UNION ALL as a breakpoint in subsection optimization.
blink.forbid.unionall.as.breakpoint.in.subsection.optimization=true
# Configure garbage collection for optimization. You cannot configure this parameter when you use a Log Service source table.
blink.job.option=-yD heartbeat.timeout=180000 -yD env.java.opts='-verbose:gc -XX:NewRatio=3 -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:ParallelGCThreads=4'
# Specify the time zone.
blink.job.timeZone=Asia/Shanghai
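As a sketch of where miniBatch helps, consider a hypothetical aggregation job in which many input records share the same key. The table and column names below are placeholders:

```sql
-- Hypothetical GROUP BY aggregation. With miniBatch enabled, state for each
-- item_id is read once per batch instead of once per record, and only the
-- latest aggregate per key is emitted downstream.
INSERT INTO item_pv_sink
SELECT item_id, COUNT(*) AS pv
FROM click_source
GROUP BY item_id;
```

Without miniBatch, every incoming record triggers a state read and a downstream update for its key; with miniBatch, records that arrive within the allowed latency window are processed together.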

Resource parameters

Perform the following steps to optimize resource configurations:
  1. Issue analysis
    1. As shown in the following topology, the percentage of the input queues at task node 2 reaches 100%. Data piles up at task node 2 and causes backpressure on task node 1, where the percentage of the output queues has reached 100%.
    2. Click task node 2.
    3. Click SubTask List and find the subtask in which the value of In Queue is 100%.
    4. Click View Logs in the Actions column.
    5. Click Link to TaskExecutor.
    6. On the TaskExecutor tab, click Metrics Graph to check the CPU and memory usage.
  2. Performance optimization
    1. On the Development page, click Basic Properties on the right. In the Basic Properties pane, click View details to open the Configurations page.
    2. On the page that appears, modify the parameters of one or more operators in a group.
      • To modify the parameters of an operator, perform the following steps:
        1. In the GROUP box, click the plus sign (+) in the upper-right corner.
        2. Move the pointer over the target operator box.
        3. Click the Pencil icon icon next to the operator name.
        4. In the Modify Operator Data dialog box, modify parameters as required and click OK.
      • To modify the parameters of multiple operators in a group at a time, perform the following steps:
        1. Move the pointer over the GROUP box.
        2. Click the Pencil icon icon next to GROUP.
        3. In the Modify Operator Data dialog box, modify parameters as required and click OK.
    3. Click Configurations on the right. Move the pointer over Configurations in the upper-right corner and select Apply and Close from the drop-down list.
      • If job performance is not significantly improved after you modify the resource parameters of the group, perform the following steps:
        1. Check whether data skew exists on the operator.
        2. Check whether subtasks of complex operators, such as GROUP BY, WINDOW, and JOIN, are running properly.
      • To remove an operator from a chain, perform the following steps:
        1. Move the pointer over the target operator and click the Edit icon next to the operator name.
        2. In the Modify Operator Data dialog box, set chainingStrategy to HEAD.
          If the chainingStrategy parameter of this operator has been set to HEAD, you must also set the chainingStrategy parameter to HEAD for the next operator. The following table describes valid values of the chainingStrategy parameter.
          Value Description
          ALWAYS Operators are combined to increase parallelism and optimize performance.
          NEVER Operators are not combined with their upstream or downstream operators.
          HEAD Operators are only combined with their downstream operators.
  3. Rules and suggestions
    • We recommend that you set core:heap_memory to 1:4, which indicates that each CPU core is assigned 4 GB of memory. For example:
      • If the core parameter of the operators is set to 1 and the heap_memory parameter is set to 3 GB, the system assigns 1 CU with 4 GB of memory to the chain.
      • If the core parameter of the operators is set to 1 and the heap_memory parameter is set to 5 GB, the system assigns 1.25 CUs with 5 GB of memory to the chain.
      Note
      • Total number of cores for an operator = Value of parallelism × Value of core.
      • Total heap memory size for an operator = Value of parallelism × Value of heap_memory.
      • The core value for a chain equals the maximum core value among the operators in the chain. The heap memory size for a chain equals the total heap memory size of the operators in the chain.
    • parallelism
      • Source node
        Set the parallelism parameter of the source nodes to a divisor of the number of upstream partitions. For example, if a source table has 16 partitions, you can set the parallelism parameter to a divisor of 16, such as 16, 8, or 4.
        Note The value of the parallelism parameter for the source nodes cannot exceed the number of shards of the source table.
      • Operator node
        Set the parallelism parameter of the operator nodes based on the estimated queries per second (QPS).
        • If the QPS is low, set the parallelism value of the operator nodes to the same value as the parallelism value of the source nodes.
        • If the QPS is high, set the parallelism value of the operator nodes to a value greater than the parallelism value of the source nodes. For example, if the parallelism value of the source nodes is 16, set the parallelism value of the operator nodes to a greater value, such as 64, 128, or 256.
      • Sink node
        Set the parallelism parameter of the sink nodes to a value that is two to three times the number of downstream partitions.
        Note Do not set the parallelism parameter of the sink nodes to a value that is greater than three times the number of downstream partitions. Otherwise, write timeouts or failures may occur. For example, if the result table has 16 partitions, do not set the parallelism parameter of the sink nodes to a value greater than 48.
    • core

      This parameter specifies the number of CPU cores allocated to the operator. Configure its value based on actual CPU utilization. Default value: 0.1. Recommended value: 0.25.

    • heap_memory

      This parameter specifies the heap memory size, in MB. Configure its value based on actual memory usage. Default value: 256.

    • state_size
      You must set the state_size parameter to 1 for task nodes that contain the GROUP BY, JOIN, OVER, or WINDOW operator so that the system assigns extra memory for the operator to access state data. The default value of the state_size parameter is 0.
      Note If you do not set the state_size parameter to 1, the job may fail.
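The resource rules above can be sketched with a hypothetical chain of two operators. All values below are illustrative:

```
# Hypothetical chain of two operators, each with parallelism=4:
#   Operator A: core=0.25, heap_memory=512 (MB)
#   Operator B: core=0.1,  heap_memory=256 (MB)
#
# Per operator:
#   Total cores for A       = parallelism x core        = 4 x 0.25 = 1
#   Total heap memory for A = parallelism x heap_memory = 4 x 512  = 2048 MB
#
# For the chain (per subtask):
#   core        = max(0.25, 0.1) = 0.25
#   heap_memory = 512 + 256      = 768 MB
```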

Apply new configuration

After you configure the parameters, you must suspend and then resume the job or terminate and then start it to make the configuration take effect. The job status is cleared when the job is terminated, which may change the execution result.
Note
  • You can suspend and then resume a job after you change values of the resource parameters, parameters in the WITH clause, or job parameters.
  • You can terminate and then start a job after you modify the SQL logic, change the job version, add parameters to the WITH clause, or add job parameters.

After you restart or resume the job, you can navigate to Administration > Overview > Vertex Topology to check whether the new configuration has taken effect.

  • To suspend and resume a job, perform the following steps:
    1. Publish a job. For more information, see Publish a job. Set Resource Configuration to Use Latest Manually Configured Resources.
    2. On the Administration page, find the target job and click Suspend in the Actions column.
    3. On the Administration page, find the target job and click Resume in the Actions column.
    4. In the Resume dialog box, click Resume with Latest Configuration.
  • To terminate and then start a job, perform the following steps:
    1. Terminate a job.
      1. Log on to the Realtime Compute development platform.
      2. In the top navigation bar, click Administration.
      3. On the Administration page, find the target job and click Terminate in the Actions column.
    2. Start the job.
      1. Log on to the Realtime Compute development platform.
      2. In the top navigation bar, click Administration.
      3. On the Administration page, find the target job and click Start in the Actions column.
      4. In the Start dialog box, configure the Start Time for Reading Data parameter.
      5. Click OK. The job is started.
        Note The Start Time for Reading Data parameter indicates the point in time to read data from the source table.
        • If you specify the current time, Realtime Compute for Apache Flink reads data generated after the current time.
        • If you specify a previous point in time, Realtime Compute for Apache Flink reads data generated after the specified time. This is used to track historical data.

Parameters

  • Global

    isChainingEnabled: specifies whether chaining is enabled. Default value: true. Use the default value.

  • Nodes
    • id: the unique ID of the node, generated by the system. Cannot be modified.
    • uid: the UID of the node, used to generate the operator ID. If you do not specify this parameter, the UID is the same as the node ID. Cannot be modified.
    • pact: the type of the node, for example, source node, operator node, or sink node. Cannot be modified.
    • name: the name of the node. You can customize this parameter. Can be modified.
    • slotSharingGroup: specifies whether subtasks can share the same slot. Use the default value. Cannot be modified.
    • chainingStrategy: the operator chaining strategy. If an operator is combined with an upstream operator, they run in the same thread and form an operator chain with multiple running steps. Can be modified. Valid values:
      • ALWAYS: operators are combined to increase parallelism and optimize performance.
      • NEVER: operators are not combined with their upstream or downstream operators.
      • HEAD: operators are only combined with their downstream operators.
    • parallelism: the parallelism of the node. Default value: 1. You can increase this value as required. Can be modified.
    • core: the number of CPU cores. Default value: 0.1. Recommended value: 0.25. Set this parameter based on actual CPU utilization. Can be modified.
    • heap_memory: the heap memory size, in MB. Default value: 256. Set this parameter based on actual memory usage. Can be modified.
    • direct_memory: the off-heap memory of the Java virtual machine (JVM), in MB. Default value: 0. Can be modified, but we recommend that you use the default value.
    • native_memory: the JVM off-heap memory that is used for the Java Native Interface (JNI), in MB. Default value: 0. You can change the value to 10 for the state backend. Can be modified, but we recommend that you use the default value.
  • Chain
    A Flink SQL task is a directed acyclic graph (DAG) that contains many nodes (operators). Upstream and downstream operators can be combined to form an operator chain when they are running. The CPU amount for a chain equals the maximum CPU amount among the operators in the chain, and the memory size for the chain equals the total memory size of the operators in the chain. An operator chain can significantly reduce data transmission costs.
    Note
    • Only operators with the same parallelism value can be combined to form a chain.
    • You cannot add a GROUP BY operator to a chain.