Enterprises usually need to run large numbers of jobs with limited resources. To resolve this issue, they must prioritize key departments or individuals when allocating resources while keeping resource allocation flexible. This topic describes how to improve cluster resource utilization and automate RayJobs created by different departments on a job management platform. You can queue RayJobs and dynamically adjust their priorities to ensure that resources are preferentially allocated to high-priority RayJobs.
Why do we need to automate RayJobs in ACK clusters?
Resource management and optimization
You can use a combination of Container Service for Kubernetes (ACK) and Ray to automate job scheduling so that you can manage and allocate compute resources more flexibly. For example, different numbers of vCPUs may need to be allocated to different teams, such as a text content team and a video content team. Ray enables you to efficiently schedule jobs with limited resources and ensures that each team has sufficient resources. This helps avoid resource waste and resource contention.
Priority control
Jobs created by algorithm engineers usually have higher priorities. You can create policies in ACK and use the scheduling mechanism of Ray to prioritize RayJobs based on user roles. This allows you to complete a variety of low-priority jobs with limited resources while ensuring the success of key projects.
Auto scaling
You may need to adjust the compute resources that each department can use to meet development requirements. You can use the auto scaling feature of ACK to dynamically scale nodes based on loads and use Ray to seamlessly balance workloads to obtain the optimal performance.
The following figure shows the allocation of vCPUs.
To manage quotas and define the priorities of jobs, you need to develop a resource quota mechanism and job scheduling system in the cluster.
You can create resource quotas to specify the maximum amount of resources or the maximum number of machines that each department or individual can use. Before the system starts a job, it checks whether the resource quota of the job can meet the requirements of the job. The system allocates compute resources to the job and starts the job only after confirming that the resource quota can fulfill the resource request. This improves resource utilization and ensures that important jobs are prioritized.
Preparations
An ACK Pro cluster is created.
For more information about how to create a cluster, see Create an ACK managed cluster.
For more information about how to update the Kubernetes version of a cluster, see Manually update ACK clusters.
The cluster components are installed.
The version of the Kuberay Operator component is v1.2.1.2.gfebe140 or later. For more information, see Install Kuberay Operator.
The version of Kube Queue is v1.21.4 or later. For more information, see Use ack-kube-queue to manage job queues.
A kubectl client is connected to the cluster. For more information, see Obtain the kubeconfig file of a cluster and use kubectl to connect to the cluster.
Configure queues and resources
The job queue and orchestration system must be capable of submitting jobs created by different departments or employees to different queues so that jobs can be managed based on the priorities of the departments or individuals. In addition, the system must support priority-based job scheduling instead of the first in, first out (FIFO) mode to ensure that urgent jobs can be handled promptly.
By using the ACK scheduler, Kube Queue, and ElasticQuotaTree, ACK clusters can support quota limits, elastic quota management, various queueing policies, and queue resource reclaim. This all-in-one solution can meet various job queue requirements.
The following figure shows the relationship among the administrator, researcher, jobs, Kube Queue, ACK scheduler, and controller.
The administrator creates an ElasticQuotaTree in the cluster to define resource quotas and queues. After the ElasticQuotaTree is created, researchers of different departments can submit jobs to the cluster. The jobs are assigned to different quota groups based on the predefined resource quotas. The jobs in each queue are executed sequentially based on the queueing policy of the queue. The controller manages the status of dequeued jobs, and the ACK scheduler is in charge of pod scheduling. If a dequeued job fails to start after a long period of time, Kube Queue automatically reclaims the job and inserts it back into the queue.
Set up a resource quota system
The ElasticQuotaTree defines the quota information of the cluster, including the layered structure of quotas, the amount of resources of each quota, and the namespaces associated with quotas. Jobs submitted in these namespaces are automatically assigned the resource quotas of the namespaces.
To set up the desired resource quota system, you can submit the following ElasticQuotaTree to the cluster.
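The following is a minimal sketch of such an ElasticQuotaTree, based on the ACK capacity scheduling CRD. The four leaf quotas and the 40-vCPU limit on the text group match the quota messages shown later in this topic; the remaining max and min values are illustrative assumptions that you should adjust to your own cluster capacity.
apiVersion: scheduling.sigs.k8s.io/v1beta1
kind: ElasticQuotaTree
metadata:
  name: elasticquotatree    # Created in the kube-system namespace.
  namespace: kube-system
spec:
  root:
    name: root
    max:
      cpu: 80
    min:
      cpu: 80
    children:
    - name: group-text
      max:
        cpu: 40
      min:
        cpu: 20
      children:
      - name: algorithm-text
        max:
          cpu: 40
        min:
          cpu: 20
        namespaces:          # Jobs submitted in this namespace consume this quota.
        - algorithm-text
      - name: intern-text
        max:
          cpu: 20
        min:
          cpu: 0             # No resources are guaranteed for intern jobs.
        namespaces:
        - intern-text
    - name: group-video
      max:
        cpu: 40
      min:
        cpu: 20
      children:
      - name: algorithm-video
        max:
          cpu: 40
        min:
          cpu: 20
        namespaces:
        - algorithm-video
      - name: intern-video
        max:
          cpu: 20
        min:
          cpu: 0
        namespaces:
        - intern-video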
An ElasticQuotaTree is a tree structure. The max field defines the maximum amount of resources that a quota can use, and the min field defines the minimum amount of resources that must be guaranteed for the quota. When the minimum resources of a quota cannot be guaranteed, the scheduling system attempts to reclaim resources from quotas that are using more resources than their min values. No resources are guaranteed for intern-text and intern-video jobs. If the algorithm team submits an urgent job while a low-priority job is running, the urgent job can preempt the resources of the low-priority job.
Set the priorities of queues
After the ElasticQuotaTree is submitted, Kube Queue creates queues in the cluster. The quota of each leaf node corresponds to a separate queue. After a job is submitted to the cluster, the system places the job into a queue based on the namespace to which the job belongs and the namespace information in the resource quota configuration. To specify a quota explicitly, add the quota.scheduling.alibabacloud.com/name label to the RayJob or RayCluster resource and set its value to the name of the quota. The default priority of a queue is 0. You can modify the Queue resources in the kube-queue namespace to specify the priority of each queue. Kube Queue traverses all queues in a round-robin manner and attempts to execute the job at the head of each queue. During this process, queues with higher priorities are processed before other queues and therefore have a better chance to use idle resources.
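For example, the following fragment binds a hypothetical RayJob to the algorithm-text quota by adding the label to its metadata (the spec is omitted here; see the full example later in this topic).
apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: rayjob-sample
  namespace: algorithm-text
  labels:
    # Bind the job to the algorithm-text quota defined in the ElasticQuotaTree.
    quota.scheduling.alibabacloud.com/name: algorithm-text
spec:
  # ... head group and worker group configuration omitted.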
You can also modify the Spec of a Queue resource to manage the priority of the queue.
apiVersion: scheduling.x-k8s.io/v1alpha1
kind: Queue
metadata:
  annotations:
    kube-queue/quota-fullname: root/group-video/algorithm-video
  creationTimestamp: "2024-10-08T06:43:07Z"
  generation: 1
  labels:
    create-by-kubequeue: "true"
  name: root-group-video-algorithm-video-algorithm-video
  namespace: kube-queue
  resourceVersion: "5766258"
  uid: 01342987-3dad-401b-8509-ef7250683377
spec:
  queuePolicy: Round
  # For example, add priority: 2 to the Spec of a queue to set its priority to 2.
  priority: 2
The following figure shows the relationship between resource quotas and queues. By default, queues are scheduled in a round-robin manner. For more information about other queue scheduling methods, see Use ack-kube-queue to manage job queues.
After you configure resource quotas, you can submit a job to a queue by creating it in the suspended state. For RayJobs, you only need to set the .spec.suspend field to true. For other types of jobs that support queues, use a similar method to submit them to queues. For more information, see Use ack-kube-queue to manage job queues. After a RayJob is submitted, the system calculates the total amount of resources that the job requires: the resource request of the head pod plus the resources of each WorkerGroup (resource request of each pod × number of replicas).
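The following is a minimal sketch of a suspended RayJob that requests 31 vCPUs (1 vCPU for the head pod and 30 × 1 vCPU for the workers). The entrypoint and the quota label are illustrative assumptions; replace them with your own workload and quota.
apiVersion: ray.io/v1
kind: RayJob
metadata:
  generateName: rayjob-sample-
  namespace: algorithm-text
  labels:
    quota.scheduling.alibabacloud.com/name: algorithm-text
spec:
  suspend: true                    # Submit the job to a queue instead of starting it immediately.
  entrypoint: python -c "import ray; ray.init(); print(ray.cluster_resources())"
  rayClusterSpec:
    rayVersion: "2.9.0"
    headGroupSpec:
      rayStartParams: {}
      template:
        spec:
          containers:
          - name: ray-head
            image: rayproject/ray:2.9.0
            resources:
              requests:
                cpu: "1"
              limits:
                cpu: "1"
    workerGroupSpecs:
    - groupName: small-group
      replicas: 30                 # Total request: 1 vCPU (head) + 30 × 1 vCPU (workers) = 31 vCPUs.
      rayStartParams: {}
      template:
        spec:
          containers:
          - name: ray-worker
            image: rayproject/ray:2.9.0
            resources:
              requests:
                cpu: "1"
              limits:
                cpu: "1"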
Example on how to use Kube Queue to manage queues
In the following example, multiple RayJobs are submitted to demonstrate how to use Kube Queue. When a RayJob enters the Succeeded state, the job is completed. In this case, Kube Queue allows the subsequent job to start. Take note that pod scheduling in a newly created RayCluster must wait until the RayCluster associated with the previous RayJob is deleted and the resources occupied by the job are released. The following code block is an example.
Two jobs are submitted. Each job requires 31 vCPUs. In this case, resources are allocated to only one job. The other job remains in the Suspended state. You can run the following commands to query the Status field to view the queueing information.
# The output indicates that only one job is in the Initializing state. The other job is in the Suspended state.
➜ kubequeue-doc kubectl get rayjob -n algorithm-text
NAME                  JOB STATUS   DEPLOYMENT STATUS   START TIME             END TIME   AGE
rayjob-sample-j87d8                                                                      4s
rayjob-sample-rhm9s                Initializing        2024-10-08T07:56:31Z              7s
# Query queues in the cluster.
➜ kubequeue-doc kubectl -n kube-queue get queue
NAME                                               AGE
root-group-text-algorithm-text-algorithm-text      73m
root-group-text-intern-text-intern-text            73m
root-group-video-algorithm-video-algorithm-video   73m
root-group-video-intern-video-intern-video         73m
# Query the queue to which the job is submitted. The Status field indicates that rayjob-sample-j87d8 is in the backoff queue. Its sequence number is 1, which means that the job is immediately executed after the current job is completed.
➜ kubequeue-doc kubectl -n kube-queue get queue root-group-text-algorithm-text-algorithm-text -oyaml
apiVersion: scheduling.x-k8s.io/v1alpha1
kind: Queue
metadata:
  annotations:
    kube-queue/quota-fullname: root/group-text/algorithm-text
  creationTimestamp: "2024-10-08T06:43:07Z"
  generation: 1
  labels:
    create-by-kubequeue: "true"
  name: root-group-text-algorithm-text-algorithm-text
  namespace: kube-queue
  resourceVersion: "5802083"
  uid: 83a3bf55-cd96-4405-9629-7e37512ac4b6
spec:
  queuePolicy: Round
status:
  queueItemDetails:
    active: []
    backoff:
    - name: rayjob-sample-j87d8-ray-qu # Specify the same name in the following command.
      namespace: algorithm-text
      position: 1
# Query the status of the QueueUnit that corresponds to the job. The cause of the failure is that starting the job would cause the resource quota of root/group-text/algorithm-text to exceed its predefined upper limit.
➜ kubequeue-doc kubectl get queue -n algorithm-text rayjob-sample-j87d8-ray-qu -oyaml
...
status:
  lastUpdateTime: "2024-10-08T08:01:46Z"
  message: 'Insufficient quota(cpu) in quota root/group-text/algorithm-text: request
    31, max 40, used 31, oversellreate 1. Wait for running jobs to complete'
  phase: Enqueued
If a job with a lower priority is submitted, the job is also queued. The status message indicates that the low-priority job is suspended because the resource quota of root/group-text has reached its upper limit. Run the following command to query the queueing details.
➜ kubequeue-doc kubectl get queue -n intern-text
NAME                         AGE
rayjob-sample-n5gzf-ray-qu   9s    # Specify the same name in the following command.
➜ kubequeue-doc kubectl get queue -n intern-text rayjob-sample-n5gzf-ray-qu -oyaml
apiVersion: scheduling.x-k8s.io/v1alpha1
kind: QueueUnit
metadata:
  creationTimestamp: "2024-10-08T08:07:48Z"
  generation: 1
  name: rayjob-sample-n5gzf-ray-qu
  namespace: intern-text
  ownerReferences:
  - apiVersion: ray.io/v1
    kind: RayJob
    name: rayjob-sample-n5gzf
    uid: d44af490-b595-4876-9463-bd22ff826848
  resourceVersion: "5807774"
  uid: b9f7e900-ccf9-47cb-817b-8d4e61575369
spec:
  consumerRef:
    apiVersion: ray.io/v1
    kind: RayJob
    name: rayjob-sample-n5gzf
    namespace: intern-text
  podSet:
  - count: 1
    name: head
    template:
      metadata: {}
      spec:
        containers:
        - image: rayproject/ray:2.9.0
          name: ray-head
          ports:
          - containerPort: 6379
            name: gcs-server
            protocol: TCP
          - containerPort: 8265
            name: dashboard
            protocol: TCP
          - containerPort: 10001
            name: client
            protocol: TCP
          resources:
            limits:
              cpu: "1"
            requests:
              cpu: "1"
          volumeMounts:
          - mountPath: /home/ray/samples
            name: code-sample
        volumes:
        - configMap:
            items:
            - key: sample_code.py
              path: sample_code.py
            name: ray-job-code-sample
          name: code-sample
  - count: 30
    name: small-group
    template:
      metadata: {}
      spec:
        containers:
        - image: rayproject/ray:2.9.0
          lifecycle:
            preStop:
              exec:
                command:
                - /bin/sh
                - -c
                - ray stop
          name: ray-worker
          resources:
            limits:
              cpu: "1"
            requests:
              cpu: "1"
  resource:
    cpu: "31"
status:
  lastUpdateTime: "2024-10-08T08:08:03Z"
  message: 'Insufficient quota(cpu) in parent quota root/group-text: request 31, max
    40, used 31, oversellreate 1. Wait for running jobs to complete'
  phase: Enqueued
The queue of the algorithm team has a higher priority. After the rayjob-sample-rhm9s job is completed, resources are preferentially allocated to the jobs in the queue of the algorithm team. Consequently, these jobs start earlier than jobs in other queues.
# Delete a job to simulate the completion of a job.
➜ kubequeue-doc kubectl delete rayjob -n algorithm-text rayjob-sample-rhm9s
rayjob.ray.io "rayjob-sample-rhm9s" deleted
# Jobs in the queue of the algorithm team start.
➜ kubequeue-doc kubectl get rayjob -n algorithm-text
NAME                  JOB STATUS   DEPLOYMENT STATUS   START TIME             END TIME   AGE
rayjob-sample-j87d8                Initializing        2024-10-08T08:24:09Z              27m
# The low-priority job is still suspended.
➜ kubequeue-doc kubectl get rayjob -n intern-text
NAME                  JOB STATUS   DEPLOYMENT STATUS   START TIME             END TIME   AGE
rayjob-sample-n5gzf                                                                      16m
Specify a node pool for job scheduling
Ray allows you to specify a node pool for job scheduling. This way, you can optimize resource usage, improve performance, or meet specific security and isolation requirements. You may need to specify a node pool for job scheduling in the following scenarios.
Fulfill resource requests
If some of your jobs have hardware requirements, such as GPU computing, you can add the matching nodes to a separate node pool and schedule only jobs that have hardware requirements to the node pool.
Security and isolation
You may want to run data-sensitive jobs in a secure and controllable environment. In this scenario, you can create a dedicated node pool and ensure that only authorized services can access the nodes in the node pool. This way, the node pool is isolated from other services.
Performance optimization
To ensure the optimal performance of certain workloads, you may need to deploy them on a specific type of infrastructure. For example, I/O-intensive jobs are more suitable for servers with high-speed network connections and SSDs.
Cost control
You can create node pools that consist of instances belonging to different price tiers based on the pricing policies of the cloud service provider to better control costs. For example, you can run unimportant jobs on low-performance but cost-effective instances.
Multi-tenant environment
A cluster is shared by multiple teams. Each team wants to manage and optimize their workloads separately. In this scenario, you can create a node pool for each team so that the teams can plan and adjust their own resources.
To schedule pods to a specific node pool, you can add node affinity rules to the configuration of the head pod and WorkerGroup pods. In addition, you can create a resource policy to define the order in which different types of resources are used for pod scheduling. For example, you can preferentially schedule pods to Elastic Compute Service (ECS) instances; when ECS instances are out of stock, pods are scheduled to elastic container instances or Alibaba Cloud Container Compute Service (ACS) nodes. The following code block is an example.
Create a resource policy to schedule jobs to serverless resources when the node pool does not have sufficient resources.
Currently, gang scheduling does not support ACS resources.
apiVersion: scheduling.alibabacloud.com/v1alpha1
kind: ResourcePolicy
metadata:
  name: rayjob-sample-tnj27-raycluster-s6t8b-small-group
  namespace: algorithm-text
spec:
  selector:
    # Prioritize a WorkerGroup of a RayCluster. ray.io/group specifies the name of a WorkerGroup.
    # ray.io/cluster specifies a RayCluster.
    ray.io/group: small-group
    ray.io/cluster: rayjob-sample-tnj27-raycluster-s6t8b
    ray.io/node-type: worker
  strategy: prefer
  units:
  - nodeSelector:
      alibabacloud.com/nodepool-id: np9c9b663eb55d44d0943009d5c3d32781
    resource: ecs
  - resource: acs
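If you only need to pin pods to the node pool and do not need the serverless fallback provided by the resource policy, a node affinity rule on the pod templates is sufficient. The following is a minimal sketch of the WorkerGroup portion of a rayClusterSpec, assuming the same node pool ID as above; add the same affinity rule to the head group if the head pod must also run in the node pool.
    workerGroupSpecs:
    - groupName: small-group
      replicas: 30
      rayStartParams: {}
      template:
        spec:
          affinity:
            nodeAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                nodeSelectorTerms:
                - matchExpressions:
                  - key: alibabacloud.com/nodepool-id   # Node pool label used in the resource policy above.
                    operator: In
                    values:
                    - np9c9b663eb55d44d0943009d5c3d32781
          containers:
          - name: ray-worker
            image: rayproject/ray:2.9.0
            resources:
              requests:
                cpu: "1"
              limits:
                cpu: "1"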
Configure gang scheduling
In the following scenarios, where multiple workers must collaborate closely, you can configure gang scheduling for Ray to reduce programming complexity.
Large-scale machine learning model training
A single server cannot provide sufficient compute resources to process large datasets or complex models. In this case, a group of containers must collaborate. Gang scheduling ensures that these containers do not compete for resources, avoids deadlocks, and improves training efficiency.
MPI framework
Parallel computing in the MPI framework requires the primary and secondary processes to collaborate. Gang scheduling can schedule these processes concurrently, reduce communication latency, and improve computing efficiency.
Big data processing and analysis
For applications that need to process large amounts of data, such as log analysis and real-time stream processing, you may need to run multiple jobs at the same time to complete complex tasks. Gang scheduling can ensure that these jobs are scheduled concurrently to improve the overall efficiency.
Custom distributed application development
You may need to build player matching in a gaming server architecture or handle data collected from a large number of IoT devices.
To submit a gang scheduling request in a RayJob, you only need to declare ray.io/scheduler-name=kube-scheduler in the metadata of the RayJob in an ACK Pro or ACK Lingjun cluster. After the job is submitted, the KubeRay Operator injects the gang scheduling label into the pods. The following code block is an example.
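A minimal sketch, assuming the declaration is added as a metadata label and reusing the suspended RayJob from the earlier queueing example; the rest of the spec is unchanged and omitted here.
apiVersion: ray.io/v1
kind: RayJob
metadata:
  generateName: rayjob-sample-
  namespace: algorithm-text
  labels:
    # Request gang scheduling for all pods of this RayJob.
    ray.io/scheduler-name: kube-scheduler
    quota.scheduling.alibabacloud.com/name: algorithm-text
spec:
  suspend: true
  # The entrypoint, head group, and worker group configuration are the same as in the earlier example.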
You can also add pod labels, which are used to identify, filter, and manage pods. The following code block is an example.
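A sketch of the WorkerGroup portion of a rayClusterSpec, assuming hypothetical team and workload-type labels. Labels added to the pod template metadata are applied to every worker pod and can be used with label selectors such as kubectl get pods -l team=algorithm-text.
    workerGroupSpecs:
    - groupName: small-group
      replicas: 30
      rayStartParams: {}
      template:
        metadata:
          labels:
            # Hypothetical labels for identifying and filtering pods.
            team: algorithm-text
            workload-type: ray-worker
        spec:
          containers:
          - name: ray-worker
            image: rayproject/ray:2.9.0
            resources:
              requests:
                cpu: "1"
              limits:
                cpu: "1"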
When resources are insufficient, you can query events to locate the cause of scheduling failures. Use --field-selector='type=Warning,reason=GangFailedScheduling' to find failure records related to gang scheduling. The "cycle xx" part of the event message distinguishes scheduling failures that occur in different scheduling rounds. The following code block is an example.
➜ kubequeue-doc kubectl get events -n algorithm-text --field-selector='type=Warning,reason=GangFailedScheduling' | grep "cycle 1"
5m48s Warning GangFailedScheduling pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-89mlq rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-89mlq in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s Warning GangFailedScheduling pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-8fwmr rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-8fwmr in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s Warning GangFailedScheduling pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-8g5wv rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-8g5wv in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m46s Warning GangFailedScheduling pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-8tn4w rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-8tn4w in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s Warning GangFailedScheduling pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-97gpk rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-97gpk in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m46s Warning GangFailedScheduling pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-9xsgw rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-9xsgw in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s Warning GangFailedScheduling pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-gwxhg rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-gwxhg in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s Warning GangFailedScheduling pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-jzw6k rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-jzw6k in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s Warning GangFailedScheduling pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-kb55s rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-kb55s in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s Warning GangFailedScheduling pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-lbvk7 rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-lbvk7 in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m46s Warning GangFailedScheduling pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-ms96b rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-ms96b in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s Warning GangFailedScheduling pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-sgr9g rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-sgr9g in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m46s Warning GangFailedScheduling pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-svt6g rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-svt6g in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s Warning GangFailedScheduling pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-wm5c6 rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-wm5c6 in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.