Apache Celeborn is a Remote Shuffle Service (RSS) that manages intermediate data, such as shuffle and spill data, for big data compute engines. This topic describes how to deploy Celeborn in an Alibaba Cloud Container Service for Kubernetes (ACK) cluster, and use it as the RSS for Spark jobs.
Benefits
Celeborn provides the following benefits for big data workloads based on MapReduce, Spark, and Flink:
Push-based shuffle: Mappers do not need to store data on local disks. This is well suited for cloud-native architectures that separate storage from compute.
Merge-based shuffle: Data is merged on workers instead of reducers, which eliminates network overhead from random small-file I/O and small data transfers.
High availability: Masters use the Raft consensus protocol for high availability.
Fault tolerance: Dual replicas significantly reduce the probability of fetch failures.
Prerequisites
Before you begin, make sure that you have completed the following tasks:
Installed and configured ossutil. For more information, see ossutil command reference.
Created node pools based on the specifications in the Cluster environment section.
Cluster environment
Create two node pools with the following configurations. The labels, taints, and data directories must match the `nodeSelector`, `tolerations`, and volume settings in the Celeborn configuration in Step 2.

celeborn-master node pool

| Parameter | Value |
| --- | --- |
| Node pool name | celeborn-master |
| Node quantity | 3 |
| Elastic Compute Service (ECS) instance type | g8i.2xlarge |
| Labels | `celeborn.apache.org/role: master` |
| Taints | `celeborn.apache.org/role=master:NoSchedule` |
| Data storage per master | A local directory at `/mnt/celeborn_ratis` for Raft metadata, matching the `hostPath` volume in Step 2 |

celeborn-worker node pool

| Parameter | Value |
| --- | --- |
| Node pool name | celeborn-worker |
| Node quantity | 5 |
| ECS instance type | g8i.4xlarge |
| Labels | `celeborn.apache.org/role: worker` |
| Taints | `celeborn.apache.org/role=worker:NoSchedule` |
| Data storage per worker | Four SSD data disks of 1024 GiB each, mounted at `/mnt/disk1` through `/mnt/disk4`, matching `celeborn.worker.storage.dirs` in Step 2 |
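If you apply the labels and taints manually instead of through the node pool configuration, the following kubectl commands produce the same result. This is a minimal sketch: `<NODE_NAME>` is a placeholder for one of your nodes, as listed by `kubectl get nodes`.

```shell
# Label and taint a celeborn-master node so that only pods tolerating the
# celeborn.apache.org/role=master taint are scheduled onto it.
kubectl label node <NODE_NAME> celeborn.apache.org/role=master
kubectl taint node <NODE_NAME> celeborn.apache.org/role=master:NoSchedule

# Worker nodes use the value "worker" instead:
# kubectl label node <NODE_NAME> celeborn.apache.org/role=worker
# kubectl taint node <NODE_NAME> celeborn.apache.org/role=worker:NoSchedule
```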
Procedure overview
| Step | Summary |
| --- | --- |
| Step 1: Build a Celeborn container image | Download a Celeborn release and build a container image. Push the image to your Container Registry repository. |
| Step 2: Deploy the ack-celeborn component | Install the ack-celeborn Helm chart from the ACK console Marketplace to deploy a Celeborn cluster. |
| Step 3: Build a Spark container image | Build a Spark image that includes the Celeborn client and OSS dependency JARs. Push the image to your Container Registry repository. |
| Step 4: Prepare and upload test data to OSS | Generate a PageRank test dataset and upload it to OSS. |
| Step 5: Create a Secret to store OSS access credentials | Create a Kubernetes Secret that stores the credentials for accessing OSS. |
| Step 6: Submit a sample Spark job | Run a PageRank job that uses Celeborn as its shuffle service. |
| (Optional) Step 7: Clean up the environment | Delete the Spark job and release resources. |
Step 1: Build a Celeborn container image
Download a Celeborn release package from the official Celeborn website, build a container image, and push it to your Container Registry repository. See Deploy Celeborn on Kubernetes.
The docker buildx command requires Docker 19.03 or later. For more information, see Install Docker. Replace <IMAGE-REGISTRY> and <IMAGE-REPOSITORY> with your Container Registry address and image name. Modify the PLATFORMS variable to specify the target architectures.

```shell
CELEBORN_VERSION=0.5.2              # The Celeborn version.
IMAGE_REGISTRY=<IMAGE-REGISTRY>     # The Container Registry address, such as docker.io.
IMAGE_REPOSITORY=<IMAGE-REPOSITORY> # The image name, such as apache/celeborn.
IMAGE_TAG=${CELEBORN_VERSION}       # The image tag. Uses the Celeborn version by default.
PLATFORMS=linux/amd64,linux/arm64   # The target architectures. Modify as needed.

# Download the distribution package.
wget https://downloads.apache.org/celeborn/celeborn-${CELEBORN_VERSION}/apache-celeborn-${CELEBORN_VERSION}-bin.tgz

# Extract the package.
tar -zxvf apache-celeborn-${CELEBORN_VERSION}-bin.tgz

# Go to the working directory.
cd apache-celeborn-${CELEBORN_VERSION}-bin

# Build and push the image to Container Registry.
docker buildx build \
    --output=type=registry \
    --push \
    --platform=${PLATFORMS} \
    --tag=${IMAGE_REGISTRY}/${IMAGE_REPOSITORY}:${IMAGE_TAG} \
    -f docker/Dockerfile \
    .
```

Step 2: Deploy the ack-celeborn component
Log on to the ACK console. In the left navigation pane, click Marketplace.
On the Marketplace page, click the App Catalog tab. Find and click ack-celeborn. On the ack-celeborn page, click Deploy.
In the Deploy panel, select a cluster and namespace, keep the default release name, then click Next.
In the Parameters step, configure the parameters and click OK. The following YAML shows a sample configuration. Replace the image address with the image you built in Step 1. The following table describes key parameters. For a full list, see the Parameters section on the ack-celeborn page.
```yaml
image: # Replace with the Celeborn image from Step 1.
  registry: docker.io         # The Container Registry address.
  repository: apache/celeborn # The image name.
  tag: 0.5.2                  # The image tag.
celeborn:
  celeborn.client.push.stageEnd.timeout: 120s
  celeborn.master.ha.enabled: true
  celeborn.master.ha.ratis.raft.server.storage.dir: /mnt/celeborn_ratis
  celeborn.master.heartbeat.application.timeout: 300s
  celeborn.master.heartbeat.worker.timeout: 120s
  celeborn.master.http.port: 9098
  celeborn.metrics.enabled: true
  celeborn.metrics.prometheus.path: /metrics/prometheus
  celeborn.rpc.dispatcher.numThreads: 4
  celeborn.rpc.io.clientThreads: 64
  celeborn.rpc.io.numConnectionsPerPeer: 2
  celeborn.rpc.io.serverThreads: 64
  celeborn.shuffle.chunk.size: 8m
  celeborn.worker.fetch.io.threads: 32
  celeborn.worker.flusher.buffer.size: 256K
  celeborn.worker.http.port: 9096
  celeborn.worker.monitor.disk.enabled: false
  celeborn.worker.push.io.threads: 32
  celeborn.worker.storage.dirs: /mnt/disk1:disktype=SSD:capacity=1024Gi,/mnt/disk2:disktype=SSD:capacity=1024Gi,/mnt/disk3:disktype=SSD:capacity=1024Gi,/mnt/disk4:disktype=SSD:capacity=1024Gi
master:
  replicas: 3
  env:
    - name: CELEBORN_MASTER_MEMORY
      value: 28g
    - name: CELEBORN_MASTER_JAVA_OPTS
      value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-master.out -Dio.netty.leakDetectionLevel=advanced
    - name: CELEBORN_NO_DAEMONIZE
      value: "1"
    - name: TZ
      value: Asia/Shanghai
  volumeMounts:
    - name: celeborn-ratis
      mountPath: /mnt/celeborn_ratis
  resources:
    requests:
      cpu: 7
      memory: 28Gi
    limits:
      cpu: 7
      memory: 28Gi
  volumes:
    - name: celeborn-ratis
      hostPath:
        path: /mnt/celeborn_ratis
        type: DirectoryOrCreate
  nodeSelector:
    celeborn.apache.org/role: master
  tolerations:
    - key: celeborn.apache.org/role
      operator: Equal
      value: master
      effect: NoSchedule
worker:
  replicas: 5
  env:
    - name: CELEBORN_WORKER_MEMORY
      value: 28g
    - name: CELEBORN_WORKER_OFFHEAP_MEMORY
      value: 28g
    - name: CELEBORN_WORKER_JAVA_OPTS
      value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-worker.out -Dio.netty.leakDetectionLevel=advanced
    - name: CELEBORN_NO_DAEMONIZE
      value: "1"
    - name: TZ
      value: Asia/Shanghai
  volumeMounts:
    - name: disk1
      mountPath: /mnt/disk1
    - name: disk2
      mountPath: /mnt/disk2
    - name: disk3
      mountPath: /mnt/disk3
    - name: disk4
      mountPath: /mnt/disk4
  resources:
    requests:
      cpu: 14
      memory: 56Gi
    limits:
      cpu: 14
      memory: 56Gi
  volumes:
    - name: disk1
      hostPath:
        path: /mnt/disk1
        type: DirectoryOrCreate
    - name: disk2
      hostPath:
        path: /mnt/disk2
        type: DirectoryOrCreate
    - name: disk3
      hostPath:
        path: /mnt/disk3
        type: DirectoryOrCreate
    - name: disk4
      hostPath:
        path: /mnt/disk4
        type: DirectoryOrCreate
  nodeSelector:
    celeborn.apache.org/role: worker
  tolerations:
    - key: celeborn.apache.org/role
      operator: Equal
      value: worker
      effect: NoSchedule
```

Verify that the Celeborn cluster is running. If pods fail to start, see Pod troubleshooting.

```shell
kubectl get -n celeborn statefulset
```

Expected output:

```
NAME              READY   AGE
celeborn-master   3/3     68s
celeborn-worker   5/5     68s
```
Step 3: Build a Spark container image
Build a Spark image that includes the Celeborn client JAR and the JARs required for accessing OSS. Then push the image to your Container Registry repository.
This example uses Spark 3.5.3. Create a Dockerfile with the following content and replace <SPARK_IMAGE> with your Spark base image.
```dockerfile
# Replace <SPARK_IMAGE> with your Spark base image.
ARG SPARK_IMAGE=<SPARK_IMAGE>
FROM ${SPARK_IMAGE}

# Add dependencies for Hadoop Aliyun OSS support.
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars/
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars/
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars/

# Add the Celeborn client dependency.
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/celeborn/celeborn-client-spark-3-shaded_2.12/0.5.1/celeborn-client-spark-3-shaded_2.12-0.5.1.jar ${SPARK_HOME}/jars/
```

The Celeborn client JAR version (0.5.1) differs from the Celeborn server version (0.5.2) deployed in Step 2. The 0.5.1 client is compatible with the 0.5.2 server. Use the client JAR version specified above.
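With the Dockerfile saved in the current directory, you can build and push the image following the same pattern as Step 1. This is a minimal sketch: the tag name is a hypothetical choice combining the Spark and Celeborn client versions, and the registry placeholders are the same as in Step 1.

```shell
SPARK_IMAGE_REGISTRY=<IMAGE-REGISTRY>     # Your Container Registry address.
SPARK_IMAGE_REPOSITORY=<IMAGE-REPOSITORY> # The repository for the Spark image.
SPARK_IMAGE_TAG=3.5.3-celeborn-0.5.1      # Hypothetical tag: Spark version plus Celeborn client version.

# Build the image, passing your Spark base image as the SPARK_IMAGE build argument.
docker build \
    --build-arg SPARK_IMAGE=<SPARK_IMAGE> \
    -t ${SPARK_IMAGE_REGISTRY}/${SPARK_IMAGE_REPOSITORY}:${SPARK_IMAGE_TAG} \
    .

# Push the image to Container Registry.
docker push ${SPARK_IMAGE_REGISTRY}/${SPARK_IMAGE_REPOSITORY}:${SPARK_IMAGE_TAG}
```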
Step 4: Prepare and upload test data to OSS
Generate a PageRank test dataset and upload it to your OSS bucket. For detailed instructions, see Prepare and upload test data to an OSS bucket.
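The linked topic covers data preparation in detail. As a minimal local sketch, the SparkPageRank example reads whitespace-separated "source target" pairs, one edge per line; the edge and node counts below are arbitrary placeholders that you can scale up for a heavier shuffle load.

```shell
NUM_EDGES=100000 # Number of edges to generate (arbitrary example value).
NUM_NODES=1000   # Number of distinct page IDs (arbitrary example value).

# Write random "source target" edge pairs to pagerank_dataset.txt.
seq "${NUM_EDGES}" | awk -v n="${NUM_NODES}" \
    'BEGIN { srand() } { printf "%d %d\n", int(rand() * n) + 1, int(rand() * n) + 1 }' \
    > pagerank_dataset.txt

# Upload the dataset to OSS (assumes ossutil is configured; replace <OSS_BUCKET>):
# ossutil cp pagerank_dataset.txt oss://<OSS_BUCKET>/data/pagerank_dataset.txt
```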
Step 5: Create a Secret to store OSS access credentials
Create a Kubernetes Secret to store the credentials for accessing OSS. For detailed instructions, see Create a Secret to store OSS access credentials.
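As a hedged sketch of what the linked topic describes: the job in Step 6 injects this Secret as environment variables through `envFrom`, and the configured `com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider` reads the `OSS_ACCESS_KEY_ID` and `OSS_ACCESS_KEY_SECRET` variables, so a Secret shaped like the following should work. The key names are an assumption based on that provider; replace the placeholder values with your actual credentials.

```shell
# Assumption: EnvironmentVariableCredentialsProvider reads these variable names.
kubectl create secret generic spark-oss-secret \
    --from-literal=OSS_ACCESS_KEY_ID=<YOUR_ACCESS_KEY_ID> \
    --from-literal=OSS_ACCESS_KEY_SECRET=<YOUR_ACCESS_KEY_SECRET>
```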
Step 6: Submit a sample Spark job
Create a file named spark-pagerank.yaml with the following content.
Replace the following placeholders:
| Placeholder | Description |
| --- | --- |
| `<SPARK_IMAGE>` | The Spark image address from Step 3. |
| `<OSS_BUCKET>` | The name of your OSS bucket. |
| `<OSS_ENDPOINT>` | The endpoint of your OSS bucket. |
For more information about Celeborn Spark configuration, see Celeborn documentation.
```yaml
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: <SPARK_IMAGE> # The Spark image.
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.3.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
    - oss://<OSS_BUCKET>/data/pagerank_dataset.txt # The test dataset.
    - "10" # The number of iterations.
  sparkVersion: 3.5.3
  hadoopConf:
    fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    fs.oss.endpoint: <OSS_ENDPOINT> # The OSS endpoint.
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  sparkConf:
    spark.shuffle.manager: org.apache.spark.shuffle.celeborn.SparkShuffleManager
    spark.serializer: org.apache.spark.serializer.KryoSerializer
    spark.celeborn.master.endpoints: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
    spark.celeborn.client.spark.shuffle.writer: hash
    spark.celeborn.client.push.replicate.enabled: "false"
    spark.sql.adaptive.localShuffleReader.enabled: "false"
    spark.sql.adaptive.enabled: "true"
    spark.sql.adaptive.skewJoin.enabled: "true"
    spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
    spark.dynamicAllocation.shuffleTracking.enabled: "false"
    spark.executor.userClassPathFirst: "false"
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
    envFrom:
      - secretRef:
          name: spark-oss-secret
  executor:
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
      - secretRef:
          name: spark-oss-secret
  restartPolicy:
    type: Never
```

After you replace the placeholders, submit the job by running `kubectl apply -f spark-pagerank.yaml`.

(Optional) Step 7: Clean up the environment
After you complete all steps, delete the Spark job and related resources if they are no longer needed.
Delete the Spark job:
```shell
kubectl delete sparkapplication spark-pagerank
```

Delete the Secret:

```shell
kubectl delete secret spark-oss-secret
```