Apache Celeborn processes intermediate data, such as shuffle data and spilled data, for big data compute engines. Celeborn improves the performance, stability, and flexibility of these engines. Remote Shuffle Service (RSS) provides an efficient way to shuffle large datasets. This topic describes how to deploy Celeborn in a Container Service for Kubernetes (ACK) cluster and how to use Celeborn to enable RSS for a Spark job.
Benefits
Using Celeborn to enable RSS has the following benefits for big data processing based on frameworks such as MapReduce, Spark, and Flink:
Push-based shuffle write: Mappers do not need to store data on local disks. This feature is suitable for services that use the compute-storage separation architecture in the cloud.
Merge-based shuffle write: Data is merged on workers instead of reducers. This eliminates the network overhead caused by random reads and writes on small files and by small data transfers, which improves data processing efficiency.
High availability: Masters implement high availability and stability for the system based on the Raft protocol.
High fault tolerance: Celeborn can store two replicas of shuffle data, which greatly reduces the probability of fetch failures.
Prerequisites
The ack-spark-operator component is installed. For more information, see Step 1: Install the ack-spark-operator component.
A kubectl client is connected to the ACK cluster. For more information, see Obtain the kubeconfig file of a cluster and use kubectl to connect to the cluster.
An OSS bucket is created. For more information, see Create a bucket.
ossutil is installed and configured. For more information, see Install ossutil and Configure ossutil.
For more information about ossutil commands, see ossutil command reference.
Node pools are created based on the configurations described in the following section. For more information about how to create a node pool, see Create a node pool.
Cluster environment
Configure the cluster environment based on the following configurations:
Deploy master processes in a node pool named celeborn-master. The node pool uses the following configurations:
Node pool name: celeborn-master.
Node quantity: 3.
Elastic Compute Service (ECS) instance type: g8i.2xlarge.
Labels: celeborn.apache.org/role=master
Taints: celeborn.apache.org/role=master:NoSchedule
Data storage for each master: /mnt/celeborn_ratis (1024GB).
Deploy worker processes in a node pool named celeborn-worker. The node pool uses the following configurations:
Node pool name: celeborn-worker.
Node quantity: 5.
ECS instance type: g8i.4xlarge.
Labels: celeborn.apache.org/role=worker.
Taints: celeborn.apache.org/role=worker:NoSchedule.
Data storage for each worker:
/mnt/disk1 (1024GB)
/mnt/disk2 (1024GB)
/mnt/disk3 (1024GB)
/mnt/disk4 (1024GB)
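After the node pools are created, you can confirm that the nodes carry the expected labels and taints. The following is a quick sketch with kubectl; the label and taint values follow the configurations above, and the node name is a placeholder:

```shell
# List the nodes in each node pool by the role label configured above.
kubectl get nodes -l celeborn.apache.org/role=master
kubectl get nodes -l celeborn.apache.org/role=worker

# Inspect the taints on one of the master nodes. Replace <MASTER-NODE-NAME>
# with a node name from the previous output.
kubectl describe node <MASTER-NODE-NAME> | grep -A1 Taints
```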
Procedure overview
This section describes the main steps for deploying Celeborn in an ACK cluster.
Build a Celeborn container image
Download the package of a Celeborn release version based on your business requirements. Then, build a container image based on the package and push the image to your image repository in Container Registry. This image is used to deploy the ack-celeborn component.
Deploy the ack-celeborn component
Install a Helm chart named ack-celeborn from the Marketplace page of the ACK console to deploy a Celeborn cluster by using the image you built.
Build a Spark container image
Build a Spark container image that contains Celeborn and the JAR packages required for accessing Object Storage Service (OSS) and push the image to your image repository in Container Registry.
Prepare and upload test data to OSS
Generate test datasets for a PageRank job and upload the datasets to OSS.
Run a sample Spark job
Run a sample PageRank job and use Celeborn to enable RSS.
(Optional) Clear the environment
After you perform all the steps in this topic, delete the Spark job and release relevant resources if they are no longer required.
Step 1: Build a Celeborn container image
Download the package of a Celeborn release version, such as 0.5.2, from the Celeborn official website based on your business requirements. When you build the image, replace <IMAGE-REGISTRY> and <IMAGE-REPOSITORY> with your image repository name and the image name that you want to use. You can also modify the PLATFORMS variable to specify the target architectures of the image. For more information, see Deploy Celeborn on Kubernetes. The docker buildx command requires Docker 19.03 or later. For more information about how to update Docker, see Install Docker.
CELEBORN_VERSION=0.5.2 # The Celeborn release version.
IMAGE_REGISTRY=<IMAGE-REGISTRY> # The image repository name, such as docker.io.
IMAGE_REPOSITORY=<IMAGE-REPOSITORY> # The image name, such as apache/celeborn.
IMAGE_TAG=${CELEBORN_VERSION} # The image tag. In this example, the tag indicates the Celeborn release version.
PLATFORMS=linux/amd64,linux/arm64 # The target platforms of the image. Modify as needed.
# Download the package.
wget https://downloads.apache.org/celeborn/celeborn-${CELEBORN_VERSION}/apache-celeborn-${CELEBORN_VERSION}-bin.tgz
# Decompress the package.
tar -zxvf apache-celeborn-${CELEBORN_VERSION}-bin.tgz
# Switch the working directory.
cd apache-celeborn-${CELEBORN_VERSION}-bin
# Use Docker Buildkit to build the image and push the image to your image repository.
docker buildx build \
--output=type=registry \
--push \
--platform=${PLATFORMS} \
--tag=${IMAGE_REGISTRY}/${IMAGE_REPOSITORY}:${IMAGE_TAG} \
-f docker/Dockerfile \
.

Step 2: Deploy the ack-celeborn component
Log on to the ACK console. In the left-side navigation pane, choose Marketplace.
On the Marketplace page, click the App Catalog tab. Find and click ack-celeborn. On the ack-celeborn page, click Deploy.
In the Deploy panel, select a cluster and namespace, and then click Next.
In the Parameters step, configure the parameters and click OK.
image: # Replace the value with the address of the Celeborn image you built in Step 1.
  registry: docker.io # The image repository name.
  repository: apache/celeborn # The image name.
  tag: 0.5.2 # The image tag.
celeborn:
  celeborn.client.push.stageEnd.timeout: 120s
  celeborn.master.ha.enabled: true
  celeborn.master.ha.ratis.raft.server.storage.dir: /mnt/celeborn_ratis
  celeborn.master.heartbeat.application.timeout: 300s
  celeborn.master.heartbeat.worker.timeout: 120s
  celeborn.master.http.port: 9098
  celeborn.metrics.enabled: true
  celeborn.metrics.prometheus.path: /metrics/prometheus
  celeborn.rpc.dispatcher.numThreads: 4
  celeborn.rpc.io.clientThreads: 64
  celeborn.rpc.io.numConnectionsPerPeer: 2
  celeborn.rpc.io.serverThreads: 64
  celeborn.shuffle.chunk.size: 8m
  celeborn.worker.fetch.io.threads: 32
  celeborn.worker.flusher.buffer.size: 256K
  celeborn.worker.http.port: 9096
  celeborn.worker.monitor.disk.enabled: false
  celeborn.worker.push.io.threads: 32
  celeborn.worker.storage.dirs: /mnt/disk1:disktype=SSD:capacity=1024Gi,/mnt/disk2:disktype=SSD:capacity=1024Gi,/mnt/disk3:disktype=SSD:capacity=1024Gi,/mnt/disk4:disktype=SSD:capacity=1024Gi
master:
  replicas: 3
  env:
  - name: CELEBORN_MASTER_MEMORY
    value: 28g
  - name: CELEBORN_MASTER_JAVA_OPTS
    value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-master.out -Dio.netty.leakDetectionLevel=advanced
  - name: CELEBORN_NO_DAEMONIZE
    value: "1"
  - name: TZ
    value: Asia/Shanghai
  volumeMounts:
  - name: celeborn-ratis
    mountPath: /mnt/celeborn_ratis
  resources:
    requests:
      cpu: 7
      memory: 28Gi
    limits:
      cpu: 7
      memory: 28Gi
  volumes:
  - name: celeborn-ratis
    hostPath:
      path: /mnt/celeborn_ratis
      type: DirectoryOrCreate
  nodeSelector:
    celeborn.apache.org/role: master
  tolerations:
  - key: celeborn.apache.org/role
    operator: Equal
    value: master
    effect: NoSchedule
worker:
  replicas: 5
  env:
  - name: CELEBORN_WORKER_MEMORY
    value: 28g
  - name: CELEBORN_WORKER_OFFHEAP_MEMORY
    value: 28g
  - name: CELEBORN_WORKER_JAVA_OPTS
    value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-worker.out -Dio.netty.leakDetectionLevel=advanced
  - name: CELEBORN_NO_DAEMONIZE
    value: "1"
  - name: TZ
    value: Asia/Shanghai
  volumeMounts:
  - name: disk1
    mountPath: /mnt/disk1
  - name: disk2
    mountPath: /mnt/disk2
  - name: disk3
    mountPath: /mnt/disk3
  - name: disk4
    mountPath: /mnt/disk4
  resources:
    requests:
      cpu: 14
      memory: 56Gi
    limits:
      cpu: 14
      memory: 56Gi
  volumes:
  - name: disk1
    hostPath:
      path: /mnt/disk1
      type: DirectoryOrCreate
  - name: disk2
    hostPath:
      path: /mnt/disk2
      type: DirectoryOrCreate
  - name: disk3
    hostPath:
      path: /mnt/disk3
      type: DirectoryOrCreate
  - name: disk4
    hostPath:
      path: /mnt/disk4
      type: DirectoryOrCreate
  nodeSelector:
    celeborn.apache.org/role: worker
  tolerations:
  - key: celeborn.apache.org/role
    operator: Equal
    value: worker
    effect: NoSchedule

You can find descriptions of these parameters in the Parameters section on the ack-celeborn page.
Run the following command to check whether the Celeborn master and worker StatefulSets are ready. If issues occur during the deployment process, see Pod troubleshooting to resolve the issues.

kubectl get -n celeborn statefulset

Expected output:

NAME              READY   AGE
celeborn-master   3/3     68s
celeborn-worker   5/5     68s
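You can also confirm that the masters serve metrics by port-forwarding to a master pod and querying the Prometheus endpoint configured in the Helm values above (celeborn.master.http.port: 9098 and celeborn.metrics.prometheus.path: /metrics/prometheus). The pod name celeborn-master-0 follows the StatefulSet naming:

```shell
# Forward the master HTTP port (9098 in the Helm values) to the local machine.
kubectl -n celeborn port-forward pod/celeborn-master-0 9098:9098 &
sleep 2

# Query the Prometheus metrics path configured by celeborn.metrics.prometheus.path.
curl -s http://localhost:9098/metrics/prometheus | head -n 5
```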
Step 3: Build a Spark container image
In this example, Spark 3.5.3 is used. Create a Dockerfile based on the following content. Then, use the Dockerfile to build an image and push the image to your image repository.
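Once the Dockerfile shown below exists in the current directory, you can build and push the image with the same docker buildx workflow as in Step 1. The following is a sketch; the registry, repository, and tag values are placeholders:

```shell
IMAGE_REGISTRY=<IMAGE-REGISTRY>     # Placeholder: your image repository name.
IMAGE_REPOSITORY=<IMAGE-REPOSITORY> # Placeholder: the image name.
IMAGE_TAG=3.5.3-celeborn            # An example tag; use any tag you like.

# Build the Spark image from the Dockerfile in the current directory and push it.
docker buildx build \
  --push \
  --platform=linux/amd64 \
  --tag=${IMAGE_REGISTRY}/${IMAGE_REPOSITORY}:${IMAGE_TAG} \
  .
```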
# Replace <SPARK_IMAGE> with your Spark base image.
ARG SPARK_IMAGE=<SPARK_IMAGE>
FROM ${SPARK_IMAGE}
# Add dependency for Hadoop Aliyun OSS support
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars
# Add dependency for Celeborn
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/celeborn/celeborn-client-spark-3-shaded_2.12/0.5.1/celeborn-client-spark-3-shaded_2.12-0.5.1.jar ${SPARK_HOME}/jars

Step 4: Prepare and upload test data to OSS
For more information about how to prepare and upload test data to OSS, see Step 1: Prepare and upload test data to an OSS bucket.
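The SparkPageRank example used later parses each input line as a whitespace-separated "source target" pair. If you want to generate a small dataset yourself, the following sketch produces such a file; the edge counts and ID range are arbitrary choices, and the file name matches the manifest in Step 6:

```shell
# Generate a small random edge list: each line is "sourceId targetId",
# the whitespace-separated pair format that the SparkPageRank example parses.
awk 'BEGIN {
  srand(42);
  for (i = 0; i < 10000; i++) {
    src = int(rand() * 1000) + 1;
    dst = int(rand() * 1000) + 1;
    if (src != dst) print src, dst;   # skip self-loops
  }
}' > pagerank_dataset.txt

# Show the number of generated edges.
wc -l pagerank_dataset.txt
```

You can then upload the file with ossutil, for example `ossutil cp pagerank_dataset.txt oss://<OSS_BUCKET>/data/pagerank_dataset.txt`.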
Step 5: Create a Secret to store the credentials used to access OSS
For more information about how to create a Secret to store the credentials used to access OSS, see Step 3: Create a Secret to store OSS access credentials.
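The referenced topic walks through this in detail. In short, the Secret can be created with kubectl. The key names OSS_ACCESS_KEY_ID and OSS_ACCESS_KEY_SECRET are the environment variables read by EnvironmentVariableCredentialsProvider, which is configured in Step 6; the values below are placeholders:

```shell
# Create the Secret referenced by the SparkApplication in Step 6.
kubectl create secret generic spark-oss-secret \
  --from-literal=OSS_ACCESS_KEY_ID=<YOUR_ACCESS_KEY_ID> \
  --from-literal=OSS_ACCESS_KEY_SECRET=<YOUR_ACCESS_KEY_SECRET>
```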
Step 6: Submit a sample Spark job
Create a file named spark-pagerank.yaml and copy the following content to the file. The file is used to create a SparkApplication. Replace <SPARK_IMAGE> with the address of the image repository you specified in Step 3: Build a Spark container image. Replace <OSS_BUCKET> and <OSS_ENDPOINT> with the name of the OSS bucket you use and the endpoint of the bucket. For more information about how to configure Celeborn in a Spark job, see Celeborn documentation.
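After you create the file, you can submit the job and track its progress with kubectl. The following is a sketch that assumes the default namespace and the Spark Operator's default driver pod naming (<application-name>-driver):

```shell
# Submit the SparkApplication to the cluster.
kubectl apply -f spark-pagerank.yaml

# Watch the application state reported by Spark Operator.
kubectl get sparkapplication spark-pagerank -w

# Inspect the driver log once the driver pod is running.
kubectl logs -f spark-pagerank-driver
```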
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: <SPARK_IMAGE> # The Spark image. Replace <SPARK_IMAGE> with the name of the Spark image.
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.3.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt # The test dataset. Replace <OSS_BUCKET> with the name of the OSS bucket you use.
  - "10" # The number of iterations.
  sparkVersion: 3.5.3
  hadoopConf:
    fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    fs.oss.endpoint: <OSS_ENDPOINT> # The endpoint of the OSS bucket. The internal endpoint for the China (Beijing) region is oss-cn-beijing-internal.aliyuncs.com.
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  sparkConf:
    spark.shuffle.manager: org.apache.spark.shuffle.celeborn.SparkShuffleManager
    spark.serializer: org.apache.spark.serializer.KryoSerializer
    spark.celeborn.master.endpoints: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
    spark.celeborn.client.spark.shuffle.writer: hash
    spark.celeborn.client.push.replicate.enabled: "false"
    spark.sql.adaptive.localShuffleReader.enabled: "false"
    spark.sql.adaptive.enabled: "true"
    spark.sql.adaptive.skewJoin.enabled: "true"
    spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
    spark.dynamicAllocation.shuffleTracking.enabled: "false"
    spark.executor.userClassPathFirst: "false"
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
    envFrom:
    - secretRef:
        name: spark-oss-secret
  executor:
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef:
        name: spark-oss-secret
  restartPolicy:
    type: Never

(Optional) Step 7: Clear the environment
After you perform all the steps in this topic, delete the Spark job and release relevant resources if they are no longer required.
Run the following command to delete the Spark job:
kubectl delete sparkapplication spark-pagerank

Run the following command to delete the Secret:
kubectl delete secret spark-oss-secret

References
For more information about how to use Spark Operator to submit Spark jobs, see Use Spark Operator to run Spark jobs.
For more information about how to use Spark History Server to view information about Spark jobs, see Use Spark History Server to view information about Spark jobs.
For more information about how to use Celeborn, see Apache Celeborn documentation.