Apache Celeborn is a service designed to manage intermediate data, such as shuffle data and spill data, for big data computing engines. As a remote shuffle service (RSS), it efficiently handles the shuffle process for large-scale datasets, improving the performance, stability, and flexibility of these engines. This topic describes how to deploy the Celeborn component in a Container Service for Kubernetes (ACK) cluster and use Celeborn as the RSS for Spark jobs.
Benefits
For big data processing frameworks such as MapReduce, Spark, and Flink, using Celeborn as an RSS provides the following benefits:
- Push-based shuffle write: Mapper nodes do not need to store data on local disks, making it ideal for cloud-native architectures with storage and compute separation.
- Merge-based shuffle read: Data is merged on worker nodes instead of reducer nodes. This avoids random small-file I/O and the network overhead of small data transfers, improving data processing efficiency.
- High availability: The Celeborn master nodes use the Raft consensus protocol to achieve high availability and ensure system stability.
- Fault tolerance: Celeborn supports dual replicas, which significantly reduces the probability of fetch failures.
Prerequisites
- The ack-spark-operator component is installed. For more information, see Step 1: Install ack-spark-operator.
- A kubectl client is connected to the ACK cluster. For more information, see Connect to an ACK cluster using kubectl.
- You have created an Object Storage Service (OSS) bucket. For more information, see Create a bucket.
- You have installed and configured ossutil. For more information about ossutil commands, see Get started with ossutil.
- You have created and configured node pools based on the cluster environment described in the following section. For more information, see Create and manage a node pool.
Cluster environment
This example uses the following ACK cluster configuration:
- The master process is deployed to the celeborn-master node pool with the following configuration:
  - Node pool name: celeborn-master
  - Number of nodes: 3
  - ECS instance type: g8i.2xlarge
  - Label: celeborn.apache.org/role=master
  - Taint: celeborn.apache.org/role=master:NoSchedule
  - Data storage per node: /mnt/celeborn_ratis (1024 GB)
- The worker process is deployed to the celeborn-worker node pool with the following configuration:
  - Node pool name: celeborn-worker
  - Number of nodes: 5
  - ECS instance type: g8i.4xlarge
  - Label: celeborn.apache.org/role=worker
  - Taint: celeborn.apache.org/role=worker:NoSchedule
  - Data storage per node: /mnt/disk1, /mnt/disk2, /mnt/disk3, and /mnt/disk4 (1024 GB each)
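Before you deploy Celeborn, you can confirm that the two node pools carry the expected labels and taints. A minimal check, assuming your kubectl client is already connected to the cluster as described in the prerequisites:

```
# List the nodes in each Celeborn node pool by role label.
kubectl get nodes -l celeborn.apache.org/role=master
kubectl get nodes -l celeborn.apache.org/role=worker

# Confirm the NoSchedule taints are in place on all Celeborn nodes.
kubectl get nodes -l celeborn.apache.org/role \
  -o custom-columns=NAME:.metadata.name,TAINTS:.spec.taints
```

If the first two commands do not return 3 master nodes and 5 worker nodes, review the node pool configuration before continuing.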
Procedure overview
This guide walks you through the steps to deploy Celeborn in an ACK cluster.
1. Build a Celeborn container image: Download a Celeborn release, build a container image, and push it to your image repository to deploy the ack-celeborn component.
2. Deploy the ack-celeborn component: Use the ack-celeborn Helm chart from the ACK Marketplace to deploy a Celeborn cluster using the container image you built.
3. Build a Spark container image: Build a Spark container image that includes the dependencies for Celeborn and for accessing OSS, and push the image to your image repository.
4. Prepare and upload test data to OSS: Generate a test dataset for the PageRank job and upload it to OSS.
5. Run a sample Spark job: Run a sample PageRank job and configure it to use Celeborn as the RSS.
6. (Optional) Clean up resources: After you complete the tutorial, delete the Spark job and other unneeded resources to avoid unnecessary fees.
Step 1: Build a Celeborn container image
Download the required release (for example, version 0.5.2) from the official Celeborn website. During configuration, replace <IMAGE-REGISTRY> and <IMAGE-REPOSITORY> with your image registry and image name. You can also modify the PLATFORMS variable to configure the required image architecture. For more information, see Deploy Celeborn on Kubernetes. The docker buildx command requires Docker 19.03 or later. For more information about how to upgrade Docker, see Install and use Docker and Docker Compose.
CELEBORN_VERSION=0.5.2 # The Celeborn version.
IMAGE_REGISTRY=<IMAGE-REGISTRY> # The image registry, for example, docker.io.
IMAGE_REPOSITORY=<IMAGE-REPOSITORY> # The image name, for example, apache/celeborn.
IMAGE_TAG=${CELEBORN_VERSION} # The image tag. This example uses the Celeborn version as the tag.
PLATFORMS=linux/amd64 # The image platform architecture. To support multiple platforms, separate them with commas, for example, linux/amd64,linux/arm64.
# Download the release package.
wget https://downloads.apache.org/celeborn/celeborn-${CELEBORN_VERSION}/apache-celeborn-${CELEBORN_VERSION}-bin.tgz
# Extract the package.
tar -zxvf apache-celeborn-${CELEBORN_VERSION}-bin.tgz
# Go to the working directory.
cd apache-celeborn-${CELEBORN_VERSION}-bin
# Use Docker Buildx to build the image and push it to the image repository.
docker buildx build \
--output=type=registry \
--push \
--platform=${PLATFORMS} \
--tag=${IMAGE_REGISTRY}/${IMAGE_REPOSITORY}:${IMAGE_TAG} \
-f docker/Dockerfile \
.
Step 2: Deploy the ack-celeborn component
1. Log on to the ACK console. In the left navigation pane, go to the Marketplace page.
2. On the Marketplace page, click the App Catalog tab, find and select ack-celeborn, and on the ack-celeborn page, click Deploy.
3. On the Create panel, select a cluster and a namespace, and click Next.
4. On the Parameters page, configure the parameters and click OK.
image: # Replace this with the address of the Celeborn image that you built in Step 1.
  registry: docker.io # The image registry.
  repository: apache/celeborn # The image name.
  tag: 0.5.2 # The image tag.

celeborn:
  celeborn.client.push.stageEnd.timeout: 120s
  celeborn.master.ha.enabled: true
  celeborn.master.ha.ratis.raft.server.storage.dir: /mnt/celeborn_ratis
  celeborn.master.heartbeat.application.timeout: 300s
  celeborn.master.heartbeat.worker.timeout: 120s
  celeborn.master.http.port: 9098
  celeborn.metrics.enabled: true
  celeborn.metrics.prometheus.path: /metrics/prometheus
  celeborn.rpc.dispatcher.numThreads: 4
  celeborn.rpc.io.clientThreads: 64
  celeborn.rpc.io.numConnectionsPerPeer: 2
  celeborn.rpc.io.serverThreads: 64
  celeborn.shuffle.chunk.size: 8m
  celeborn.worker.fetch.io.threads: 32
  celeborn.worker.flusher.buffer.size: 256K
  celeborn.worker.http.port: 9096
  celeborn.worker.monitor.disk.enabled: false
  celeborn.worker.push.io.threads: 32
  celeborn.worker.storage.dirs: /mnt/disk1:disktype=SSD:capacity=1024Gi,/mnt/disk2:disktype=SSD:capacity=1024Gi,/mnt/disk3:disktype=SSD:capacity=1024Gi,/mnt/disk4:disktype=SSD:capacity=1024Gi

master:
  replicas: 3
  env:
  - name: CELEBORN_MASTER_MEMORY
    value: 28g
  - name: CELEBORN_MASTER_JAVA_OPTS
    value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-master.out -Dio.netty.leakDetectionLevel=advanced
  - name: CELEBORN_NO_DAEMONIZE
    value: "1"
  - name: TZ
    value: Asia/Shanghai
  volumeMounts:
  - name: celeborn-ratis
    mountPath: /mnt/celeborn_ratis
  resources:
    requests:
      cpu: 7
      memory: 28Gi
    limits:
      cpu: 7
      memory: 28Gi
  volumes:
  - name: celeborn-ratis
    hostPath:
      path: /mnt/celeborn_ratis
      type: DirectoryOrCreate
  nodeSelector:
    celeborn.apache.org/role: master
  tolerations:
  - key: celeborn.apache.org/role
    operator: Equal
    value: master
    effect: NoSchedule

worker:
  replicas: 5
  env:
  - name: CELEBORN_WORKER_MEMORY
    value: 28g
  - name: CELEBORN_WORKER_OFFHEAP_MEMORY
    value: 28g
  - name: CELEBORN_WORKER_JAVA_OPTS
    value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-worker.out -Dio.netty.leakDetectionLevel=advanced
  - name: CELEBORN_NO_DAEMONIZE
    value: "1"
  - name: TZ
    value: Asia/Shanghai
  volumeMounts:
  - name: disk1
    mountPath: /mnt/disk1
  - name: disk2
    mountPath: /mnt/disk2
  - name: disk3
    mountPath: /mnt/disk3
  - name: disk4
    mountPath: /mnt/disk4
  resources:
    requests:
      cpu: 14
      memory: 56Gi
    limits:
      cpu: 14
      memory: 56Gi
  volumes:
  - name: disk1
    hostPath:
      path: /mnt/disk1
      type: DirectoryOrCreate
  - name: disk2
    hostPath:
      path: /mnt/disk2
      type: DirectoryOrCreate
  - name: disk3
    hostPath:
      path: /mnt/disk3
      type: DirectoryOrCreate
  - name: disk4
    hostPath:
      path: /mnt/disk4
      type: DirectoryOrCreate
  nodeSelector:
    celeborn.apache.org/role: worker
  tolerations:
  - key: celeborn.apache.org/role
    operator: Equal
    value: worker
    effect: NoSchedule

The following table describes key parameters. For a complete list of parameters, see the ConfigMaps section on the ack-celeborn page.
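Each entry in celeborn.worker.storage.dirs is a colon-separated tuple of mount path, disk type, and capacity, with entries joined by commas. The following sketch decomposes the value used above, so you can sanity-check that it matches the four worker hostPath volumes:

```shell
# The storage configuration value from the parameters above.
DIRS="/mnt/disk1:disktype=SSD:capacity=1024Gi,/mnt/disk2:disktype=SSD:capacity=1024Gi,/mnt/disk3:disktype=SSD:capacity=1024Gi,/mnt/disk4:disktype=SSD:capacity=1024Gi"

# Split the comma-separated list into one directory entry per line.
echo "$DIRS" | tr ',' '\n'

# Each entry has the form <mountPath>:disktype=<type>:capacity=<size>.
# Count the entries; this should match the number of mounted disks (4).
NUM_DIRS=$(echo "$DIRS" | tr ',' '\n' | wc -l)
echo "configured dirs: $NUM_DIRS"
```

If you change the number of disks per worker node, update both this value and the worker volumeMounts and volumes lists so they stay consistent.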
5. Run the following command and wait for the Celeborn deployment to complete. If you encounter pod issues during component deployment, see Pod troubleshooting.

kubectl get -n celeborn statefulset

Expected output:

NAME              READY   AGE
celeborn-master   3/3     68s
celeborn-worker   5/5     68s
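You can also verify the individual pods and the Service that Spark jobs will later connect to. A minimal check, assuming the component was deployed to the celeborn namespace as above:

```
# All master and worker pods should be Running.
kubectl get pods -n celeborn

# The master Service backs the spark.celeborn.master.endpoints used in Step 6.
kubectl get svc -n celeborn
```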
Step 3: Build a Spark container image
This example uses Spark 3.5.3. Create a Dockerfile with the following content to build a container image and push it to your image repository.
# Replace <SPARK_IMAGE> with your Spark base image.
ARG SPARK_IMAGE=<SPARK_IMAGE>
FROM ${SPARK_IMAGE}
# Add dependency for Hadoop Aliyun OSS support
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars
# Add dependency for Celeborn
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/celeborn/celeborn-client-spark-3-shaded_2.12/0.5.1/celeborn-client-spark-3-shaded_2.12-0.5.1.jar ${SPARK_HOME}/jars
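The Dockerfile can be built and pushed with the same docker buildx workflow as in Step 1. A sketch, assuming you replace the placeholders with your own base image, registry, and image name; the tag shown is only an example:

```
SPARK_IMAGE=<SPARK_IMAGE>            # The Spark base image to build from.
IMAGE_REGISTRY=<IMAGE-REGISTRY>      # The image registry, for example, docker.io.
IMAGE_REPOSITORY=<IMAGE-REPOSITORY>  # The image name.
IMAGE_TAG=3.5.3-celeborn             # An example tag; adjust as needed.

# Build the image and push it to the image repository.
docker buildx build \
    --output=type=registry \
    --push \
    --platform=linux/amd64 \
    --build-arg=SPARK_IMAGE=${SPARK_IMAGE} \
    --tag=${IMAGE_REGISTRY}/${IMAGE_REPOSITORY}:${IMAGE_TAG} \
    .
```

Passing --build-arg is an alternative to editing the ARG default in the Dockerfile directly.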
Step 4: Upload test data to OSS
For information about how to prepare test data and upload it to OSS, see Step 1: Prepare test data and upload it to OSS.
Step 5: Create an OSS Secret
For information about how to create a Secret to store OSS access credentials, see Step 3: Create a Secret to store OSS access credentials.
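As a sketch, the Secret referenced by the job in Step 6 (spark-oss-secret) can be expressed as the following manifest. Its keys are injected as environment variables into the driver and executor pods; the key names shown are assumptions based on the environment variables that com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider reads. Replace the placeholder values with your own credentials:

```yaml
apiVersion: v1
kind: Secret
metadata:
  name: spark-oss-secret
  namespace: default
type: Opaque
stringData:
  OSS_ACCESS_KEY_ID: <YOUR_ACCESS_KEY_ID>         # The AccessKey ID.
  OSS_ACCESS_KEY_SECRET: <YOUR_ACCESS_KEY_SECRET> # The AccessKey secret.
```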
Step 6: Submit a sample Spark job
Create a SparkApplication manifest file named spark-pagerank.yaml with the following content. Replace <SPARK_IMAGE> with the name of the image you built in Step 3: Build a Spark container image, and replace <OSS_BUCKET> and <OSS_ENDPOINT> with your OSS bucket and endpoint. For more information about how to configure Celeborn in a Spark job, see the Celeborn documentation.
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: <SPARK_IMAGE> # The Spark image. Replace <SPARK_IMAGE> with your Spark image name.
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.3.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt # The input test dataset. Replace <OSS_BUCKET> with your OSS bucket name.
  - "10" # The number of iterations.
  sparkVersion: 3.5.3
  hadoopConf:
    fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    fs.oss.endpoint: <OSS_ENDPOINT> # The OSS endpoint. For example, the internal endpoint for OSS in the China (Beijing) region is oss-cn-beijing-internal.aliyuncs.com.
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  sparkConf:
    spark.shuffle.manager: org.apache.spark.shuffle.celeborn.SparkShuffleManager
    spark.serializer: org.apache.spark.serializer.KryoSerializer
    spark.celeborn.master.endpoints: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
    spark.celeborn.client.spark.shuffle.writer: hash
    spark.celeborn.client.push.replicate.enabled: "false"
    spark.sql.adaptive.localShuffleReader.enabled: "false"
    spark.sql.adaptive.enabled: "true"
    spark.sql.adaptive.skewJoin.enabled: "true"
    spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
    spark.dynamicAllocation.shuffleTracking.enabled: "false"
    spark.executor.userClassPathFirst: "false"
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
    envFrom:
    - secretRef:
        name: spark-oss-secret
  executor:
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef:
        name: spark-oss-secret
  restartPolicy:
    type: Never
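The manifest can then be submitted and monitored with kubectl. A minimal sketch:

```
# Submit the Spark job.
kubectl apply -f spark-pagerank.yaml

# Watch the application state until it reaches COMPLETED.
kubectl get sparkapplication spark-pagerank -w

# Inspect the driver log, for example to view the computed PageRank values.
kubectl logs spark-pagerank-driver
```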
(Optional) Step 7: Clean up resources
After you complete this tutorial, run the following commands to delete the resources and avoid unnecessary fees.
Run the following command to delete the Spark job:
kubectl delete sparkapplication spark-pagerank
Run the following command to delete the Secret resource:
kubectl delete secret spark-oss-secret
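If you no longer need the Celeborn cluster itself, you can also remove the ack-celeborn component. The release and namespace names below are assumptions based on the deployment in Step 2; confirm them with helm list first:

```
# Check the installed release name.
helm list -n celeborn

# Uninstall the release, assuming it is named ack-celeborn.
helm uninstall ack-celeborn -n celeborn
```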
References
- For information about how to use Spark Operator to submit Spark jobs, see Use Spark Operator to run Spark jobs.
- For information about how to use Spark History Server to view Spark job information, see Use Spark History Server to view information about Spark jobs.
- For information about how to use Celeborn, see the Apache Celeborn documentation.