
Realtime Compute for Apache Flink: Configure quick task restart

Last Updated: Aug 04, 2023

This topic describes how to configure quick task restart. This feature reduces the impact of failovers on a deployment.

Background Information

Important

This feature is in invitational preview. Exercise caution when you use this feature in a production environment. If you encounter any issues, submit a ticket for technical support.

In most cases, if a task in a Flink streaming deployment fails, all tasks in the same pipelined region are restarted to ensure data consistency, and the source tasks resume consuming data from the most recent checkpoint. For some deployments, the restarted tasks must also download large source files or large amounts of state data. If the parallelism of such a deployment is high, the failover can take a long time. During this period, the deployment may be delayed or blocked and cannot consume data, and it may take a long time to recover.

To resolve this issue, you can configure quick task restart. With quick task restart, only the failed task is restarted. If the failed task is not a source task, the sources do not need to re-consume data from the most recent checkpoint. This shortens the period during which the deployment cannot consume data because tasks are being canceled, restarted, or catching up on data. When the parallelism of a deployment is high, quick task restart also reduces the load that task scheduling and initialization place on the cluster. As a result, failovers have less impact on deployments.

Quick task restart supports two data consistency semantics: APPROXIMATE and AT_LEAST_ONCE. The APPROXIMATE semantics ensures neither data integrity nor deduplication, but it adds no performance overhead. The AT_LEAST_ONCE semantics ensures data integrity but not deduplication. It adds performance overhead and requires Object Storage Service (OSS) for data storage.
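The trade-off between the two semantics can be summarized as a small decision helper. This is a minimal Python sketch, not part of Realtime Compute for Apache Flink: the `Semantics` enum and the `choose_semantics` function are hypothetical names. The only product-specific values are the `individual-task-failover.enabled` values that the Procedure section of this topic uses.

```python
from enum import Enum
from typing import Optional


class Semantics(Enum):
    """The two semantics of quick task restart (hypothetical enum).

    Each value is the individual-task-failover.enabled value from the Procedure section.
    """
    APPROXIMATE = "enabled_approximate"  # no integrity, no deduplication, no extra overhead
    AT_LEAST_ONCE = "enabled"            # integrity but no deduplication; extra overhead, needs OSS


def choose_semantics(tolerates_loss: bool, tolerates_duplicates: bool) -> Optional[Semantics]:
    """Hypothetical helper: pick a semantics based on what the business can tolerate."""
    if not tolerates_duplicates:
        # Neither semantics deduplicates data, so quick task restart does not fit.
        return None
    if tolerates_loss:
        # Loss and duplicates are both acceptable: prefer the option without overhead.
        return Semantics.APPROXIMATE
    # Duplicates are acceptable but loss is not: accept the overhead for integrity.
    return Semantics.AT_LEAST_ONCE
```

For example, `choose_semantics(tolerates_loss=False, tolerates_duplicates=True)` returns `Semantics.AT_LEAST_ONCE`, and a `None` result means that your deployment should keep the default failover behavior.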

Limits

  • You cannot configure quick task restart for streaming deployments that use finite data sources.

  • You cannot use quick task restart together with unaligned checkpoints.

  • You cannot configure quick task restart for batch deployments.

  • In a session cluster, deployments for which quick task restart is configured and deployments for which it is not configured cannot run at the same time.

  • If an operator in a deployment uses the prepareSnapshotPreBarrier method, or if checkpoint-related information is sent while the deployment runs, you cannot use the AT_LEAST_ONCE semantics.
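The limits above can be expressed as a pre-flight check. The following Python sketch assumes a hypothetical `DeploymentInfo` record whose fields you fill in yourself. Realtime Compute for Apache Flink does not expose such an object; the check only restates the rules listed above.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class DeploymentInfo:
    """Hypothetical description of a deployment; field names are illustrative only."""
    is_batch: bool = False
    has_finite_source: bool = False
    unaligned_checkpoints: bool = False
    uses_prepare_snapshot_pre_barrier: bool = False
    sends_checkpoint_info: bool = False


def check_limits(d: DeploymentInfo, semantics: str) -> List[str]:
    """Return the limits that the deployment violates (empty list if none)."""
    problems: List[str] = []
    if d.is_batch:
        problems.append("quick task restart is not supported for batch deployments")
    if d.has_finite_source:
        problems.append("streaming deployments with finite data sources are not supported")
    if d.unaligned_checkpoints:
        problems.append("quick task restart cannot be used with unaligned checkpoints")
    if semantics == "AT_LEAST_ONCE" and (
        d.uses_prepare_snapshot_pre_barrier or d.sends_checkpoint_info
    ):
        problems.append("AT_LEAST_ONCE cannot be used for this deployment")
    return problems


def session_cluster_mix_ok(quick_restart_flags: List[bool]) -> bool:
    """All deployments running in one session cluster must agree on quick task restart."""
    return len(set(quick_restart_flags)) <= 1
```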

Precautions

The precautions that apply depend on the semantics that you use.

APPROXIMATE

  • While a task fails over, its upstream tasks cannot send data to it, which causes backpressure. In most cases, backpressure occurs and the value of the numRecordsInPerSecond metric drops to 0 during the failover period. Some time after the failover completes, the backpressure disappears and the numRecordsInPerSecond metric returns to normal.

    If the operators in a deployment use only the Rebalance or Rescale partitioning strategy and each partitioner sends data to multiple downstream parallel tasks, you can use the dynamic rebalance feature so that the partitioners send data only to the tasks that have not failed. This keeps data processing running during the failover period.

  • If a task fails because a TaskManager exits unexpectedly or because of a network exception, quick task restart may not significantly shorten the failover period of the deployment. In extreme cases, the failover period may even become longer.

  • If checkpoints fail while a task is failing over, we recommend that you increase the maximum number of checkpoint failures that the deployment tolerates.

  • After you configure quick task restart, data may be lost or duplicated when a deployment fails over. Before you configure quick task restart, make sure that your business can tolerate data loss and duplicate data.

  • After you enable quick task restart, ignore the Checkpointing Mode value that is displayed for checkpoints on the Flink web user interface (UI). The Flink web UI always displays AT_LEAST_ONCE, but the actual semantics is APPROXIMATE, which ensures neither data integrity nor deduplication.
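To illustrate the idea behind dynamic rebalance, the following Python sketch implements a round-robin partitioner (the distribution that Rebalance uses) that skips downstream channels marked as failed. It is a conceptual model that assumes the partitioner knows which downstream tasks are failing over. The class and method names are hypothetical and do not describe how the feature is implemented or enabled in Realtime Compute for Apache Flink.

```python
from typing import Set


class DynamicRebalancePartitioner:
    """Conceptual round-robin partitioner that skips failed downstream channels.

    Illustrative only: not the Flink or Realtime Compute implementation.
    """

    def __init__(self, num_channels: int) -> None:
        self.num_channels = num_channels
        self.failed: Set[int] = set()  # downstream tasks currently failing over
        self._next = 0                 # next channel to try in round-robin order

    def mark_failed(self, channel: int) -> None:
        self.failed.add(channel)

    def mark_recovered(self, channel: int) -> None:
        self.failed.discard(channel)

    def select_channel(self) -> int:
        """Return the next healthy channel; try each channel at most once."""
        for _ in range(self.num_channels):
            channel = self._next
            self._next = (self._next + 1) % self.num_channels
            if channel not in self.failed:
                return channel
        raise RuntimeError("no healthy downstream channel")
```

With three downstream tasks and task 1 failing over, records go to tasks 0, 2, 0, 2, and so on. After task 1 recovers, it is included in the rotation again.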

AT_LEAST_ONCE

  • The AT_LEAST_ONCE semantics may degrade deployment performance and increase data processing latency. We recommend that you monitor the performance and latency of the deployment and, if these issues occur, scale out resources based on your business requirements.

  • When the AT_LEAST_ONCE semantics is used, upstream tasks cannot send data to a task while the task fails over or while it recovers data after the failover, which causes backpressure. In most cases, backpressure occurs and the value of the numRecordsInPerSecond metric drops to 0 during the failover period. Some time after the failover completes, the backpressure disappears and the numRecordsInPerSecond metric returns to normal.

    If the operators in a deployment use only the Rebalance or Rescale partitioning strategy and each partitioner sends data to multiple downstream parallel tasks, you can use the dynamic rebalance feature so that the partitioners send data only to the tasks that have not failed. This keeps data processing running during the failover period.

  • If a task fails because a TaskManager exits unexpectedly or because of a network exception, quick task restart may not significantly shorten the failover period of the deployment. In extreme cases, the failover period may even become longer.

  • If checkpoints fail while a task is failing over, we recommend that you increase the maximum number of checkpoint failures that the deployment tolerates.

  • With the AT_LEAST_ONCE semantics, each task of a deployment frequently refreshes its state and records its output data, which increases the I/O throughput of reads from and writes to OSS. We recommend that you do not use the AT_LEAST_ONCE semantics for deployments that generate a large amount of state data or process a large amount of data.

    Note
    • You can view the bandwidth and queries per second (QPS) that are used by OSS in the E-MapReduce (EMR) console.

    • You can estimate the total I/O throughput of a deployment by using the following formula: Total I/O throughput = Sum of the bytes received by all operators / Running duration of the deployment.

  • Because the AT_LEAST_ONCE semantics increases the I/O throughput between the deployment and OSS, other deployments that store data in OSS buckets within the same Alibaba Cloud account may be affected. Conversely, if those deployments have a high OSS I/O throughput, the deployment that uses the AT_LEAST_ONCE semantics may also be affected. To prevent this issue, we recommend that you monitor the deployments that store data in OSS buckets within the same Alibaba Cloud account, as well as the OSS I/O throughput and storage capacity used by all of these deployments.

  • In specific cases, the deployment may report an exception and perform a failover. For example, a failover may occur when the first checkpoint is complete.

  • The checkpoint IDs of a deployment that uses the AT_LEAST_ONCE semantics may not be consecutive.

  • Failovers are counted per task. If a global exception causes a large number of tasks to fail over, the failover count of the deployment increases by the number of tasks that fail over.
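The throughput estimate in the note above can be computed as follows. This is a minimal Python sketch: how you collect the per-operator byte counts is left to you, and the operator names and numbers in the usage example are made up for illustration.

```python
from typing import Mapping


def estimate_io_throughput(bytes_received_per_operator: Mapping[str, int],
                           running_seconds: float) -> float:
    """Estimate the total I/O throughput of a deployment in bytes per second.

    Applies the formula from the note: sum of the bytes received by all
    operators divided by the running duration of the deployment.
    """
    if running_seconds <= 0:
        raise ValueError("running_seconds must be positive")
    return sum(bytes_received_per_operator.values()) / running_seconds


# Hypothetical example: three operators observed over 1,000 seconds.
example = {"source": 600_000_000, "aggregate": 300_000_000, "sink": 100_000_000}
print(estimate_io_throughput(example, 1_000))
```

In this example, the operators received 1,000,000,000 bytes in total over 1,000 seconds, so the estimate is 1,000,000 bytes per second (about 1 MB/s).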

Procedure

  1. Go to the page where you configure quick task restart.

    1. Log on to the Realtime Compute for Apache Flink console.

    2. On the Fully Managed Flink tab, find the workspace that you want to manage and click Console in the Actions column.

    3. In the left-side navigation pane, click Deployments. On the Deployments page, click the name of the desired deployment.

    4. In the upper-right corner of the Parameters section on the Configuration tab, click Edit.

  2. In the Other Configuration field, add the configuration for the semantics that you want to use.

    • For the APPROXIMATE semantics, add the following code:

      individual-task-failover.enabled: enabled_approximate
      shuffle-service-factory.class: org.apache.flink.runtime.io.network.IndividualRecoverableNettyShuffleServiceFactory
    • For the AT_LEAST_ONCE semantics, add the following code:

      individual-task-failover.enabled: enabled
      shuffle-service-factory.class: org.apache.flink.runtime.io.network.IndividualRecoverableNettyShuffleServiceFactory
      individual-task-failover.intermediate-checkpointing.interval: <interval_ms>
      classloader.check-leaked-classloader: false

      Replace <interval_ms> with the period, in milliseconds, at which each task of the deployment refreshes its state. We recommend a value from one tenth to one fifth of the checkpointing interval.

    If your deployment runs in a session cluster, you must also add the preceding code to the configuration of the session cluster. For more information about how to configure a session cluster, see Step 1: Create a session cluster.

  3. In the upper-right corner of the Parameters section, click Save.

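  4. In the upper part of the Deployments page, click Cancel to stop the deployment so that the new configuration takes effect when the deployment is started again.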

  5. On the Deployments page, find the deployment that you want to start and click Start in the Actions column.
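The configuration in step 2 can be generated by a small helper that also applies the recommended range for `individual-task-failover.intermediate-checkpointing.interval`. This Python sketch is hypothetical and only assembles the text that you paste into the Other Configuration field. The optional `execution.checkpointing.tolerable-failed-checkpoints` line is the standard Apache Flink option for the number of tolerated checkpoint failures. Whether Realtime Compute for Apache Flink honors it in Other Configuration is an assumption that you should verify.

```python
from typing import List, Optional, Tuple

# Shuffle service factory class copied from step 2 of this topic.
SHUFFLE_FACTORY = (
    "org.apache.flink.runtime.io.network."
    "IndividualRecoverableNettyShuffleServiceFactory"
)


def recommended_interval_range(checkpoint_interval_ms: int) -> Tuple[int, int]:
    """Return (low, high) in ms: one tenth to one fifth of the checkpointing interval."""
    return checkpoint_interval_ms // 10, checkpoint_interval_ms // 5


def build_other_configuration(
    semantics: str,
    checkpoint_interval_ms: Optional[int] = None,
    tolerable_failed_checkpoints: Optional[int] = None,
) -> str:
    """Assemble the Other Configuration text for a semantics (hypothetical helper)."""
    lines: List[str] = []
    if semantics == "APPROXIMATE":
        lines.append("individual-task-failover.enabled: enabled_approximate")
        lines.append(f"shuffle-service-factory.class: {SHUFFLE_FACTORY}")
    elif semantics == "AT_LEAST_ONCE":
        if checkpoint_interval_ms is None:
            raise ValueError("AT_LEAST_ONCE requires the checkpointing interval")
        _, high = recommended_interval_range(checkpoint_interval_ms)
        lines.append("individual-task-failover.enabled: enabled")
        lines.append(f"shuffle-service-factory.class: {SHUFFLE_FACTORY}")
        # Upper end of the range: fewest state refreshes, so the least OSS I/O.
        lines.append(
            f"individual-task-failover.intermediate-checkpointing.interval: {high}"
        )
        lines.append("classloader.check-leaked-classloader: false")
    else:
        raise ValueError(f"unknown semantics: {semantics}")
    if tolerable_failed_checkpoints is not None:
        # Standard Apache Flink option; assumed to be accepted in Other Configuration.
        lines.append(
            "execution.checkpointing.tolerable-failed-checkpoints: "
            f"{tolerable_failed_checkpoints}"
        )
    return "\n".join(lines)


if __name__ == "__main__":
    # Example: 60-second checkpointing interval, tolerate up to 3 failed checkpoints.
    print(build_other_configuration("AT_LEAST_ONCE", 60_000, tolerable_failed_checkpoints=3))
```

The sketch picks the upper end of the recommended range (one fifth of the checkpointing interval) because a longer refresh period means fewer state refreshes and therefore less OSS I/O. If your deployment can afford more OSS I/O, use a value closer to one tenth.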