This topic describes how to configure and use the dynamic resource allocation feature of Spark to maximize cluster resource utilization, reduce resource idleness, and improve task execution flexibility and overall system performance.
What is dynamic resource allocation?
Dynamic resource allocation is a mechanism provided by Spark that dynamically adjusts the computing resources used by a job based on its workload. If an executor is idle for a long period of time, the driver automatically releases the executor and returns its resources to the cluster. If tasks wait to be scheduled for a long period of time, the driver requests additional executors to run them. Dynamic resource allocation helps Spark respond flexibly to workload changes, avoiding both excessive job execution time caused by insufficient resources and waste caused by excess resources. This improves the overall resource utilization of the cluster.
Resource allocation policy
If dynamic resource allocation is enabled and tasks have been waiting to be scheduled for longer than the value of the `spark.dynamicAllocation.schedulerBacklogTimeout` parameter (default: 1 second), the driver starts requesting additional executors in batches. As long as tasks remain backlogged, the driver requests another batch every `spark.dynamicAllocation.sustainedSchedulerBacklogTimeout` (default: 1 second). The number of executors requested in each batch increases exponentially: the first batch requests one executor, the second batch requests two, the third batch requests four, the fourth batch requests eight, and so on.
Resource release policy
If an executor remains idle for longer than the value of the `spark.dynamicAllocation.executorIdleTimeout` parameter (default: 60 seconds), the driver automatically releases the executor to optimize resource utilization.
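The allocation and release policies are governed by a handful of timeout parameters that can be set at submit time. The following is a minimal sketch that uses the stock SparkPi example as a stand-in job; the values shown are the defaults quoted above, and shuffle tracking is included only because dynamic allocation also requires one of the shuffle-availability options described in the next section.

```bash
# A minimal sketch: the timing knobs behind the allocation and release policies.
# The values shown are the defaults; SparkPi is only a stand-in job.
spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
  --conf spark.dynamicAllocation.schedulerBacklogTimeout=1s \
  --conf spark.dynamicAllocation.sustainedSchedulerBacklogTimeout=1s \
  --conf spark.dynamicAllocation.executorIdleTimeout=60s \
  "$SPARK_HOME"/examples/jars/spark-examples_2.12-3.5.4.jar 1000
```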
Enable dynamic resource allocation
The dynamic resource allocation feature of Spark can be used in standalone, YARN, Mesos, and Kubernetes deployment modes. By default, the feature is disabled. To enable it, set the `spark.dynamicAllocation.enabled` parameter to `true` and perform one of the following configurations:
- Enable External Shuffle Service (ESS): Set `spark.shuffle.service.enabled` to `true` and configure ESS on each worker node in the cluster (see the sketch after this list).
- Enable shuffle data tracking: If you set `spark.dynamicAllocation.shuffleTracking.enabled` to `true`, Spark tracks the location and status of shuffle data so that the data is not lost and can be recomputed when needed.
- Enable node decommissioning: Set `spark.decommission.enabled` and `spark.storage.decommission.shuffleBlocks.enabled` to `true` so that Spark proactively copies the shuffle data blocks on decommissioned nodes to other available nodes.
- Configure the ShuffleDataIO plug-in: Set the `spark.shuffle.sort.io.plugin.class` parameter to the ShuffleDataIO plug-in class. You can also provide custom I/O operations for shuffle data to write the data to different storage systems.
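For illustration, the ESS option combined with dynamic resource allocation might be enabled at submit time as follows. This is a sketch for a YARN cluster and assumes the Spark external shuffle service has already been configured and started on every NodeManager.

```bash
# A sketch for an ESS-based setup on YARN, assuming the external shuffle
# service is already running on every NodeManager.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  "$SPARK_HOME"/examples/jars/spark-examples_2.12-3.5.4.jar 1000
```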
The configuration method varies by deployment mode.
Spark applications in standalone mode: You only need to enable ESS (see the first sketch after the following list).
Spark on Kubernetes:
- If you do not use Celeborn as the Remote Shuffle Service (RSS), set `spark.dynamicAllocation.shuffleTracking.enabled` to `true` to enable shuffle data tracking (see the second sketch after this list).
- If you use Celeborn as the RSS:
  - If the Spark version is 3.5.0 or later, set `spark.shuffle.sort.io.plugin.class` to `org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO` to configure the ShuffleDataIO plug-in.
  - If the Spark version is earlier than 3.5.0, you must patch Spark; no additional configuration is required. For more information, see Support Spark Dynamic Allocation.
  - If the Spark version is 3.4.0 or later, we recommend that you set `spark.dynamicAllocation.shuffleTracking.enabled` to `false` so that idle executors can be released promptly.
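The following sketches illustrate the two simpler cases. The first assumes a standalone cluster with the default directory layout and a master at `<MASTER_HOST>`; whether the worker daemon picks up `spark-defaults.conf` depends on how your daemons are launched, so treat this as an assumption. The second shows a submit-time configuration for Spark on Kubernetes without Celeborn; `<K8S_API_SERVER>` and `<SPARK_IMAGE>` are placeholders.

```bash
# Sketch 1: standalone mode. Enable ESS on every worker node, then restart
# the worker so that the setting takes effect.
echo "spark.shuffle.service.enabled true" >> "$SPARK_HOME/conf/spark-defaults.conf"
"$SPARK_HOME/sbin/stop-worker.sh"
"$SPARK_HOME/sbin/start-worker.sh" spark://<MASTER_HOST>:7077
```

```bash
# Sketch 2: Spark on Kubernetes without Celeborn. Shuffle tracking stands in for ESS.
spark-submit \
  --master k8s://https://<K8S_API_SERVER> \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.kubernetes.container.image=<SPARK_IMAGE> \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
  local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar 1000
```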
The following examples describe how to configure dynamic resource allocation for Spark in Kubernetes mode, including a scenario in which you do not use Celeborn as the RSS.
Prerequisites
- The ack-spark-operator component is installed.
  Note: In this example, `spark.jobNamespaces=["spark"]` is used for deployment. If you require a different namespace, modify the namespace field based on your business requirements.
- The ack-spark-history-server component is deployed.
  Note: In this example, a persistent volume (PV) named `nas-pv` and a persistent volume claim (PVC) named `nas-pvc` are created, and the Spark History Server is configured to read Spark event logs from the `/spark/event-logs` path of the File Storage NAS (NAS) file system.
- (Optional) The ack-celeborn component is deployed.
- Test data is prepared and uploaded to Object Storage Service (OSS).
Procedure
Do not use Celeborn as RSS
This example uses Spark 3.5.4. If you do not use Celeborn as the RSS and want to enable dynamic resource allocation, configure the following parameters:
- Enable dynamic resource allocation: Set `spark.dynamicAllocation.enabled` to `"true"`.
- Enable shuffle data tracking to implement dynamic resource allocation without relying on ESS: Set `spark.dynamicAllocation.shuffleTracking.enabled` to `"true"`.
Create a file named `spark-pagerank-dra.yaml` and copy the following content to the file. The file is used to create a SparkApplication.

```yaml
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank-dra
  namespace: spark
spec:
  type: Scala
  mode: cluster
  # Replace <SPARK_IMAGE> with your Spark image.
  image: <SPARK_IMAGE>
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  # Replace <OSS_BUCKET> with the name of your OSS bucket.
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt
  - "10"
  sparkVersion: 3.5.4
  hadoopConf:
    # Use the oss:// format to access OSS data.
    fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    # The endpoint used to access OSS. Replace <OSS_ENDPOINT> with your OSS access endpoint.
    # For example, the internal endpoint for the China (Beijing) region is oss-cn-beijing-internal.aliyuncs.com.
    fs.oss.endpoint: <OSS_ENDPOINT>
    # Obtain OSS access credentials from environment variables.
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  sparkConf:
    # ====================
    # Event logs
    # ====================
    spark.eventLog.enabled: "true"
    spark.eventLog.dir: file:///mnt/nas/spark/event-logs
    # ====================
    # Dynamic resource allocation
    # ====================
    # Enable dynamic resource allocation.
    spark.dynamicAllocation.enabled: "true"
    # Enable shuffle data tracking to implement dynamic resource allocation without relying on ESS.
    spark.dynamicAllocation.shuffleTracking.enabled: "true"
    # The initial number of executors.
    spark.dynamicAllocation.initialExecutors: "1"
    # The minimum number of executors.
    spark.dynamicAllocation.minExecutors: "0"
    # The maximum number of executors.
    spark.dynamicAllocation.maxExecutors: "5"
    # The idle timeout period of an executor. When exceeded, the executor is released.
    spark.dynamicAllocation.executorIdleTimeout: 60s
    # The idle timeout period of an executor that caches data blocks. The default value is infinity, which means such executors are never released.
    # spark.dynamicAllocation.cachedExecutorIdleTimeout:
    # If tasks are backlogged for longer than this period, additional executors are requested.
    spark.dynamicAllocation.schedulerBacklogTimeout: 1s
    # The interval at which subsequent batches of executors are requested.
    spark.dynamicAllocation.sustainedSchedulerBacklogTimeout: 1s
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    envFrom:
    - secretRef:
        name: spark-oss-secret
    volumeMounts:
    - name: nas
      mountPath: /mnt/nas
    serviceAccount: spark-operator-spark
  executor:
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef:
        name: spark-oss-secret
    volumeMounts:
    - name: nas
      mountPath: /mnt/nas
  volumes:
  - name: nas
    persistentVolumeClaim:
      claimName: nas-pvc
  restartPolicy:
    type: Never
```

Note: The Spark image used in the preceding sample job must contain the Hadoop OSS SDK dependencies. To build the image, use the following Dockerfile and push the image to your image repository:
```dockerfile
ARG SPARK_IMAGE=spark:3.5.4
FROM ${SPARK_IMAGE}

# Add dependencies for Hadoop Aliyun OSS support.
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars
```
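For example, the image might be built and pushed as follows; the registry and tag are placeholders, not prescribed names.

```bash
# Build the custom Spark image from the preceding Dockerfile and push it.
# Replace <YOUR_REGISTRY> with your image repository.
docker build -t <YOUR_REGISTRY>/spark:3.5.4-oss .
docker push <YOUR_REGISTRY>/spark:3.5.4-oss
```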
Run the following command to submit the Spark job:

```bash
kubectl apply -f spark-pagerank-dra.yaml
```

Expected output:
```text
sparkapplication.sparkoperator.k8s.io/spark-pagerank-dra created
```

View the driver logs:
```bash
kubectl logs -n spark spark-pagerank-dra-driver | grep -a2 -b2 "Going to request"
```

Expected output:
```text
3544-25/01/16 03:26:04 INFO SparkKubernetesClientFactory: Auto-configuring K8s client using current context from users K8s config file
3674-25/01/16 03:26:06 INFO Utils: Using initial executors = 1, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
3848:25/01/16 03:26:06 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 1, known: 0, sharedSlotFromPendingPods: 2147483647.
4026-25/01/16 03:26:06 INFO ExecutorPodsAllocator: Found 0 reusable PVCs from 0 PVCs
4106-25/01/16 03:26:06 INFO BasicExecutorFeatureStep: Decommissioning not enabled, skipping shutdown script
--
10410-25/01/16 03:26:15 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (192.168.95.190, executor 1, partition 0, PROCESS_LOCAL, 9807 bytes)
10558-25/01/16 03:26:15 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.95.190:34327 (size: 12.5 KiB, free: 4.6 GiB)
10690:25/01/16 03:26:16 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 2, known: 1, sharedSlotFromPendingPods: 2147483647.
10868-25/01/16 03:26:16 INFO ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 2 for resource profile id: 0)
11030-25/01/16 03:26:16 INFO ExecutorPodsAllocator: Found 0 reusable PVCs from 0 PVCs
```

The logs show that the driver requested one executor at 03:26:06 and another at 03:26:16.
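To observe the scale-up, and the release of idle executors after the 60-second timeout, you can also watch the executor pods directly. The `spark-role=executor` label is applied to executor pods by Spark on Kubernetes.

```bash
# Watch executor pods as they are created and, once idle, released again.
kubectl get pods -n spark -l spark-role=executor --watch
```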
Use Spark History Server to view logs. For more information, see Use Spark History Server to view information about Spark jobs.

The event timeline of the job shows that two executors are sequentially created.
Use Celeborn as RSS
This example uses Spark 3.5.4. If you use Celeborn as the RSS and want to enable dynamic resource allocation, configure the following parameters:
- Enable dynamic resource allocation: Set `spark.dynamicAllocation.enabled` to `"true"`.
- Configure the ShuffleDataIO plug-in: For Spark 3.5.0 and later, set `spark.shuffle.sort.io.plugin.class` to `org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO`.
- Ensure that idle executors are released at the earliest opportunity: Set `spark.dynamicAllocation.shuffleTracking.enabled` to `"false"`.

These settings are summarized in the sketch after this list.
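For reference, a submit-time sketch of the same settings for Spark 3.5.0 or later follows; `<K8S_API_SERVER>`, `<SPARK_IMAGE>`, and `<CELEBORN_MASTER_ENDPOINTS>` are placeholders, and the full SparkApplication example below shows the same configuration in context.

```bash
# A sketch of the Celeborn + dynamic resource allocation settings (Spark 3.5.0 or later).
spark-submit \
  --master k8s://https://<K8S_API_SERVER> \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.kubernetes.container.image=<SPARK_IMAGE> \
  --conf spark.shuffle.manager=org.apache.spark.shuffle.celeborn.SparkShuffleManager \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.celeborn.master.endpoints=<CELEBORN_MASTER_ENDPOINTS> \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.dynamicAllocation.shuffleTracking.enabled=false \
  --conf spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO \
  local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar 1000
```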
Create a file named `spark-pagerank-celeborn-dra.yaml` and copy the following content to the file. The file is used to create a SparkApplication.

```yaml
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank-celeborn-dra
  namespace: spark
spec:
  type: Scala
  mode: cluster
  # Replace <SPARK_IMAGE> with your Spark image. The image must contain the Hadoop OSS SDK and Celeborn client dependencies.
  image: <SPARK_IMAGE>
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  # Replace <OSS_BUCKET> with the name of your OSS bucket.
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt
  - "10"
  sparkVersion: 3.5.4
  hadoopConf:
    # Use the oss:// format to access OSS data.
    fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    # The endpoint used to access OSS. Replace <OSS_ENDPOINT> with your OSS access endpoint.
    # For example, the internal endpoint for the China (Beijing) region is oss-cn-beijing-internal.aliyuncs.com.
    fs.oss.endpoint: <OSS_ENDPOINT>
    # Obtain OSS access credentials from environment variables.
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  sparkConf:
    # ====================
    # Event logs
    # ====================
    spark.eventLog.enabled: "true"
    spark.eventLog.dir: file:///mnt/nas/spark/event-logs
    # ====================
    # Celeborn
    # Ref: https://github.com/apache/celeborn/blob/main/README.md#spark-configuration
    # ====================
    # The shuffle manager class name changed in Celeborn 0.3.0:
    # before 0.3.0: org.apache.spark.shuffle.celeborn.RssShuffleManager
    # since 0.3.0: org.apache.spark.shuffle.celeborn.SparkShuffleManager
    spark.shuffle.manager: org.apache.spark.shuffle.celeborn.SparkShuffleManager
    # The Kryo serializer must be used because the Java serializer does not support relocation.
    spark.serializer: org.apache.spark.serializer.KryoSerializer
    # Configure this parameter based on the number of replicas of the Celeborn master node.
    spark.celeborn.master.endpoints: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
    # Options: hash, sort.
    # The hash shuffle writer uses (partition count) * (celeborn.push.buffer.max.size) * (spark.executor.cores) memory.
    # The sort shuffle writer uses less memory than the hash shuffle writer. If your shuffle partition count is large, try the sort shuffle writer.
    spark.celeborn.client.spark.shuffle.writer: hash
    # We recommend setting spark.celeborn.client.push.replicate.enabled to true to enable server-side data replication.
    # If you have only one worker, this setting must be false.
    # If your Celeborn cluster uses HDFS, we recommend setting this to false.
    spark.celeborn.client.push.replicate.enabled: "false"
    # Support for Spark AQE has only been tested under Spark 3.
    spark.sql.adaptive.localShuffleReader.enabled: "false"
    # We recommend enabling AQE support to gain better performance.
    spark.sql.adaptive.enabled: "true"
    spark.sql.adaptive.skewJoin.enabled: "true"
    # If the Spark version is 3.5.0 or later, configure this parameter to support dynamic resource allocation.
    spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
    spark.executor.userClassPathFirst: "false"
    # ====================
    # Dynamic resource allocation
    # Ref: https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
    # ====================
    # Enable dynamic resource allocation.
    spark.dynamicAllocation.enabled: "true"
    # If the Spark version is 3.4.0 or later, we recommend that you disable shuffle data tracking when you use Celeborn as the RSS.
    spark.dynamicAllocation.shuffleTracking.enabled: "false"
    # The initial number of executors.
    spark.dynamicAllocation.initialExecutors: "1"
    # The minimum number of executors.
    spark.dynamicAllocation.minExecutors: "0"
    # The maximum number of executors.
    spark.dynamicAllocation.maxExecutors: "5"
    # The idle timeout period of an executor. When exceeded, the executor is released.
    spark.dynamicAllocation.executorIdleTimeout: 60s
    # The idle timeout period of an executor that caches data blocks. The default value is infinity, which means such executors are never released.
    # spark.dynamicAllocation.cachedExecutorIdleTimeout:
    # If tasks are backlogged for longer than this period, additional executors are requested.
    spark.dynamicAllocation.schedulerBacklogTimeout: 1s
    # The interval at which subsequent batches of executors are requested.
    spark.dynamicAllocation.sustainedSchedulerBacklogTimeout: 1s
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    envFrom:
    - secretRef:
        name: spark-oss-secret
    volumeMounts:
    - name: nas
      mountPath: /mnt/nas
    serviceAccount: spark-operator-spark
  executor:
    cores: 1
    coreLimit: "1"
    memory: 4g
    envFrom:
    - secretRef:
        name: spark-oss-secret
    volumeMounts:
    - name: nas
      mountPath: /mnt/nas
  volumes:
  - name: nas
    persistentVolumeClaim:
      claimName: nas-pvc
  restartPolicy:
    type: Never
```

Note: The Spark image used in the preceding sample job must contain the Hadoop OSS SDK and Celeborn client dependencies. To build the image, use the following Dockerfile and push the image to your image repository:
```dockerfile
ARG SPARK_IMAGE=spark:3.5.4
FROM ${SPARK_IMAGE}

# Add dependencies for Hadoop Aliyun OSS support.
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars

# Add the dependency for the Celeborn client.
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/celeborn/celeborn-client-spark-3-shaded_2.12/0.5.3/celeborn-client-spark-3-shaded_2.12-0.5.3.jar ${SPARK_HOME}/jars
```

Run the following command to submit the SparkApplication:
```bash
kubectl apply -f spark-pagerank-celeborn-dra.yaml
```

Expected output:
```text
sparkapplication.sparkoperator.k8s.io/spark-pagerank-celeborn-dra created
```

View the driver logs:
```bash
kubectl logs -n spark spark-pagerank-celeborn-dra-driver | grep -a2 -b2 "Going to request"
```

Expected output:
```text
3544-25/01/16 03:51:28 INFO SparkKubernetesClientFactory: Auto-configuring K8s client using current context from users K8s config file
3674-25/01/16 03:51:30 INFO Utils: Using initial executors = 1, max of spark.dynamicAllocation.initialExecutors, spark.dynamicAllocation.minExecutors and spark.executor.instances
3848:25/01/16 03:51:30 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 1, known: 0, sharedSlotFromPendingPods: 2147483647.
4026-25/01/16 03:51:30 INFO ExecutorPodsAllocator: Found 0 reusable PVCs from 0 PVCs
4106-25/01/16 03:51:30 INFO CelebornShuffleDataIO: Loading CelebornShuffleDataIO
--
11796-25/01/16 03:51:41 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0) (192.168.95.163, executor 1, partition 0, PROCESS_LOCAL, 9807 bytes)
11944-25/01/16 03:51:42 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.95.163:37665 (size: 13.3 KiB, free: 2.1 GiB)
12076:25/01/16 03:51:42 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 2, known: 1, sharedSlotFromPendingPods: 2147483647.
12254-25/01/16 03:51:42 INFO ExecutorAllocationManager: Requesting 1 new executor because tasks are backlogged (new desired total will be 2 for resource profile id: 0)
12416-25/01/16 03:51:42 INFO ExecutorPodsAllocator: Found 0 reusable PVCs from 0 PVCs
```

The output indicates that the driver requested one executor at 03:51:30 and another at 03:51:42.

Use Spark History Server to view logs. For more information, see Use Spark History Server to view information about Spark jobs.

The event timeline of the job shows that two executors are sequentially created.