Container Service for Kubernetes:Configure dynamic resource allocation for Spark jobs

Last Updated: Mar 24, 2025

This topic describes how to configure and use the dynamic resource allocation feature of Spark to maximize cluster resource utilization, reduce resource idleness, and improve job execution flexibility and overall system performance.

What is dynamic resource allocation?

Dynamic resource allocation is a mechanism provided by Spark that dynamically adjusts the computing resources used by a job based on its workload. If an executor is idle for a long period of time, the driver automatically releases it and returns its resources to the cluster. If tasks wait to be scheduled for a long period of time, the driver requests additional executors to run them. Dynamic resource allocation helps Spark respond flexibly to workload changes, avoiding both excessive job execution time caused by insufficient resources and waste caused by excess resources. This improves the overall resource utilization of the cluster.

Resource allocation policy

When dynamic resource allocation is enabled and tasks are waiting to be scheduled, the driver requests additional executors in batches. The first batch is requested when tasks have been pending for longer than the value of the spark.dynamicAllocation.schedulerBacklogTimeout parameter, whose default value is 1 second. As long as tasks remain pending, the driver requests subsequent batches every spark.dynamicAllocation.sustainedSchedulerBacklogTimeout, which also defaults to 1 second. The number of executors requested in each batch increases exponentially: the first batch requests one executor, the second batch two, the third four, the fourth eight, and so on.
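
For reference, the following sparkConf sketch makes these request-side settings explicit. The values shown are the defaults and are included for illustration only:

    # The first batch of executors is requested after tasks have been pending for 1 second.
    spark.dynamicAllocation.schedulerBacklogTimeout: 1s
    # While pending tasks remain, a new batch is requested every second,
    # and the batch size doubles each time: 1, 2, 4, 8, ... executors.
    spark.dynamicAllocation.sustainedSchedulerBacklogTimeout: 1s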

Resource release policy

To optimize resource utilization, the driver automatically releases an executor when its idle time exceeds the value of the spark.dynamicAllocation.executorIdleTimeout parameter. The default value of this parameter is 60 seconds.
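
A matching sketch for the release side (60s is the default; the cachedExecutorIdleTimeout value of 300s is illustrative, because its default is infinity):

    # Release an executor after it has been idle for 60 seconds.
    spark.dynamicAllocation.executorIdleTimeout: 60s
    # Executors that cache data blocks are exempt by default (infinity).
    # Set a finite value, for example 300s, if you want them reclaimed as well.
    spark.dynamicAllocation.cachedExecutorIdleTimeout: 300s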

Enable dynamic resource allocation

The dynamic resource allocation feature of Spark can be used in standalone, YARN, Mesos, and Kubernetes deployment modes. By default, the feature is disabled. To enable it, set the spark.dynamicAllocation.enabled parameter to true and perform one of the following configurations (a sketch that maps these options to configuration keys follows this list):

  1. Enable External Shuffle Service (ESS): Set spark.shuffle.service.enabled to true and deploy ESS on each worker node in the cluster.

  2. Enable shuffle data tracking: If you set spark.dynamicAllocation.shuffleTracking.enabled to true, Spark tracks the location and status of shuffle data so that the data is not lost and can be recomputed when needed.

  3. Enable node decommissioning: Set spark.decommission.enabled and spark.storage.decommission.shuffleBlocks.enabled to true so that Spark proactively migrates shuffle data blocks from decommissioning nodes to other available nodes.

  4. Configure the ShuffleDataIO plug-in: Set the spark.shuffle.sort.io.plugin.class parameter to specify a ShuffleDataIO plug-in class. You can implement custom I/O operations for shuffle data to write the data to different storage systems.
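
The following sparkConf sketch shows how these alternatives map to configuration keys in a SparkApplication manifest. Enable only the option that matches your environment; the Celeborn plug-in class on the last line is one example of a ShuffleDataIO implementation:

    spark.dynamicAllocation.enabled: "true"
    # Option 1: External Shuffle Service (requires ESS on each worker node)
    # spark.shuffle.service.enabled: "true"
    # Option 2: shuffle data tracking (no ESS required)
    spark.dynamicAllocation.shuffleTracking.enabled: "true"
    # Option 3: node decommissioning
    # spark.decommission.enabled: "true"
    # spark.storage.decommission.shuffleBlocks.enabled: "true"
    # Option 4: a ShuffleDataIO plug-in that writes shuffle data to external storage
    # spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO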

The configuration method varies with the deployment mode.

  • Spark applications in standalone mode: You only need to enable ESS.

  • Spark on Kubernetes:

    • If you do not use Celeborn as the Remote Shuffle Service (RSS), set spark.dynamicAllocation.shuffleTracking.enabled to true to enable shuffle data tracking.

    • If you use Celeborn as RSS:

      • If the Spark version is 3.5.0 or later, set spark.shuffle.sort.io.plugin.class to org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO to configure the ShuffleDataIO plug-in.

      • If the Spark version is earlier than 3.5.0, you must apply a patch to Spark; after the patch is applied, no additional configuration is required. For more information, see Support Spark Dynamic Allocation.

      • If the Spark version is 3.4.0 or later, we recommend that you set spark.dynamicAllocation.shuffleTracking.enabled to false so that idle executors can be released promptly.

The following examples describe how to configure dynamic resource allocation for Spark in Kubernetes mode, both with and without Celeborn as RSS.

Procedure

Do not use Celeborn as RSS

In this example, Spark 3.5.4 is used. To enable dynamic resource allocation without using Celeborn as RSS, configure the following parameters:

  • Enable dynamic resource allocation: Set spark.dynamicAllocation.enabled to "true".

  • Enable shuffle data tracking to implement dynamic resource allocation without relying on ESS: Set spark.dynamicAllocation.shuffleTracking.enabled to "true".

  1. Create a file named spark-pagerank-dra.yaml and copy the following content to the file. The file is used to create a SparkApplication.

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pagerank-dra
      namespace: spark
    spec:
      type: Scala
      mode: cluster
      # Replace <SPARK_IMAGE> with your Spark image. 
      image: <SPARK_IMAGE>
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
      mainClass: org.apache.spark.examples.SparkPageRank
      arguments:
      # Replace <OSS_BUCKET> with the name of your OSS bucket. 
      - oss://<OSS_BUCKET>/data/pagerank_dataset.txt
      - "10"
      sparkVersion: 3.5.4
      hadoopConf:
        # Use the oss:// format to access OSS data. 
        fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
        fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
        # The endpoint to access OSS. You must replace <OSS_ENDPOINT> with your OSS access endpoint. 
        # For example, the internal endpoint to access OSS in the China (Beijing) region is oss-cn-beijing-internal.aliyuncs.com
        fs.oss.endpoint: <OSS_ENDPOINT>
        # Obtain OSS access credentials from environment variables. 
        fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
      sparkConf:
        # ====================
        # Event logs
        # ====================
        spark.eventLog.enabled: "true"
        spark.eventLog.dir: file:///mnt/nas/spark/event-logs
    
        # ====================
        # Dynamic resource allocation
        # ====================
        # Enable dynamic resource allocation
        spark.dynamicAllocation.enabled: "true"
        # Enable shuffle data tracking to implement dynamic resource allocation without relying on ESS. 
        spark.dynamicAllocation.shuffleTracking.enabled: "true"
        # The initial number of executors. 
        spark.dynamicAllocation.initialExecutors: "1"
        # The minimum number of executors. 
        spark.dynamicAllocation.minExecutors: "0"
        # The maximum number of executors. 
        spark.dynamicAllocation.maxExecutors: "5"
        # The executor idle timeout. An executor is released when its idle time exceeds this period. 
        spark.dynamicAllocation.executorIdleTimeout: 60s
        # The idle timeout for executors that cache data blocks. The default value is infinity, which means such executors are never released. 
        # spark.dynamicAllocation.cachedExecutorIdleTimeout:
        # Request additional executors when tasks have been pending longer than this timeout. 
        spark.dynamicAllocation.schedulerBacklogTimeout: 1s
        # Request subsequent batches of executors at this interval while tasks remain pending. 
        spark.dynamicAllocation.sustainedSchedulerBacklogTimeout: 1s
      driver:
        cores: 1
        coreLimit: 1200m
        memory: 512m
        envFrom:
        - secretRef:
            name: spark-oss-secret
        volumeMounts:
        - name: nas
          mountPath: /mnt/nas
        serviceAccount: spark-operator-spark
      executor:
        cores: 1
        coreLimit: "2"
        memory: 8g
        envFrom:
        - secretRef:
            name: spark-oss-secret
        volumeMounts:
        - name: nas
          mountPath: /mnt/nas
      volumes:
      - name: nas
        persistentVolumeClaim:
          claimName: nas-pvc
      restartPolicy:
        type: Never
    Note

    The Spark image used in the preceding sample job must contain Hadoop OSS SDK dependencies. To build an image, use the following Dockerfile and push the image to your image repository:

    ARG SPARK_IMAGE=spark:3.5.4
    
    FROM ${SPARK_IMAGE}
    
    # Add dependency for Hadoop Aliyun OSS support
    ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
    ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
    ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars
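
    For example, run the following commands to build and push the image. The repository address registry.example.com/spark is a placeholder; replace it with your own image repository:

    docker build -t registry.example.com/spark:3.5.4-oss .
    docker push registry.example.com/spark:3.5.4-oss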
  2. Run the following command to submit the Spark job:

    kubectl apply -f spark-pagerank-dra.yaml 

    Expected output:

    sparkapplication.sparkoperator.k8s.io/spark-pagerank-dra created
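
    Optionally, run the following command to watch executor pods being created and released while the job runs. This relies on the spark-role label that Spark on Kubernetes applies to executor pods:

    kubectl get pods -n spark -l spark-role=executor -w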
  3. Run the following command to view the driver logs:

    kubectl logs -n spark spark-pagerank-dra-driver | grep -a2 -b2 "Going to request"

    Expected output:

    3544-25/01/16 03:26:04 INFO SparkKubernetesClientFactory: Auto-configuring K8s client using current context from users K8s config file
    3674-25/01/16 03:26:06 INFO Utils: Using initial executors = 1, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
    3848:25/01/16 03:26:06 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 1, known: 0, sharedSlotFromPendingPods: 2147483647.
    4026-25/01/16 03:26:06 INFO ExecutorPodsAllocator: Found 0 reusable PVCs from 0 PVCs
    4106-25/01/16 03:26:06 INFO BasicExecutorFeatureStep: Decommissioning not enabled, skipping shutdown script
    --
    10410-25/01/16 03:26:15 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (192.168.95.190, executor 1, partition 0, PROCESS_LOCAL, 9807 bytes)
    10558-25/01/16 03:26:15 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.95.190:34327 (size: 12.5 KiB, free: 4.6 GiB)
    10690:25/01/16 03:26:16 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 2, known: 1, sharedSlotFromPendingPods: 2147483647.
    10868-25/01/16 03:26:16 INFO ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 2 for resource profile id: 0)
    11030-25/01/16 03:26:16 INFO ExecutorPodsAllocator: Found 0 reusable PVCs from 0 PVCs

    The logs show that the driver requests one executor at 03:26:06 and another at 03:26:16.

  4. Use Spark History Server to view logs. For more information, see Use Spark History Server to view information about Spark jobs.


    The event timeline of the job shows that two executors are sequentially created.

Use Celeborn as RSS

In this example, Spark 3.5.4 is used. To enable dynamic resource allocation when you use Celeborn as RSS, configure the following parameters:

  • Enable dynamic resource allocation: Set spark.dynamicAllocation.enabled to "true".

  • For Spark 3.5.0 and later, set spark.shuffle.sort.io.plugin.class to org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO.

  • Ensure that idle executors are released at the earliest opportunity: Set spark.dynamicAllocation.shuffleTracking.enabled to "false".

  1. Create a file named spark-pagerank-celeborn-dra.yaml and copy the following content to the file. The file is used to create a SparkApplication.

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pagerank-celeborn-dra
      namespace: spark
    spec:
      type: Scala
      mode: cluster
      # Replace <SPARK_IMAGE> with your Spark image. The image must contain the Hadoop OSS SDK and Celeborn client dependencies. 
      image: <SPARK_IMAGE>
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
      mainClass: org.apache.spark.examples.SparkPageRank
      arguments:
      # Replace <OSS_BUCKET> with the name of your OSS bucket. 
      - oss://<OSS_BUCKET>/data/pagerank_dataset.txt
      - "10"
      sparkVersion: 3.5.4
      hadoopConf:
        # Use the oss:// format to access OSS data. 
        fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
        fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
        # The endpoint to access OSS. You must replace <OSS_ENDPOINT> with your OSS access endpoint.
        # For example, the internal endpoint to access OSS in the China (Beijing) region is oss-cn-beijing-internal.aliyuncs.com
        fs.oss.endpoint: <OSS_ENDPOINT>
        # Obtain OSS access credentials from environment variables.
        fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
      sparkConf:
        # ====================
        # Event logs
        # ====================
        spark.eventLog.enabled: "true"
        spark.eventLog.dir: file:///mnt/nas/spark/event-logs
    
        # ====================
        # Celeborn
        # Ref: https://github.com/apache/celeborn/blob/main/README.md#spark-configuration
        # ====================
        # Shuffle manager class name changed in 0.3.0:
        #    before 0.3.0: `org.apache.spark.shuffle.celeborn.RssShuffleManager`
        #    since 0.3.0: `org.apache.spark.shuffle.celeborn.SparkShuffleManager`
        spark.shuffle.manager: org.apache.spark.shuffle.celeborn.SparkShuffleManager
        # Must use the Kryo serializer because the Java serializer does not support relocation
        spark.serializer: org.apache.spark.serializer.KryoSerializer
        # Configure this parameter based on the number of replicas on the Celeborn master node. 
        spark.celeborn.master.endpoints: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
        # options: hash, sort
        # The hash shuffle writer uses (partition count) * (celeborn.push.buffer.max.size) * (spark.executor.cores) memory.
        # The sort shuffle writer uses less memory than the hash shuffle writer; if your shuffle partition count is large, try the sort shuffle writer.
        spark.celeborn.client.spark.shuffle.writer: hash
        # We recommend setting `spark.celeborn.client.push.replicate.enabled` to true to enable server-side data replication.
        # If you have only one worker, this setting must be false. 
        # If Celeborn uses HDFS for storage, we recommend setting this to false.
        spark.celeborn.client.push.replicate.enabled: "false"
        # Support for Spark AQE only tested under Spark 3
        spark.sql.adaptive.localShuffleReader.enabled: "false"
        # We recommend enabling AQE support for better performance
        spark.sql.adaptive.enabled: "true"
        spark.sql.adaptive.skewJoin.enabled: "true"
        # If the Spark version is 3.5.0 or later, configure this parameter to support dynamic resource allocation.
        spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
        spark.executor.userClassPathFirst: "false"
    
        # ====================
        # Dynamic resource allocation
        # Ref: https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
        # ====================
        # Enable dynamic resource allocation
        spark.dynamicAllocation.enabled: "true"
        # Shuffle data tracking is not needed when Celeborn manages shuffle data as RSS. 
        # For Spark 3.4.0 and later, we recommend that you disable it so that idle executors are released promptly. 
        spark.dynamicAllocation.shuffleTracking.enabled: "false"
        # The initial number of executors. 
        spark.dynamicAllocation.initialExecutors: "1"
        # The minimum number of executors. 
        spark.dynamicAllocation.minExecutors: "0"
        # The maximum number of executors. 
        spark.dynamicAllocation.maxExecutors: "5"
        # The executor idle timeout. An executor is released when its idle time exceeds this period. 
        spark.dynamicAllocation.executorIdleTimeout: 60s
        # The idle timeout for executors that cache data blocks. The default value is infinity, which means such executors are never released. 
        # spark.dynamicAllocation.cachedExecutorIdleTimeout:
        # Request additional executors when tasks have been pending longer than this timeout. 
        spark.dynamicAllocation.schedulerBacklogTimeout: 1s
        # Request subsequent batches of executors at this interval while tasks remain pending. 
        spark.dynamicAllocation.sustainedSchedulerBacklogTimeout: 1s
      driver:
        cores: 1
        coreLimit: 1200m
        memory: 512m
        envFrom:
        - secretRef:
            name: spark-oss-secret
        volumeMounts:
        - name: nas
          mountPath: /mnt/nas
        serviceAccount: spark-operator-spark
      executor:
        cores: 1
        coreLimit: "1"
        memory: 4g
        envFrom:
        - secretRef:
            name: spark-oss-secret
        volumeMounts:
        - name: nas
          mountPath: /mnt/nas
      volumes:
      - name: nas
        persistentVolumeClaim:
          claimName: nas-pvc
      restartPolicy:
        type: Never
    Note

    The Spark image used in the preceding sample job must contain the Hadoop OSS SDK and Celeborn client dependencies. To build an image, use the following Dockerfile and push the image to your image repository:

    ARG SPARK_IMAGE=spark:3.5.4
    
    FROM ${SPARK_IMAGE}
    
    # Add dependency for Hadoop Aliyun OSS support
    ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
    ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
    ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars
    
    # Add dependency for Celeborn
    ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/celeborn/celeborn-client-spark-3-shaded_2.12/0.5.3/celeborn-client-spark-3-shaded_2.12-0.5.3.jar ${SPARK_HOME}/jars
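
    To verify that the Celeborn client JAR is present in the built image, you can list the jars directory. Replace <SPARK_IMAGE> with your image:

    docker run --rm --entrypoint ls <SPARK_IMAGE> /opt/spark/jars | grep celeborn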

  2. Run the following command to submit the SparkApplication:

    kubectl apply -f spark-pagerank-celeborn-dra.yaml

    Expected output:

    sparkapplication.sparkoperator.k8s.io/spark-pagerank-celeborn-dra created
  3. Run the following command to view the driver logs:

    kubectl logs -n spark spark-pagerank-celeborn-dra-driver | grep -a2 -b2 "Going to request"

    Expected output:

    3544-25/01/16 03:51:28 INFO SparkKubernetesClientFactory: Auto-configuring K8s client using current context from users K8s config file
    3674-25/01/16 03:51:30 INFO Utils: Using initial executors = 1, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
    3848:25/01/16 03:51:30 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 1, known: 0, sharedSlotFromPendingPods: 2147483647.
    4026-25/01/16 03:51:30 INFO ExecutorPodsAllocator: Found 0 reusable PVCs from 0 PVCs
    4106-25/01/16 03:51:30 INFO CelebornShuffleDataIO: Loading CelebornShuffleDataIO
    --
    11796-25/01/16 03:51:41 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (192.168.95.163, executor 1, partition 0, PROCESS_LOCAL, 9807 bytes)
    11944-25/01/16 03:51:42 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.95.163:37665 (size: 13.3 KiB, free: 2.1 GiB)
    12076:25/01/16 03:51:42 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 2, known: 1, sharedSlotFromPendingPods: 2147483647.
    12254-25/01/16 03:51:42 INFO ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 2 for resource profile id: 0)
    12416-25/01/16 03:51:42 INFO ExecutorPodsAllocator: Found 0 reusable PVCs from 0 PVCs

    The logs show that the driver requests one executor at 03:51:30 and another at 03:51:42.
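
    After the final stages complete and executors sit idle past spark.dynamicAllocation.executorIdleTimeout, you can also confirm the release side of the policy in the same log. The exact message wording varies across Spark versions, so grep loosely:

    kubectl logs -n spark spark-pagerank-celeborn-dra-driver | grep -i idle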

  4. Use Spark History Server to view logs. For more information, see Use Spark History Server to view information about Spark jobs.


    The event timeline of the job shows that two executors are sequentially created.