
Container Service for Kubernetes: Automate RayJobs in ACK clusters

Last Updated: Nov 18, 2024

Enterprises usually need to run large numbers of jobs on limited resources. To do this, they must prioritize key departments or individuals when allocating resources while keeping resource allocation flexible. This topic describes how to improve cluster resource utilization and automate RayJobs created by different departments on a job management platform. You can queue RayJobs and dynamically adjust their priorities to ensure that resources are preferentially allocated to high-priority RayJobs.

Why do we need to automate RayJobs in ACK clusters?

  • Resource management and optimization

    You can combine Container Service for Kubernetes (ACK) and Ray to automate job scheduling so that compute resources are managed and allocated more flexibly. For example, different numbers of vCPUs may be allocated to different teams, such as a text content team and a video content team. Ray enables you to efficiently schedule jobs on limited resources and ensures that each team has sufficient resources. This helps avoid resource waste and resource contention.

  • Priority control

    Jobs created by algorithm engineers usually have higher priorities. You can create policies in ACK and use the scheduling mechanism of Ray to prioritize RayJobs based on user roles. This allows you to complete a variety of low-priority jobs with limited resources while ensuring the success of key projects.

  • Auto scaling

    You may need to adjust the compute resources that each department can use to meet development requirements. You can use the auto scaling feature of ACK to dynamically scale nodes based on load and use Ray to seamlessly balance workloads for optimal performance.

The following figure shows the allocation of vCPUs.


To manage quotas and define the priorities of jobs, you need to develop a resource quota mechanism and a job scheduling system in the cluster.

You can create resource quotas to specify the maximum amount of resources or the maximum number of machines that each department or individual can use. Before the system starts a job, it checks whether the job's resource quota can meet the job's requirements. The system allocates compute resources to the job and starts it only after confirming that the resource quota can fulfill the resource request. This improves resource utilization and ensures that important jobs keep their high priorities.

Preparations

Configure queues and resources

The job queue and orchestration system must be capable of submitting jobs created by different departments or employees to different queues so that jobs can be managed based on the priorities of the departments or individuals. In addition, the system must support priority-based job scheduling instead of the first in, first out (FIFO) mode to ensure that urgent jobs can be handled promptly.

By using the ACK scheduler, Kube Queue, and ElasticQuotaTree, ACK clusters can support quota limits, elastic quota management, various queueing policies, and queue resource reclaim. This all-in-one solution can meet various job queue requirements.

The following figure shows the relationship among the administrator, researcher, jobs, Kube Queue, ACK scheduler, and controller.


The administrator first creates an ElasticQuotaTree in the cluster to define resource quotas and queues. After the ElasticQuotaTree is created, researchers of different departments can submit jobs to the cluster. The jobs are assigned to different quota groups based on the predefined resource quotas. The jobs in each queue are executed sequentially based on the queueing policy of the queue. The controller manages the status of dequeued jobs, and the ACK scheduler is in charge of pod scheduling. If a dequeued job fails to start after a long period of time, Kube Queue automatically reclaims the job and inserts it back into the queue.
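
Before you continue, you can check that the required components are in place. The following is a minimal sketch; it assumes that the ack-kube-queue controllers run in the kube-queue namespace, as in the examples later in this topic.

# Check that the Kube Queue controllers are running (namespace assumed to be kube-queue).
kubectl get pods -n kube-queue

# Check that the ElasticQuotaTree API is registered in the cluster.
kubectl api-resources | grep -i elasticquotatree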

Set up a resource quota system

The ElasticQuotaTree defines the quota information of the cluster, including the layered structure of quotas, the amount of resources of each quota, and the namespaces associated with quotas. Jobs submitted in these namespaces are automatically assigned the resource quotas of the namespaces.

To set up the desired resource quota system, you can submit the following ElasticQuotaTree to the cluster.


apiVersion: scheduling.sigs.k8s.io/v1beta1
kind: ElasticQuotaTree
metadata:
  name: elasticquotatree # Only one ElasticQuotaTree is supported.
  namespace: kube-system # The elastic quota group takes effect only if it is created in the kube-system namespace.
spec:
  root:
    name: root 
    max:
      cpu: 100
      memory: 100Gi
    min:
      cpu: 100
      memory: 100Gi
    children:
    - name: group-text
      max:
        cpu: 40
        memory: 100Gi
      min:
        cpu: 40
        memory: 100Gi
      children:
      - name: algorithm-text
        max:
          cpu: 40
          memory: 100Gi
        min:
          cpu: 40
          memory: 100Gi
        namespaces: # Configure the namespace.
          - algorithm-text
      - name: intern-text
        max:
          cpu: 40
          memory: 100Gi
        min:
          cpu: 0
          memory: 0
        namespaces: # Configure the namespace.
          - intern-text
    - name: group-video
      max:
        cpu: 60
        memory: 100Gi
      min:
        cpu: 60
        memory: 100Gi
      children:
      - name: algorithm-video
        max:
          cpu: 60
          memory: 100Gi
        min:
          cpu: 60
          memory: 100Gi
        namespaces: # Configure the namespace.
          - algorithm-video
      - name: intern-video
        max:
          cpu: 60
          memory: 100Gi
        min:
          cpu: 0
          memory: 0
        namespaces: # Configure the namespace.
          - intern-video

An ElasticQuotaTree is a tree structure. The max field defines the maximum amount of resources that a quota can use, and the min field defines the minimum amount of resources that must be guaranteed for it. When the minimum resources of a quota cannot be guaranteed, the scheduling system attempts to reclaim resources from quotas that occupy more resources than their minimum. In this example, no resources are guaranteed for intern-text and intern-video jobs. If the algorithm team submits an urgent job while a low-priority job is running, the urgent job can preempt resources from the low-priority job.
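
The following commands show one way to submit and verify the ElasticQuotaTree. This is a minimal sketch; it assumes the manifest above is saved locally as elasticquotatree.yaml.

# Submit the ElasticQuotaTree defined above.
kubectl apply -f elasticquotatree.yaml

# Confirm that the object was created. The -f flag reuses the manifest to resolve the resource type.
kubectl get -f elasticquotatree.yaml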

Set the priorities of queues

After the ElasticQuotaTree is submitted, Kube Queue creates queues in the cluster. The quota of each leaf node corresponds to a separate queue. After a job is submitted to the cluster, the system places the job in a queue based on the namespace to which the job belongs and the namespaces associated with each quota. To specify a quota explicitly, add the "quota.scheduling.alibabacloud.com/name" label to the RayJob or RayCluster resource and set it to the name of the quota. The default priority of a queue is 0. You can modify the Queue resources in the kube-queue namespace to specify the priority of each queue. Kube Queue traverses all queues in a round-robin manner and attempts to execute the job at the head of each queue. During this process, queues with higher priorities take precedence over other queues and therefore have a better chance of using idle resources.

The following example shows how to set the priority of a queue by modifying the spec of the Queue resource.

apiVersion: scheduling.x-k8s.io/v1alpha1
kind: Queue
metadata:
  annotations:
    kube-queue/quota-fullname: root/group-video/algorithm-video
  creationTimestamp: "2024-10-08T06:43:07Z"
  generation: 1
  labels:
    create-by-kubequeue: "true"
  name: root-group-video-algorithm-video-algorithm-video
  namespace: kube-queue
  resourceVersion: "5766258"
  uid: 01342987-3dad-401b-8509-ef7250683377
spec:
  queuePolicy: Round
  # For example, add priority: 2 to the spec of a queue to set its priority to 2.
  priority: 2
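
If you prefer not to edit the full manifest, you can patch the priority field directly. The following is a minimal sketch that reuses the queue name from the example above; a merge patch updates only the priority field.

# Set the priority of the algorithm-video queue to 2.
kubectl -n kube-queue patch queue root-group-video-algorithm-video-algorithm-video \
  --type merge -p '{"spec":{"priority":2}}'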

The following figure shows the relationship between resource quotas and queues. By default, queues are scheduled in a round-robin manner. For more information about other queue scheduling methods, see Use ack-kube-queue to manage job queues.


After you configure resource quotas, you can submit a job to a queue by creating it in the suspended state. For RayJobs, you only need to set the .spec.suspend field to true. Other types of jobs that support queues are submitted to queues in a similar way. For more information, see Use ack-kube-queue to manage job queues. After a RayJob is submitted, the system calculates the total amount of resources that the job requires: the resource request of the head pod plus, for each WorkerGroup, the resource request of each pod multiplied by the number of replicas. For example, in the sample below, the total is 1 vCPU for the head pod plus 30 × 1 vCPU for the workers, or 31 vCPUs.

Example of how to use Kube Queue to manage queues

In the following example, multiple RayJobs are submitted to demonstrate how to use Kube Queue. When a RayJob enters the Succeeded state, the job is completed and Kube Queue allows the next job in the queue to start. Note that the pods of a newly created RayCluster cannot be scheduled until the RayCluster associated with the previous RayJob is deleted and the resources it occupied are released. The following code block is an example.


apiVersion: ray.io/v1
kind: RayJob
metadata:
  generateName: rayjob-sample-
  namespace: algorithm-text
spec:
  entrypoint: python /home/ray/samples/sample_code.py
  runtimeEnvYAML: |
    pip:
      - requests==2.26.0
      - pendulum==2.1.2
    env_vars:
      counter_name: "test_counter"

  # Suspend specifies whether the RayJob controller should create a RayCluster instance.
  # If a job is applied with the suspend field set to true, the RayCluster will not be created and we will wait for the transition to false.
  # If the RayCluster is already created, it will be deleted. In the case of transition to false, a new RayCluster will be created.
  suspend: true

  rayClusterSpec:
    rayVersion: '2.9.0' # should match the Ray version in the image of the containers
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        spec:
          containers:
            - name: ray-head
              image: rayproject/ray:2.9.0
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265 # Ray dashboard
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  cpu: "1"
                requests:
                  cpu: "1"
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: code-sample
          volumes:
            # You set volumes at the Pod level, then mount them into containers inside that Pod
            - name: code-sample
              configMap:
                # Provide the name of the ConfigMap you want to mount.
                name: ray-job-code-sample
                # An array of keys from the ConfigMap to create as files
                items:
                  - key: sample_code.py
                    path: sample_code.py
    workerGroupSpecs:
      - replicas: 30
        groupName: small-group
        rayStartParams: {}
        template:
          spec:
            containers:
              - name: ray-worker # must consist of lowercase alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name' or '123-abc')
                image: rayproject/ray:2.9.0
                lifecycle:
                  preStop:
                    exec:
                      command: [ "/bin/sh","-c","ray stop" ]
                resources:
                  limits:
                    cpu: "1"
                  requests:
                    cpu: "1"
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
  namespace: algorithm-text
data:
  sample_code.py: |
    import ray
    import os
    import requests

    ray.init()

    @ray.remote
    class Counter:
        def __init__(self):
            # Used to verify runtimeEnv
            self.name = os.getenv("counter_name")
            assert self.name == "test_counter"
            self.counter = 0

        def inc(self):
            self.counter += 1

        def get_counter(self):
            return "{} got {}".format(self.name, self.counter)

    counter = Counter.remote()

    for _ in range(5):
        ray.get(counter.inc.remote())
        print(ray.get(counter.get_counter.remote()))

    # Verify that the correct runtime env was used for the job.
    assert requests.__version__ == "2.26.0"
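
The manifest above uses generateName, so each submission creates a new RayJob with a unique name. The following is a minimal sketch of how the two jobs can be submitted, assuming the manifest is saved locally as rayjob-sample.yaml; kubectl create must be used because kubectl apply does not support generateName.

# Each invocation creates a separate RayJob with a generated name.
kubectl create -f rayjob-sample.yaml
kubectl create -f rayjob-sample.yaml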

Two jobs are submitted. Each job requires 31 vCPUs. In this case, resources are allocated to only one job. The other job remains in the Suspended state. You can run the following commands to query the Status field to view the queueing information.

# The output indicates that only one job is in the Initializing state. The other job is in the Suspended state.
➜  kubequeue-doc kubectl get rayjob -n algorithm-text
NAME                  JOB STATUS   DEPLOYMENT STATUS   START TIME             END TIME   AGE
rayjob-sample-j87d8                                                                      4s
rayjob-sample-rhm9s                Initializing        2024-10-08T07:56:31Z              7s


# Query queues in the cluster.
➜  kubequeue-doc kubectl -n kube-queue get queue
NAME                                               AGE
root-group-text-algorithm-text-algorithm-text      73m
root-group-text-intern-text-intern-text            73m
root-group-video-algorithm-video-algorithm-video   73m
root-group-video-intern-video-intern-video         73m


# Query the queue to which the job is submitted. The Status field shows that rayjob-sample-j87d8 is in the backoff queue at position 1, which means the job runs as soon as the current job is completed.
➜  kubequeue-doc kubectl -n kube-queue get queue  root-group-text-algorithm-text-algorithm-text -oyaml
apiVersion: scheduling.x-k8s.io/v1alpha1
kind: Queue
metadata:
  annotations:
    kube-queue/quota-fullname: root/group-text/algorithm-text
  creationTimestamp: "2024-10-08T06:43:07Z"
  generation: 1
  labels:
    create-by-kubequeue: "true"
  name: root-group-text-algorithm-text-algorithm-text
  namespace: kube-queue
  resourceVersion: "5802083"
  uid: 83a3bf55-cd96-4405-9629-7e37512ac4b6
spec:
  queuePolicy: Round
status:
  queueItemDetails:
    active: []
    backoff:
    - name: rayjob-sample-j87d8-ray-qu     # Specify the same name in the following command.
      namespace: algorithm-text
      position: 1
      
 
# Query the status of the QueueUnit resource. The job remains queued because its request would exceed the upper limit of the root/group-text/algorithm-text quota.
➜  kubequeue-doc kubectl get queue -n algorithm-text rayjob-sample-j87d8-ray-qu -oyaml
...
status:
  lastUpdateTime: "2024-10-08T08:01:46Z"
  message: 'Insufficient quota(cpu) in quota root/group-text/algorithm-text: request
    31, max 40, used 31, oversellreate 1. Wait for running jobs to complete'
  phase: Enqueued

If a job with a lower priority is submitted, that job also remains in the Queueing state. The status message indicates that the low-priority job is suspended because the request would exceed the upper limit of the root/group-text quota. Run the following commands to query the queueing details.

➜  kubequeue-doc kubectl get queue -n intern-text
NAME                         AGE
rayjob-sample-n5gzf-ray-qu   9s              # Specify the same name in the following command.


➜  kubequeue-doc kubectl get queue -n intern-text rayjob-sample-n5gzf-ray-qu -oyaml
apiVersion: scheduling.x-k8s.io/v1alpha1
kind: QueueUnit
metadata:
  creationTimestamp: "2024-10-08T08:07:48Z"
  generation: 1
  name: rayjob-sample-n5gzf-ray-qu
  namespace: intern-text
  ownerReferences:
  - apiVersion: ray.io/v1
    kind: RayJob
    name: rayjob-sample-n5gzf
    uid: d44af490-b595-4876-9463-bd22ff826848
  resourceVersion: "5807774"
  uid: b9f7e900-ccf9-47cb-817b-8d4e61575369
spec:
  consumerRef:
    apiVersion: ray.io/v1
    kind: RayJob
    name: rayjob-sample-n5gzf
    namespace: intern-text
  podSet:
  - count: 1
    name: head
    template:
      metadata: {}
      spec:
        containers:
        - image: rayproject/ray:2.9.0
          name: ray-head
          ports:
          - containerPort: 6379
            name: gcs-server
            protocol: TCP
          - containerPort: 8265
            name: dashboard
            protocol: TCP
          - containerPort: 10001
            name: client
            protocol: TCP
          resources:
            limits:
              cpu: "1"
            requests:
              cpu: "1"
          volumeMounts:
          - mountPath: /home/ray/samples
            name: code-sample
        volumes:
        - configMap:
            items:
            - key: sample_code.py
              path: sample_code.py
            name: ray-job-code-sample
          name: code-sample
  - count: 30
    name: small-group
    template:
      metadata: {}
      spec:
        containers:
        - image: rayproject/ray:2.9.0
          lifecycle:
            preStop:
              exec:
                command:
                - /bin/sh
                - -c
                - ray stop
          name: ray-worker
          resources:
            limits:
              cpu: "1"
            requests:
              cpu: "1"
  resource:
    cpu: "31"
status:
  lastUpdateTime: "2024-10-08T08:08:03Z"
  message: 'Insufficient quota(cpu) in parent quota root/group-text: request 31, max
    40, used 31, oversellreate 1. Wait for running jobs to complete'
  phase: Enqueued

The queue of the algorithm team has a higher priority. After the rayjob-sample-rhm9s job is completed, resources are preferentially allocated to the jobs in the algorithm team's queue. Consequently, these jobs start earlier than jobs in other queues.

# Delete a job to simulate the completion of a job.
➜  kubequeue-doc kubectl delete rayjob -n algorithm-text rayjob-sample-rhm9s
rayjob.ray.io "rayjob-sample-rhm9s" deleted


# Jobs in the queue of the algorithm team start.
➜  kubequeue-doc kubectl get rayjob -n algorithm-text
NAME                  JOB STATUS   DEPLOYMENT STATUS   START TIME             END TIME   AGE
rayjob-sample-j87d8                Initializing        2024-10-08T08:24:09Z              27m


# The low-priority job is still suspended.
➜  kubequeue-doc kubectl get rayjob -n intern-text
NAME                  JOB STATUS   DEPLOYMENT STATUS   START TIME   END TIME   AGE
rayjob-sample-n5gzf                                                            16m

Specify a node pool for job scheduling

Ray allows you to specify a node pool for job scheduling. This way, you can optimize resource usage, improve performance, or meet specific security and isolation requirements. You may need to specify a node pool for job scheduling in the following scenarios.

  • Fulfill resource requests

    If some of your jobs have hardware requirements, such as GPU computing, you can add the matching nodes to a separate node pool and schedule only jobs that have hardware requirements to the node pool.

  • Security and isolation

    You may want to run data-sensitive jobs in a secure and controllable environment. In this scenario, you can create a dedicated node pool and ensure that only authorized services can access the nodes in the node pool. This way, the node pool is isolated from other services.

  • Performance optimization

    To ensure the optimal performance of certain workloads, you may need to deploy them on a specific type of infrastructure. For example, I/O-intensive jobs are more suitable for servers with high-speed network connections and SSDs.

  • Cost control

    You can create node pools that consist of instances belonging to different price tiers based on the pricing policies of the cloud service provider to better control costs. For example, you can run unimportant jobs on low-performance but cost-effective instances.

  • Multi-tenant environment

    A cluster is shared by multiple teams. Each team wants to manage and optimize their workloads separately. In this scenario, you can create a node pool for each team so that the teams can plan and adjust their own resources.

To schedule pods to a specific node pool, you can add node affinity rules to the configuration of the head pod and WorkerGroup pods. In addition, you can create a ResourcePolicy to prioritize the resources to which pods are scheduled. For example, you can preferentially schedule pods to Elastic Compute Service (ECS) instances and, when ECS instances are out of stock, fall back to elastic container instances or Alibaba Cloud Container Compute Service (ACS) nodes. The following code block is an example.


apiVersion: ray.io/v1
kind: RayJob
metadata:
  generateName: rayjob-sample-
  namespace: algorithm-text
spec:
  entrypoint: python /home/ray/samples/sample_code.py
  runtimeEnvYAML: |
    pip:
      - requests==2.26.0
      - pendulum==2.1.2
    env_vars:
      counter_name: "test_counter"
  suspend: true
  shutdownAfterJobFinishes: true
  rayClusterSpec:
    rayVersion: '2.9.0' # should match the Ray version in the image of the containers
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        spec:
          # Schedule pods only to the specified node pool.
          affinity:
            nodeAffinity:
              requiredDuringSchedulingIgnoredDuringExecution:
                nodeSelectorTerms:
                - matchExpressions:
                  - key: alibabacloud.com/nodepool-id
                    operator: In
                    values:
                    - np9c9b663eb55d44d0943009d5c3d32781
          containers:
            - name: ray-head
              image: rayproject/ray:2.9.0
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265 # Ray dashboard
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  cpu: "1"
                requests:
                  cpu: "1"
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: code-sample
          volumes:
            - name: code-sample
              configMap:
                name: ray-job-code-sample
                items:
                  - key: sample_code.py
                    path: sample_code.py
    workerGroupSpecs:
      - replicas: 30
        groupName: small-group
        rayStartParams: {}
        template:
          spec:
            # Schedule pods only to the specified node pool.
            affinity:
              nodeAffinity:
                requiredDuringSchedulingIgnoredDuringExecution:
                  nodeSelectorTerms:
                  - matchExpressions:
                    - key: alibabacloud.com/nodepool-id
                      operator: In
                      values:
                      - np9c9b663eb55d44d0943009d5c3d32781
            containers:
              - name: ray-worker
                image: rayproject/ray:2.9.0
                lifecycle:
                  preStop:
                    exec:
                      command: [ "/bin/sh","-c","ray stop" ]
                resources:
                  limits:
                    cpu: "1"
                  requests:
                    cpu: "1"
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
  namespace: algorithm-text
data:
  sample_code.py: |
    import ray
    import os
    import requests

    ray.init()

    @ray.remote
    class Counter:
        def __init__(self):
            # Used to verify runtimeEnv
            self.name = os.getenv("counter_name")
            assert self.name == "test_counter"
            self.counter = 0

        def inc(self):
            self.counter += 1

        def get_counter(self):
            return "{} got {}".format(self.name, self.counter)

    counter = Counter.remote()

    for _ in range(5):
        ray.get(counter.inc.remote())
        print(ray.get(counter.get_counter.remote()))

    # Verify that the correct runtime env was used for the job.
    assert requests.__version__ == "2.26.0"

    import time
    time.sleep(30)

Create a resource policy to schedule jobs to serverless resources when the node pool does not have sufficient resources.

Important

Currently, gang scheduling does not support ACS resources.

apiVersion: scheduling.alibabacloud.com/v1alpha1
kind: ResourcePolicy
metadata:
  name: rayjob-sample-tnj27-raycluster-s6t8b-small-group
  namespace: algorithm-text
spec:
  selector:
    # Prioritize a WorkerGroup of a RayCluster. ray.io/group specifies the name of a WorkerGroup.
    # ray.io/cluster specifies a RayCluster.
    ray.io/group: small-group
    ray.io/cluster: rayjob-sample-tnj27-raycluster-s6t8b
    ray.io/node-type: worker
  strategy: prefer
  units:
  - nodeSelector:
      alibabacloud.com/nodepool-id: np9c9b663eb55d44d0943009d5c3d32781
    resource: ecs
  - resource: acs

Configure gang scheduling

In the following scenarios, where multiple servers must work together and you want to reduce programming complexity, you can configure gang scheduling for Ray workloads.

  • Large-scale machine learning model training

    A single server cannot provide sufficient compute resources to process large datasets or complex models. In this case, you need a group of containers to collaborate. Gang scheduling can ensure that these containers do not compete for resources, avoid deadlocks, and improve training efficiency.

  • MPI framework

    Parallel computing in the MPI framework requires the primary and secondary processes to collaborate. Gang scheduling can schedule them concurrently, reduce communication latency, and improve computing efficiency.

  • Big data processing and analysis

    For applications that need to process large amounts of data, such as log analysis and real-time stream processing, you may need to run multiple jobs at the same time to complete complex tasks. Gang scheduling can ensure that these jobs are scheduled concurrently to improve the overall efficiency.

  • Custom distributed application development

    You may need to build player matching in a gaming server architecture or handle data collected from a large number of IoT devices.

To submit a gang scheduling request in a RayJob, you only need to declare the ray.io/scheduler-name=kube-scheduler label in the metadata of the RayJob in an ACK Pro or ACK Lingjun cluster. After the job is submitted, the KubeRay operator injects the gang scheduling labels into the pods.

The following code block is an example.


apiVersion: ray.io/v1
kind: RayJob
metadata:
  generateName: rayjob-sample-
  namespace: algorithm-text
  labels:
    # Use ray.io/scheduler-name to specify gang scheduling.
    ray.io/scheduler-name: kube-scheduler
    # Use quota.scheduling.alibabacloud.com/name to specify a quota.
    quota.scheduling.alibabacloud.com/name: algorithm-video
spec:
  entrypoint: python /home/ray/samples/sample_code.py
  runtimeEnvYAML: |
    pip:
      - requests==2.26.0
      - pendulum==2.1.2
    env_vars:
      counter_name: "test_counter"
  shutdownAfterJobFinishes: true
  # Suspend specifies whether the RayJob controller should create a RayCluster instance.
  # If a job is applied with the suspend field set to true, the RayCluster will not be created and we will wait for the transition to false.
  # If the RayCluster is already created, it will be deleted. In the case of transition to false, a new RayCluster will be created.
  suspend: true

  rayClusterSpec:
    rayVersion: '2.9.0' # should match the Ray version in the image of the containers
    headGroupSpec:
      rayStartParams:
        dashboard-host: '0.0.0.0'
      template:
        spec:
          containers:
            - name: ray-head
              image: rayproject/ray:2.9.0
              ports:
                - containerPort: 6379
                  name: gcs-server
                - containerPort: 8265 # Ray dashboard
                  name: dashboard
                - containerPort: 10001
                  name: client
              resources:
                limits:
                  cpu: "1"
                requests:
                  cpu: "1"
              volumeMounts:
                - mountPath: /home/ray/samples
                  name: code-sample
          volumes:
            # You set volumes at the Pod level, then mount them into containers inside that Pod
            - name: code-sample
              configMap:
                # Provide the name of the ConfigMap you want to mount.
                name: ray-job-code-sample
                # An array of keys from the ConfigMap to create as files
                items:
                  - key: sample_code.py
                    path: sample_code.py
    workerGroupSpecs:
      - replicas: 30
        groupName: small-group
        rayStartParams: {}
        template:
          spec:
            containers:
              - name: ray-worker # must consist of lowercase alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name' or '123-abc')
                image: rayproject/ray:2.9.0
                lifecycle:
                  preStop:
                    exec:
                      command: [ "/bin/sh","-c","ray stop" ]
                resources:
                  limits:
                    cpu: "1"
                  requests:
                    cpu: "1"
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: ray-job-code-sample
  namespace: algorithm-text
data:
  sample_code.py: |
    import ray
    import os
    import requests

    ray.init()

    @ray.remote
    class Counter:
        def __init__(self):
            # Used to verify runtimeEnv
            self.name = os.getenv("counter_name")
            assert self.name == "test_counter"
            self.counter = 0

        def inc(self):
            self.counter += 1

        def get_counter(self):
            return "{} got {}".format(self.name, self.counter)

    counter = Counter.remote()

    for _ in range(5):
        ray.get(counter.inc.remote())
        print(ray.get(counter.get_counter.remote()))

    # Verify that the correct runtime env was used for the job.
    assert requests.__version__ == "2.26.0"

    import time
    time.sleep(30)

The operator also adds labels to the pods, which are used to identify, filter, and manage them. The following code block shows an example of a generated worker pod.


apiVersion: v1
kind: Pod
metadata:
  annotations:
    ray.io/ft-enabled: "false"
  creationTimestamp: "2024-10-10T02:38:29Z"
  generateName: rayjob-sample-hhbdr-raycluster-ljj69-small-group-worker-
  labels:
    app.kubernetes.io/created-by: kuberay-operator
    app.kubernetes.io/name: kuberay
    # Add the Coscheduling label recognized by ACK.
    pod-group.scheduling.sigs.k8s.io/min-available: "31"
    pod-group.scheduling.sigs.k8s.io/name: rayjob-sample-hhbdr-raycluster-ljj69
    # Add the Quota label recognized by ACK.
    quota.scheduling.alibabacloud.com/name: algorithm-video
    ray.io/cluster: rayjob-sample-hhbdr-raycluster-ljj69
    ray.io/group: small-group
    ray.io/identifier: rayjob-sample-hhbdr-raycluster-ljj69-worker
    ray.io/is-ray-node: "yes"
    ray.io/node-type: worker
    scheduling.x-k8s.io/pod-group: rayjob-sample-hhbdr-raycluster-ljj69
  name: rayjob-sample-hhbdr-raycluster-ljj69-small-group-worker-xnzjh
  namespace: algorithm-text
  ownerReferences:
  - apiVersion: ray.io/v1
    blockOwnerDeletion: true
    controller: true
    kind: RayCluster
    name: rayjob-sample-hhbdr-raycluster-ljj69
    uid: 74259f20-86fd-4777-b826-73d201065931
  resourceVersion: "7482744"
  uid: 4f666efe-a25d-4620-824f-cab3f4fa0ce7
spec:
  containers:
  - args:
    - 'ulimit -n 65536; ray start  --address=rayjob-sample-hhbdr-raycluster-ljj69-head-svc.algorithm-text.svc.cluster.local:6379  --metrics-export-port=8080  --block  --dashboard-agent-listen-port=52365  --num-cpus=1 '
    command:
    - /bin/bash
    - -lc
    - --
    env:
    - name: FQ_RAY_IP
      value: rayjob-sample-hhbdr-raycluster-ljj69-head-svc.algorithm-text.svc.cluster.local
    - name: RAY_IP
      value: rayjob-sample-hhbdr-raycluster-ljj69-head-svc
    - name: RAY_CLUSTER_NAME
      valueFrom:
        fieldRef:
          apiVersion: v1
          fieldPath: metadata.labels['ray.io/cluster']
    - name: RAY_CLOUD_INSTANCE_ID
      valueFrom:
        fieldRef:
          apiVersion: v1
          fieldPath: metadata.name
    - name: RAY_NODE_TYPE_NAME
      valueFrom:
        fieldRef:
          apiVersion: v1
          fieldPath: metadata.labels['ray.io/group']
    - name: KUBERAY_GEN_RAY_START_CMD
      value: 'ray start  --address=rayjob-sample-hhbdr-raycluster-ljj69-head-svc.algorithm-text.svc.cluster.local:6379  --metrics-export-port=8080  --block  --dashboard-agent-listen-port=52365  --num-cpus=1 '
    - name: RAY_PORT
      value: "6379"
    - name: RAY_ADDRESS
      value: rayjob-sample-hhbdr-raycluster-ljj69-head-svc.algorithm-text.svc.cluster.local:6379
    - name: RAY_USAGE_STATS_KUBERAY_IN_USE
      value: "1"
    - name: REDIS_PASSWORD
    - name: RAY_DASHBOARD_ENABLE_K8S_DISK_USAGE
      value: "1"
    image: rayproject/ray:2.9.0
    imagePullPolicy: IfNotPresent
    lifecycle:
      preStop:
        exec:
          command:
          - /bin/sh
          - -c
          - ray stop
    livenessProbe:
      exec:
        command:
        - bash
        - -c
        - wget -T 2 -q -O- http://localhost:52365/api/local_raylet_healthz | grep
          success
      failureThreshold: 120
      initialDelaySeconds: 30
      periodSeconds: 5
      successThreshold: 1
      timeoutSeconds: 2
    name: ray-worker
    ports:
    - containerPort: 8080
      name: metrics
      protocol: TCP
    readinessProbe:
      exec:
        command:
        - bash
        - -c
        - wget -T 2 -q -O- http://localhost:52365/api/local_raylet_healthz | grep
          success
      failureThreshold: 10
      initialDelaySeconds: 10
      periodSeconds: 5
      successThreshold: 1
      timeoutSeconds: 2
    resources:
      limits:
        cpu: "1"
      requests:
        cpu: "1"
    terminationMessagePath: /dev/termination-log
    terminationMessagePolicy: File
    volumeMounts:
    - mountPath: /dev/shm
      name: shared-mem
    - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
      name: kube-api-access-rq67v
      readOnly: true
  dnsPolicy: ClusterFirst
  enableServiceLinks: true
  initContainers:
  - args:
    - "\n\t\t\t\t\tSECONDS=0\n\t\t\t\t\twhile true; do\n\t\t\t\t\t\tif (( SECONDS
      <= 120 )); then\n\t\t\t\t\t\t\tif ray health-check --address rayjob-sample-hhbdr-raycluster-ljj69-head-svc.algorithm-text.svc.cluster.local:6379
      > /dev/null 2>&1; then\n\t\t\t\t\t\t\t\techo \"GCS is ready.\"\n\t\t\t\t\t\t\t\tbreak\n\t\t\t\t\t\t\tfi\n\t\t\t\t\t\t\techo
      \"$SECONDS seconds elapsed: Waiting for GCS to be ready.\"\n\t\t\t\t\t\telse\n\t\t\t\t\t\t\tif
      ray health-check --address rayjob-sample-hhbdr-raycluster-ljj69-head-svc.algorithm-text.svc.cluster.local:6379;
      then\n\t\t\t\t\t\t\t\techo \"GCS is ready. Any error messages above can be safely
      ignored.\"\n\t\t\t\t\t\t\t\tbreak\n\t\t\t\t\t\t\tfi\n\t\t\t\t\t\t\techo \"$SECONDS
      seconds elapsed: Still waiting for GCS to be ready. For troubleshooting, refer
      to the FAQ at https://github.com/ray-project/kuberay/blob/master/docs/guidance/FAQ.md.\"\n\t\t\t\t\t\tfi\n\t\t\t\t\t\tsleep
      5\n\t\t\t\t\tdone\n\t\t\t\t"
    command:
    - /bin/bash
    - -lc
    - --
    env:
    - name: FQ_RAY_IP
      value: rayjob-sample-hhbdr-raycluster-ljj69-head-svc.algorithm-text.svc.cluster.local
    - name: RAY_IP
      value: rayjob-sample-hhbdr-raycluster-ljj69-head-svc
    image: rayproject/ray:2.9.0
    imagePullPolicy: IfNotPresent
    name: wait-gcs-ready
    resources:
      limits:
        cpu: 200m
        memory: 256Mi
      requests:
        cpu: 200m
        memory: 256Mi
    terminationMessagePath: /dev/termination-log
    terminationMessagePolicy: File
    volumeMounts:
    - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
      name: kube-api-access-rq67v
      readOnly: true
  preemptionPolicy: PreemptLowerPriority
  priority: 0
  restartPolicy: Always
  schedulerName: default-scheduler
  securityContext: {}
  serviceAccount: default
  serviceAccountName: default
  terminationGracePeriodSeconds: 30
  tolerations:
  - effect: NoExecute
    key: node.kubernetes.io/not-ready
    operator: Exists
    tolerationSeconds: 300
  - effect: NoExecute
    key: node.kubernetes.io/unreachable
    operator: Exists
    tolerationSeconds: 300
  volumes:
  - emptyDir:
      medium: Memory
    name: shared-mem
  - name: kube-api-access-rq67v
    projected:
      defaultMode: 420
      sources:
      - serviceAccountToken:
          expirationSeconds: 3607
          path: token
      - configMap:
          items:
          - key: ca.crt
            path: ca.crt
          name: kube-root-ca.crt
      - downwardAPI:
          items:
          - fieldRef:
              apiVersion: v1
              fieldPath: metadata.namespace
            path: namespace
status:
  conditions:
  - lastProbeTime: null
    lastTransitionTime: "2024-10-10T02:38:29Z"
    message: '0/7 nodes are available: 3 node(s) had untolerated taint {virtual-kubelet.io/provider:
      alibabacloud}, 4 Insufficient cpu. failed to get current scheduling unit not
      found, preemption: 0/7 nodes are available: 1 No victims found on node cn-hongkong.10.0.118.181
      for preemptor pod rayjob-sample-hhbdr-raycluster-ljj69-small-group-worker-xnzjh,
      1 No victims found on node cn-hongkong.10.1.0.52 for preemptor pod rayjob-sample-hhbdr-raycluster-ljj69-small-group-worker-xnzjh,
      1 No victims found on node cn-hongkong.10.2.0.24 for preemptor pod rayjob-sample-hhbdr-raycluster-ljj69-small-group-worker-xnzjh,
      1 No victims found on node cn-hongkong.10.2.0.5 for preemptor pod rayjob-sample-hhbdr-raycluster-ljj69-small-group-worker-xnzjh,
      3 Preemption is not helpful for scheduling., '
    reason: Unschedulable
    status: "False"
    type: PodScheduled
  phase: Pending
  qosClass: Burstable

When resources are insufficient, you can query events to locate the cause of scheduling failures. Use --field-selector='type=Warning,reason=GangFailedScheduling' to find failure records related to gang scheduling. In the event messages, "cycle xx" indicates the scheduling round in which the failure occurred. The following code block is an example.

➜  kubequeue-doc kubectl get events -n algorithm-text --field-selector='type=Warning,reason=GangFailedScheduling' | grep "cycle 1"
5m48s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-89mlq   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-89mlq in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-8fwmr   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-8fwmr in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-8g5wv   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-8g5wv in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m46s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-8tn4w   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-8tn4w in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-97gpk   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-97gpk in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m46s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-9xsgw   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-9xsgw in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-gwxhg   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-gwxhg in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-jzw6k   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-jzw6k in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-kb55s   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-kb55s in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-lbvk7   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-lbvk7 in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m46s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-ms96b   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-ms96b in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-sgr9g   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-sgr9g in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m46s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-svt6g   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-svt6g in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.
5m48s       Warning   GangFailedScheduling   pod/rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-wm5c6   rayjob-sample-dtmtl-raycluster-r9jc7-small-group-worker-wm5c6 in gang failed to be scheduled in cycle 1: 0/0 nodes are available: 3 Insufficient cpu.