Container Service for Kubernetes:Use Celeborn to enable RSS for Spark jobs

Last Updated:Dec 20, 2024

Apache Celeborn is an intermediate data service for big data compute engines: it handles shuffle data and spilled data, and improves the performance, stability, and flexibility of those engines. Remote Shuffle Service (RSS) provides an efficient way to shuffle large datasets. This topic describes how to deploy Celeborn in a Container Service for Kubernetes (ACK) cluster and how to use Celeborn to enable RSS for a Spark job.

Benefits

Using Celeborn to enable RSS has the following benefits for big data processing based on frameworks such as MapReduce, Spark, and Flink:

  • Push-based shuffle write: Mappers do not need to store data on local disks. This feature is suitable for services that use the compute-storage separation architecture in the cloud.

  • Merge-based shuffle write: Data is merged on workers instead of reducers. This eliminates the network overhead of random reads and writes on small files and of transmitting many small payloads, which improves data processing efficiency.

  • High availability: Masters implement high availability and stability for the system based on the Raft protocol.

  • High fault tolerance: Celeborn can store two replicas of shuffle data, which greatly reduces the probability of fetch failures.

Prerequisites

Cluster environment

Configure the cluster environment based on the following configurations:

  • Deploy master processes in a node pool named celeborn-master. The node pool uses the following configurations:

    • Node pool name: celeborn-master.

    • Node quantity: 3.

    • Elastic Compute Service (ECS) instance types: g8i.2xlarge.

    • Labels: celeborn.apache.org/role=master

    • Taints: celeborn.apache.org/role=master:NoSchedule

    • Data storage for each master: /mnt/celeborn_ratis (1024GB).

  • Deploy worker processes in a node pool named celeborn-worker. The node pool uses the following configurations:

    • Node pool name: celeborn-worker.

    • Node quantity: 5.

    • ECS instance types: g8i.4xlarge.

    • Labels: celeborn.apache.org/role=worker.

    • Taints: celeborn.apache.org/role=worker:NoSchedule.

    • Data storage for each worker:

      • /mnt/disk1 (1024GB)

      • /mnt/disk2 (1024GB)

      • /mnt/disk3 (1024GB)

      • /mnt/disk4 (1024GB)
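
The role labels and taints listed above can also be applied or verified with kubectl after the node pools are created. This is a sketch with a placeholder node name; in ACK you would normally configure labels and taints directly in the node pool settings:

```
# Verify the role labels on all nodes.
kubectl get nodes -L celeborn.apache.org/role

# Manually label and taint a master node (replace <NODE-NAME> with a real node name).
kubectl label node <NODE-NAME> celeborn.apache.org/role=master
kubectl taint node <NODE-NAME> celeborn.apache.org/role=master:NoSchedule
```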

Procedure overview

This section describes the main steps for deploying Celeborn in an ACK cluster and using it in a Spark job.

  1. Build a Celeborn container image

    Download the package of a Celeborn release version based on your business requirements. Then, build a container image based on the package and push the image to your image repository in Container Registry. This image is used to deploy the ack-celeborn component.

  2. Deploy the ack-celeborn component

    Install the Helm chart named ack-celeborn from the Marketplace page of the ACK console to deploy a Celeborn cluster that uses the image you built.

  3. Build a Spark container image

    Build a Spark container image that contains Celeborn and the JAR packages required for accessing Object Storage Service (OSS) and push the image to your image repository in Container Registry.

  4. Prepare and upload test data to OSS

    Generate test datasets for a PageRank job and upload the datasets to OSS.

  5. Create a Secret to store the credentials used to access OSS

    Create a Kubernetes Secret that stores the AccessKey pair used to access OSS.

  6. Submit a sample Spark job

    Run a sample PageRank job and use Celeborn to enable RSS.

  7. (Optional) Clear the environment

    After you perform all the steps in this topic, delete the Spark job and release the relevant resources if they are no longer required.

Step 1: Build a Celeborn container image

Download the package of a Celeborn release version, such as 0.5.2, from the Celeborn official website based on your business requirements. When you build the image, replace <IMAGE-REGISTRY> and <IMAGE-REPOSITORY> with the name of your image repository and the image name that you want to use. You can also modify the PLATFORMS variable to specify the target architectures of the image. For more information, see Deploy Celeborn on Kubernetes. The docker buildx command is supported in Docker 19.03 and later. For more information about how to update Docker, see Install Docker.

CELEBORN_VERSION=0.5.2               # The Celeborn release version. 

IMAGE_REGISTRY=<IMAGE-REGISTRY>      # The image repository name, such as docker.io. 

IMAGE_REPOSITORY=<IMAGE-REPOSITORY>  # The image name, such as apache/celeborn. 

IMAGE_TAG=${CELEBORN_VERSION}        # The image tag. In this example, the Celeborn release version is used as the tag. 

PLATFORMS=linux/amd64,linux/arm64    # The target architectures of the image. Modify as needed. 

# Download the package. 
wget https://downloads.apache.org/celeborn/celeborn-${CELEBORN_VERSION}/apache-celeborn-${CELEBORN_VERSION}-bin.tgz

# Decompress the package. 
tar -zxvf apache-celeborn-${CELEBORN_VERSION}-bin.tgz

# Switch the working directory. 
cd apache-celeborn-${CELEBORN_VERSION}-bin

# Use Docker Buildkit to build the image and push the image to your image repository. 
docker buildx build \
    --output=type=registry \
    --push \
    --platform=${PLATFORMS} \
    --tag=${IMAGE_REGISTRY}/${IMAGE_REPOSITORY}:${IMAGE_TAG} \
    -f docker/Dockerfile \
    .
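
After the build completes, you can optionally confirm that the image was pushed for every target platform. This sketch uses the variables defined above and the buildx imagetools subcommand:

```
# List the platforms contained in the pushed manifest.
docker buildx imagetools inspect ${IMAGE_REGISTRY}/${IMAGE_REPOSITORY}:${IMAGE_TAG}
```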

Step 2: Deploy the ack-celeborn component

  1. Log on to the ACK console. In the left-side navigation pane, choose Marketplace > Marketplace.

  2. On the Marketplace page, click the App Catalog tab. Find and click ack-celeborn. On the ack-celeborn page, click Deploy.

  3. In the Deploy panel, select a cluster and namespace, and then click Next.

  4. In the Parameters step, configure the parameters and click OK.

    image:                         # Replace the value with the address of the Celeborn image you built in Step 1. 
      registry: docker.io          # The image repository name. 
      repository: apache/celeborn  # The image name. 
      tag: 0.5.2                   # The image tag. 
    
    celeborn:
      celeborn.client.push.stageEnd.timeout: 120s
      celeborn.master.ha.enabled: true
      celeborn.master.ha.ratis.raft.server.storage.dir: /mnt/celeborn_ratis
      celeborn.master.heartbeat.application.timeout: 300s
      celeborn.master.heartbeat.worker.timeout: 120s
      celeborn.master.http.port: 9098
      celeborn.metrics.enabled: true
      celeborn.metrics.prometheus.path: /metrics/prometheus
      celeborn.rpc.dispatcher.numThreads: 4
      celeborn.rpc.io.clientThreads: 64
      celeborn.rpc.io.numConnectionsPerPeer: 2
      celeborn.rpc.io.serverThreads: 64
      celeborn.shuffle.chunk.size: 8m
      celeborn.worker.fetch.io.threads: 32
      celeborn.worker.flusher.buffer.size: 256K
      celeborn.worker.http.port: 9096
      celeborn.worker.monitor.disk.enabled: false
      celeborn.worker.push.io.threads: 32
      celeborn.worker.storage.dirs: /mnt/disk1:disktype=SSD:capacity=1024Gi,/mnt/disk2:disktype=SSD:capacity=1024Gi,/mnt/disk3:disktype=SSD:capacity=1024Gi,/mnt/disk4:disktype=SSD:capacity=1024Gi
    
    master:
      replicas: 3
      env:
      - name: CELEBORN_MASTER_MEMORY
        value: 28g
      - name: CELEBORN_MASTER_JAVA_OPTS
        value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-master.out -Dio.netty.leakDetectionLevel=advanced
      - name: CELEBORN_NO_DAEMONIZE
        value: "1"
      - name: TZ
        value: Asia/Shanghai
      volumeMounts:
      - name: celeborn-ratis
        mountPath: /mnt/celeborn_ratis
      resources:
        requests:
          cpu: 7                
          memory: 28Gi        
        limits:
          cpu: 7
          memory: 28Gi
      volumes:
      - name: celeborn-ratis
        hostPath:
          path: /mnt/celeborn_ratis
          type: DirectoryOrCreate
      nodeSelector:
        celeborn.apache.org/role: master
      tolerations:
      - key: celeborn.apache.org/role
        operator: Equal
        value: master
        effect: NoSchedule
    
    worker:
      replicas: 5
      env:
      - name: CELEBORN_WORKER_MEMORY
        value: 28g
      - name: CELEBORN_WORKER_OFFHEAP_MEMORY
        value: 28g
      - name: CELEBORN_WORKER_JAVA_OPTS
        value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-worker.out -Dio.netty.leakDetectionLevel=advanced
      - name: CELEBORN_NO_DAEMONIZE
        value: "1"
      - name: TZ
        value: Asia/Shanghai
      volumeMounts:
      - name: disk1
        mountPath: /mnt/disk1
      - name: disk2
        mountPath: /mnt/disk2
      - name: disk3
        mountPath: /mnt/disk3
      - name: disk4
        mountPath: /mnt/disk4
      resources:
        requests:
          cpu: 14
          memory: 56Gi
        limits:
          cpu: 14
          memory: 56Gi
      volumes:
      - name: disk1
        hostPath:
          path: /mnt/disk1
          type: DirectoryOrCreate
      - name: disk2
        hostPath:
          path: /mnt/disk2
          type: DirectoryOrCreate
      - name: disk3
        hostPath:
          path: /mnt/disk3
          type: DirectoryOrCreate
      - name: disk4
        hostPath:
          path: /mnt/disk4
          type: DirectoryOrCreate
      nodeSelector:
        celeborn.apache.org/role: worker
      tolerations:
      - key: celeborn.apache.org/role
        operator: Equal
        value: worker
        effect: NoSchedule

    The following describes some of the parameters. Each entry lists the parameter name, its description, and an example value. You can find the full parameter configurations in the Parameters section on the ack-celeborn page.

    image.registry

    The address of the image repository.

    "docker.io"

    image.repository

    The image name.

    "apache/celeborn"

    image.tag

    The image tag.

    "0.5.1"

    image.pullPolicy

    The image pulling policy.

    "IfNotPresent"

    celeborn

    The Celeborn configurations.

    {
      "celeborn.client.push.stageEnd.timeout": "120s",
      "celeborn.master.ha.enabled": true,
      "celeborn.master.ha.ratis.raft.server.storage.dir": "/mnt/celeborn_ratis",
      "celeborn.master.heartbeat.application.timeout": "300s",
      "celeborn.master.heartbeat.worker.timeout": "120s",
      "celeborn.master.http.port": 9098,
      "celeborn.metrics.enabled": true,
      "celeborn.metrics.prometheus.path": "/metrics/prometheus",
      "celeborn.rpc.dispatcher.numThreads": 4,
      "celeborn.rpc.io.clientThreads": 64,
      "celeborn.rpc.io.numConnectionsPerPeer": 2,
      "celeborn.rpc.io.serverThreads": 64,
      "celeborn.shuffle.chunk.size": "8m",
      "celeborn.worker.fetch.io.threads": 32,
      "celeborn.worker.flusher.buffer.size": "256K",
      "celeborn.worker.http.port": 9096,
      "celeborn.worker.monitor.disk.enabled": false,
      "celeborn.worker.push.io.threads": 32,
      "celeborn.worker.storage.dirs": "/mnt/disk1:disktype=SSD:capacity=1024Gi,/mnt/disk2:disktype=SSD:capacity=1024Gi,/mnt/disk3:disktype=SSD:capacity=1024Gi,/mnt/disk4:disktype=SSD:capacity=1024Gi"
    }

    master.replicas

    The number of master pods.

    3

    master.volumeMounts

    The paths at which the declared volumes are mounted in the master pod containers.

    [
      {
        "mountPath": "/mnt/celeborn_ratis",
        "name": "celeborn-ratis"
      }
    ]

    master.volumes

    The declaration of the volumes mounted to master pods.

    Only hostPath and emptyDir volumes can be mounted to master pods.

    [
      {
        "hostPath": {
          "path": "/mnt/celeborn_ratis",
          "type": "DirectoryOrCreate"
        },
        "name": "celeborn-ratis"
      }
    ]

    master.nodeSelector

    The node selectors of master pods.

    {}

    master.affinity

    The affinity rules of master pods.

    {
      "podAntiAffinity": {
        "requiredDuringSchedulingIgnoredDuringExecution": [
          {
            "labelSelector": {
              "matchExpressions": [
                {
                  "key": "app.kubernetes.io/name",
                  "operator": "In",
                  "values": [
                    "celeborn"
                  ]
                },
                {
                  "key": "app.kubernetes.io/role",
                  "operator": "In",
                  "values": [
                    "master"
                  ]
                }
              ]
            },
            "topologyKey": "kubernetes.io/hostname"
          }
        ]
      }
    }

    master.tolerations

    The tolerations of master pods.

    []

    worker.replicas

    The number of worker pods.

    5

    worker.volumeMounts

    The paths at which the declared volumes are mounted in the worker pod containers.

    [
      {
        "mountPath": "/mnt/disk1",
        "name": "disk1"
      },
      {
        "mountPath": "/mnt/disk2",
        "name": "disk2"
      },
      {
        "mountPath": "/mnt/disk3",
        "name": "disk3"
      },
      {
        "mountPath": "/mnt/disk4",
        "name": "disk4"
      }
    ]

    worker.volumes

    The declaration of the volumes mounted to worker pods.

    Only hostPath and emptyDir volumes can be mounted to worker pods.

    [
      {
        "capacity": "100Gi",
        "diskType": "SSD",
        "hostPath": "/mnt/disk1",
        "mountPath": "/mnt/disk1",
        "type": "hostPath"
      },
      {
        "capacity": "100Gi",
        "diskType": "SSD",
        "hostPath": "/mnt/disk2",
        "mountPath": "/mnt/disk2",
        "type": "hostPath"
      },
      {
        "capacity": "100Gi",
        "diskType": "SSD",
        "hostPath": "/mnt/disk3",
        "mountPath": "/mnt/disk3",
        "type": "hostPath"
      },
      {
        "capacity": "100Gi",
        "diskType": "SSD",
        "hostPath": "/mnt/disk4",
        "mountPath": "/mnt/disk4",
        "type": "hostPath"
      }
    ]

    worker.nodeSelector

    The node selectors of worker pods.

    {}

    worker.affinity

    The affinity rules of worker pods.

    {
      "podAntiAffinity": {
        "requiredDuringSchedulingIgnoredDuringExecution": [
          {
            "labelSelector": {
              "matchExpressions": [
                {
                  "key": "app.kubernetes.io/name",
                  "operator": "In",
                  "values": [
                    "celeborn"
                  ]
                },
                {
                  "key": "app.kubernetes.io/role",
                  "operator": "In",
                  "values": [
                    "worker"
                  ]
                }
              ]
            },
            "topologyKey": "kubernetes.io/hostname"
          }
        ]
      }
    }

    worker.tolerations

    The tolerations of worker pods.

    []

  5. Run the following command to check the deployment status of Celeborn. If issues occur during the deployment, see Pod troubleshooting to resolve them.

    kubectl get -n celeborn statefulset 

    Expected output:

    NAME              READY   AGE
    celeborn-master   3/3     68s
    celeborn-worker   5/5     68s
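
    You can further verify the cluster, for example by checking how the pods are distributed across nodes and by querying the metrics endpoint enabled in the configuration above. The pod name and port in this sketch follow the StatefulSet output and the celeborn.master.http.port and celeborn.metrics.prometheus.path settings:

```
# Check that master and worker pods are scheduled onto the intended nodes.
kubectl get pods -n celeborn -o wide

# Port-forward a master pod and fetch its Prometheus metrics.
kubectl port-forward -n celeborn celeborn-master-0 9098:9098 &
curl http://localhost:9098/metrics/prometheus | head
```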

Step 3: Build a Spark container image

In this example, Spark 3.5.3 is used. Create a Dockerfile based on the following content. Then, use the Dockerfile to build an image and push the image to your image repository.

# Replace <SPARK_IMAGE> with your Spark base image.
ARG SPARK_IMAGE=<SPARK_IMAGE>

FROM ${SPARK_IMAGE}

# Add dependencies for Hadoop Aliyun OSS support
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars/
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars/
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars/

# Add dependency for the Celeborn Spark client
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/celeborn/celeborn-client-spark-3-shaded_2.12/0.5.1/celeborn-client-spark-3-shaded_2.12-0.5.1.jar ${SPARK_HOME}/jars/
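
The build and push commands are not shown above; the following is a minimal sketch, assuming placeholder image addresses and a Docker client with BuildKit enabled:

```
# Build the Spark image from the Dockerfile in the current directory and push it.
docker build \
    --build-arg SPARK_IMAGE=<SPARK_IMAGE> \
    --tag <IMAGE-REGISTRY>/<IMAGE-REPOSITORY>:3.5.3-celeborn \
    .
docker push <IMAGE-REGISTRY>/<IMAGE-REPOSITORY>:3.5.3-celeborn
```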

Step 4: Prepare and upload test data to OSS

For more information about how to prepare and upload test data to OSS, see Step 1: Prepare and upload test data to an OSS bucket.
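
If you prefer to generate a small test dataset locally, note that the SparkPageRank example expects one edge per line in the form "source target" separated by whitespace. The following is a minimal sketch; the file name, graph size, and upload path are arbitrary examples:

```shell
# Generate 1,000 random "source target" edges over 100 nodes.
awk 'BEGIN { srand(42); for (i = 0; i < 1000; i++) print int(rand() * 100), int(rand() * 100) }' > pagerank_dataset.txt

# Upload the file with ossutil (must be installed and configured), for example:
#   ossutil cp pagerank_dataset.txt oss://<OSS_BUCKET>/data/pagerank_dataset.txt
```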

Step 5: Create a Secret to store the credentials used to access OSS

For more information about how to create a Secret to store the credentials used to access OSS, see Step 3: Create a Secret to store OSS access credentials.
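
The credentials provider configured in the sample job reads the AccessKey pair from environment variables, so the Secret must expose the corresponding keys. A sketch, assuming the Secret name spark-oss-secret referenced by the job and placeholder credential values:

```
# Create the Secret in the namespace where the Spark job runs.
kubectl create secret generic spark-oss-secret \
    --namespace default \
    --from-literal=OSS_ACCESS_KEY_ID=<YOUR-ACCESS-KEY-ID> \
    --from-literal=OSS_ACCESS_KEY_SECRET=<YOUR-ACCESS-KEY-SECRET>
```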

Step 6: Submit a sample Spark job

Create a file named spark-pagerank.yaml and copy the following content to the file. The file is used to create a SparkApplication. Replace <SPARK_IMAGE> with the address of the image repository you specified in Step 3: Build a Spark container image. Replace <OSS_BUCKET> and <OSS_ENDPOINT> with the name of the OSS bucket you use and the endpoint of the bucket. For more information about how to configure Celeborn in a Spark job, see Celeborn documentation.

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: <SPARK_IMAGE>                                     # The Spark image. Replace <SPARK_IMAGE> with the name of the Spark image.
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.3.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt           # The test dataset. Replace <OSS_BUCKET> with the name of the OSS bucket you use. 
  - "10"                                                   # The number of iterations. 
  sparkVersion: 3.5.3
  hadoopConf:
    fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    fs.oss.endpoint: <OSS_ENDPOINT>                        # The endpoint of the OSS bucket. The internal endpoint for the China (Beijing) region is oss-cn-beijing-internal.aliyuncs.com.  
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  sparkConf:
    spark.shuffle.manager: org.apache.spark.shuffle.celeborn.SparkShuffleManager
    spark.serializer: org.apache.spark.serializer.KryoSerializer
    spark.celeborn.master.endpoints: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
    spark.celeborn.client.spark.shuffle.writer: hash
    spark.celeborn.client.push.replicate.enabled: "false"
    spark.sql.adaptive.localShuffleReader.enabled: "false"
    spark.sql.adaptive.enabled: "true"
    spark.sql.adaptive.skewJoin.enabled: "true"
    spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
    spark.dynamicAllocation.shuffleTracking.enabled: "false"
    spark.executor.userClassPathFirst: "false"
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
    envFrom:
    - secretRef:
        name: spark-oss-secret
  executor:
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef:
        name: spark-oss-secret
  restartPolicy:
    type: Never
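
After you save the file, submit the job and monitor its progress. The driver pod name in this sketch assumes the Spark operator's default naming convention:

```
# Submit the SparkApplication.
kubectl apply -f spark-pagerank.yaml

# Watch the job status until it reaches COMPLETED or FAILED.
kubectl get sparkapplication spark-pagerank -n default -w

# View the driver log.
kubectl logs -f spark-pagerank-driver -n default
```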

(Optional) Step 7: Clear the environment

After you perform all the steps in this topic, delete the Spark job and release the relevant resources if they are no longer required.

Run the following command to delete the Spark job:

kubectl delete sparkapplication spark-pagerank

Run the following command to delete the Secret:

kubectl delete secret spark-oss-secret
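
If you no longer need the Celeborn cluster either, you can uninstall the ack-celeborn component. The release name and namespace below are assumptions based on the deployment in Step 2; you can also remove the component from the ACK console:

```
helm uninstall ack-celeborn --namespace celeborn
```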

References