This topic describes how to run Spark jobs in a Container Service for Kubernetes (ACK) cluster and configure the jobs to read data from and write data to Object Storage Service (OSS) buckets, using the built-in PageRank job as an example.
Prerequisites
An ACK Pro cluster or ACK Serverless Pro cluster that runs Kubernetes 1.24 or later is created. For more information, see Create an ACK managed cluster, Create an ACK Serverless cluster, and Manually upgrade ACK clusters.
The ack-spark-operator component is installed. For more information, see Step 1: Install the ack-spark-operator component.
A kubectl client is connected to the ACK cluster. For more information, see Obtain the kubeconfig file of a cluster and use kubectl to connect to the cluster.
An OSS bucket is created. For more information, see Create a bucket.
ossutil is installed and configured. For more information about ossutil commands, see ossutil command reference.
Procedure overview
This topic guides you through the following steps to run Spark jobs in an ACK cluster and configure read and write operations for OSS data:
Prepare and upload test data to an OSS bucket: Generate a test dataset for the PageRank job and upload it to an OSS bucket.
Build a Spark container image: Include the necessary JAR dependencies for OSS access in the Spark container image.
Create a Secret to store OSS access credentials: To protect data from unauthorized access, create a Secret YAML file to store credentials used by the Spark job to access OSS.
Submit a Spark job: Create and submit the configuration file of a Spark job to run an OSS data processing task.
(Optional) Environment cleanup: Delete the completed Spark job and any resources that are no longer required to reduce costs.
Step 1: Prepare and upload test data to an OSS bucket
Generate a test dataset for PageRank and upload it to the designated OSS bucket.
Create a shell script named generate_pagerank_dataset.sh that generates the test dataset:

#!/bin/bash

# Check the number of arguments
if [ "$#" -ne 2 ]; then
    echo "Usage: $0 M N"
    echo "M: Number of web pages"
    echo "N: Number of records to generate"
    exit 1
fi

M=$1
N=$2

# Verify that M and N are positive integers
if ! [[ "$M" =~ ^[0-9]+$ ]] || ! [[ "$N" =~ ^[0-9]+$ ]]; then
    echo "Both M and N must be positive integers."
    exit 1
fi

# Generate the dataset
for ((i=1; i<=$N; i++)); do
    # Ensure the source and target pages are different
    while true; do
        src=$((RANDOM % M + 1))
        dst=$((RANDOM % M + 1))
        if [ "$src" -ne "$dst" ]; then
            echo "$src $dst"
            break
        fi
    done
done

Run the following commands to create the test dataset:
M=100000     # The number of web pages
N=10000000   # The number of records
# Randomly generate the dataset and save it as pagerank_dataset.txt
bash generate_pagerank_dataset.sh $M $N > pagerank_dataset.txt

Run the following command to upload the generated dataset to the data/ directory in your OSS bucket:

ossutil cp pagerank_dataset.txt oss://<BUCKET_NAME>/data/
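Before uploading a large dataset, a quick local sanity check can catch format problems early. The snippet below is a minimal sketch: it verifies that every record has exactly two fields and that no record is a self-loop (src equals dst), which is the format the script above guarantees. It runs against a tiny inline sample so it works anywhere; point the awk command at pagerank_dataset.txt to validate the real file.

```shell
# Sanity-check the "src dst" edge format before uploading.
# Shown against a small inline sample; replace the sample file with
# pagerank_dataset.txt to validate the real dataset.
set -eu

cat > /tmp/pagerank_sample.txt <<'EOF'
1 2
2 3
3 1
EOF

# Count records that are malformed (not two fields) or self-loops (src == dst).
bad=$(awk 'NF != 2 || $1 == $2 { c++ } END { print c+0 }' /tmp/pagerank_sample.txt)
total=$(wc -l < /tmp/pagerank_sample.txt)
echo "records: $total, invalid: $bad"
```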
Step 2: Build a Spark container image
Build a container image that includes the JAR dependencies required for OSS access. You can use the Hadoop OSS SDK, the Hadoop S3 SDK, or JindoSDK. For more information about how to build images with Container Registry, see Use a Container Registry Enterprise Edition instance to build an image.
The Spark image used in the sample Dockerfile comes from the open source community. You can replace it with your own Spark image as needed.
Choose the appropriate Hadoop OSS SDK, Hadoop S3 SDK, or JindoSDK version based on your Spark version.
Use Hadoop OSS SDK
In this example, Spark 3.5.5 and Hadoop OSS SDK 3.3.4 are used. Create a Dockerfile with the following sample code:
ARG SPARK_IMAGE=spark:3.5.5
FROM ${SPARK_IMAGE}
# Add dependencies for Hadoop Aliyun OSS support
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars

Use Hadoop S3 SDK
In this example, Spark 3.5.5 and Hadoop S3 SDK 3.3.4 are used. Create a Dockerfile with the following sample code:
ARG SPARK_IMAGE=spark:3.5.5
FROM ${SPARK_IMAGE}
# Add dependencies for Hadoop AWS S3 support
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.367/aws-java-sdk-bundle-1.12.367.jar ${SPARK_HOME}/jars

Use JindoSDK
In this example, Spark 3.5.5 and JindoSDK 6.8.0 are used. Create a Dockerfile with the following sample code:
ARG SPARK_IMAGE=spark:3.5.5
FROM ${SPARK_IMAGE}
# Add dependencies for JindoSDK support
ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-core/6.8.0/jindo-core-6.8.0.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-sdk/6.8.0/jindo-sdk-6.8.0.jar ${SPARK_HOME}/jars

Step 3: Create a Secret to store OSS access credentials
To access OSS data securely from Spark jobs, store your OSS access credentials in a Kubernetes Secret instead of hard-coding them in your jobs. The Secret is then injected into the driver and executor containers as environment variables.
Use Hadoop OSS SDK
Create a Secret YAML file named spark-oss-secret.yaml to store the credentials used to access OSS:

apiVersion: v1
kind: Secret
metadata:
  name: spark-oss-secret
  namespace: default
stringData:
  # Replace <ACCESS_KEY_ID> with the AccessKey ID of your Alibaba Cloud account.
  OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID>
  # Replace <ACCESS_KEY_SECRET> with the AccessKey Secret of your Alibaba Cloud account.
  OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET>

Run the following command to create the Secret:

kubectl apply -f spark-oss-secret.yaml

Expected output:

secret/spark-oss-secret created
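If you later inspect the Secret (for example, with kubectl get secret spark-oss-secret -o yaml), the values appear under data in Base64 form, because Kubernetes encodes stringData values before storing them. The round trip can be reproduced locally; the value below is a dummy placeholder for illustration, not a real credential.

```shell
# Kubernetes stores stringData values Base64-encoded under .data.
# Dummy value for illustration only -- never echo real credentials.
encoded=$(printf '%s' 'dummy-access-key-id' | base64)
decoded=$(printf '%s' "$encoded" | base64 -d)
echo "encoded: $encoded"
echo "decoded: $decoded"
```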
Use Hadoop S3 SDK
Create a Secret YAML file named spark-s3-secret.yaml to store the credentials used to access OSS:

apiVersion: v1
kind: Secret
metadata:
  name: spark-s3-secret
  namespace: default
stringData:
  # Replace <ACCESS_KEY_ID> with the AccessKey ID of your Alibaba Cloud account.
  AWS_ACCESS_KEY_ID: <ACCESS_KEY_ID>
  # Replace <ACCESS_KEY_SECRET> with the AccessKey Secret of your Alibaba Cloud account.
  AWS_SECRET_ACCESS_KEY: <ACCESS_KEY_SECRET>

Run the following command to create the Secret:

kubectl apply -f spark-s3-secret.yaml

Expected output:

secret/spark-s3-secret created
Use JindoSDK
Create a Secret YAML file named spark-oss-secret.yaml to store the credentials used to access OSS:

apiVersion: v1
kind: Secret
metadata:
  name: spark-oss-secret
  namespace: default
stringData:
  # Replace <ACCESS_KEY_ID> with the AccessKey ID of your Alibaba Cloud account.
  OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID>
  # Replace <ACCESS_KEY_SECRET> with the AccessKey Secret of your Alibaba Cloud account.
  OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET>

Run the following command to create the Secret:

kubectl apply -f spark-oss-secret.yaml

Expected output:

secret/spark-oss-secret created
Step 4: Run a Spark job
Submit a Spark job in the ACK cluster to read data from and write data to an OSS bucket.
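The SparkApplication manifests in this step are declarative wrappers around spark-submit: the Spark Operator translates the spec fields into submit arguments, and entries under hadoopConf become spark.hadoop.* configuration. The sketch below shows a rough, hedged approximation of that correspondence for the Hadoop OSS SDK variant; the placeholders match the manifests, and the string is for illustration only, since the operator generates the real invocation inside the cluster.

```shell
# Approximate spark-submit equivalent of the SparkApplication manifest
# (illustration only; the Spark Operator builds the real command in-cluster).
submit_cmd="spark-submit \
  --master k8s://https://<API_SERVER> \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPageRank \
  --conf spark.hadoop.fs.oss.impl=org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem \
  --conf spark.hadoop.fs.oss.endpoint=<OSS_ENDPOINT> \
  --conf spark.executor.instances=2 \
  local:///opt/spark/examples/jars/spark-examples_2.12-3.5.5.jar \
  oss://<OSS_BUCKET>/data/pagerank_dataset.txt 10"
printf '%s\n' "$submit_cmd"
```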
Use Hadoop OSS SDK
Create a Spark application YAML file named spark-pagerank.yaml. For a full list of OSS configuration parameters, see the Hadoop-Aliyun module.
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  # Replace <SPARK_IMAGE> with the Spark container image built in Step 2.
  image: <SPARK_IMAGE>
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.5.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt  # Specify the input test dataset. Replace <OSS_BUCKET> with your OSS bucket name.
  - "10"  # The number of iterations.
  sparkVersion: 3.5.5
  hadoopConf:
    fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    # The OSS endpoint. Replace <OSS_ENDPOINT> with your OSS endpoint.
    # For example, the internal endpoint for the Beijing region is oss-cn-beijing-internal.aliyuncs.com.
    fs.oss.endpoint: <OSS_ENDPOINT>
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    envFrom:
    - secretRef:
        name: spark-oss-secret  # Specify the Secret used to access OSS.
    serviceAccount: spark-operator-spark
  executor:
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef:
        name: spark-oss-secret  # Specify the Secret used to access OSS.
  restartPolicy:
    type: Never

Use Hadoop S3 SDK
Create a Spark application YAML file named spark-pagerank.yaml. For a full list of S3 configuration parameters, see the Hadoop-AWS module.
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  # Replace <SPARK_IMAGE> with the Spark container image built in Step 2.
  image: <SPARK_IMAGE>
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.5.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  - s3a://<OSS_BUCKET>/data/pagerank_dataset.txt  # Specify the input test dataset. Replace <OSS_BUCKET> with your OSS bucket name.
  - "10"  # The number of iterations.
  sparkVersion: 3.5.5
  hadoopConf:
    fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
    # The OSS endpoint. Replace <OSS_ENDPOINT> with your OSS endpoint.
    # For example, the internal endpoint for the Beijing region is oss-cn-beijing-internal.aliyuncs.com.
    fs.s3a.endpoint: <OSS_ENDPOINT>
    # The region where the OSS endpoint is located. For example, cn-beijing for the Beijing region.
    fs.s3a.endpoint.region: <OSS_REGION>
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    envFrom:
    - secretRef:
        name: spark-s3-secret  # Specify the Secret used to access OSS.
    serviceAccount: spark-operator-spark
  executor:
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef:
        name: spark-s3-secret  # Specify the Secret used to access OSS.
  restartPolicy:
    type: Never

Use JindoSDK
Create a Spark application YAML file named spark-pagerank.yaml with the following SparkApplication manifest.
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  # Replace <SPARK_IMAGE> with the Spark container image built in Step 2.
  image: <SPARK_IMAGE>
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.5.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt  # Specify the input test dataset. Replace <OSS_BUCKET> with your OSS bucket name.
  - "10"  # The number of iterations.
  sparkVersion: 3.5.5
  hadoopConf:
    fs.AbstractFileSystem.oss.impl: com.aliyun.jindodata.oss.JindoOSS
    fs.oss.impl: com.aliyun.jindodata.oss.JindoOssFileSystem
    fs.oss.endpoint: <OSS_ENDPOINT>  # The OSS endpoint. For example, the internal endpoint for the China (Beijing) region is oss-cn-beijing-internal.aliyuncs.com.
    fs.oss.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
    envFrom:
    - secretRef:
        name: spark-oss-secret  # Specify the Secret used to access OSS.
  executor:
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef:
        name: spark-oss-secret  # Specify the Secret used to access OSS.
  restartPolicy:
    type: Never

Run the following command to submit the Spark job:
kubectl apply -f spark-pagerank.yaml

Run the following command to monitor the status of the Spark job:

kubectl get sparkapplications spark-pagerank

Expected output:

NAME             STATUS      ATTEMPTS   START                  FINISH                 AGE
spark-pagerank   COMPLETED   1          2024-10-09T12:54:25Z   2024-10-09T12:55:46Z   90s

Run the following command to view the last 20 log entries of the driver pod:

kubectl logs spark-pagerank-driver --tail=20

Expected output:
Use Hadoop OSS SDK
The log indicates that the Spark job has been successfully executed.
30024 has rank: 1.0709659078941967 .
21390 has rank: 0.9933356174074005 .
28500 has rank: 1.0404018494028928 .
2137 has rank: 0.9931000490520374 .
3406 has rank: 0.9562543137167121 .
20904 has rank: 0.8827028621652337 .
25604 has rank: 1.0270134041934191 .
24/10/09 12:48:36 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-dd0d4d927151c9d0-driver-svc.default.svc:4040
24/10/09 12:48:36 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
24/10/09 12:48:36 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
24/10/09 12:48:36 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
24/10/09 12:48:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/10/09 12:48:36 INFO MemoryStore: MemoryStore cleared
24/10/09 12:48:36 INFO BlockManager: BlockManager stopped
24/10/09 12:48:36 INFO BlockManagerMaster: BlockManagerMaster stopped
24/10/09 12:48:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/10/09 12:48:36 INFO SparkContext: Successfully stopped SparkContext
24/10/09 12:48:36 INFO ShutdownHookManager: Shutdown hook called
24/10/09 12:48:36 INFO ShutdownHookManager: Deleting directory /tmp/spark-e8b8c2ab-c916-4f84-b60f-f54c0de3a7f0
24/10/09 12:48:36 INFO ShutdownHookManager: Deleting directory /var/data/spark-c5917d98-06fb-46fe-85bc-199b839cb885/spark-23e2c2ae-4754-43ae-854d-2752eb83b2c5

Use Hadoop S3 SDK
The log indicates that the Spark job has been successfully executed.

3406 has rank: 0.9562543137167121 .
20904 has rank: 0.8827028621652337 .
25604 has rank: 1.0270134041934191 .
25/04/07 03:54:11 INFO SparkContext: SparkContext is stopping with exitCode 0.
25/04/07 03:54:11 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-0f7dec960e615617-driver-svc.spark.svc:4040
25/04/07 03:54:11 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
25/04/07 03:54:11 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
25/04/07 03:54:11 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
25/04/07 03:54:11 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
25/04/07 03:54:11 INFO MemoryStore: MemoryStore cleared
25/04/07 03:54:11 INFO BlockManager: BlockManager stopped
25/04/07 03:54:11 INFO BlockManagerMaster: BlockManagerMaster stopped
25/04/07 03:54:11 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
25/04/07 03:54:11 INFO SparkContext: Successfully stopped SparkContext
25/04/07 03:54:11 INFO ShutdownHookManager: Shutdown hook called
25/04/07 03:54:11 INFO ShutdownHookManager: Deleting directory /var/data/spark-20d425bb-f442-4b0a-83e2-5a0202959a54/spark-ff5bbf08-4343-4a7a-9ce0-3f7c127cf4a9
25/04/07 03:54:11 INFO ShutdownHookManager: Deleting directory /tmp/spark-a421839a-07af-49c0-b637-f15f76c3e752
25/04/07 03:54:11 INFO MetricsSystemImpl: Stopping s3a-file-system metrics system...
25/04/07 03:54:11 INFO MetricsSystemImpl: s3a-file-system metrics system stopped.
25/04/07 03:54:11 INFO MetricsSystemImpl: s3a-file-system metrics system shutdown complete.

Use JindoSDK
The log indicates that the Spark job has been successfully executed.
21390 has rank: 0.9933356174074005 .
28500 has rank: 1.0404018494028928 .
2137 has rank: 0.9931000490520374 .
3406 has rank: 0.9562543137167121 .
20904 has rank: 0.8827028621652337 .
25604 has rank: 1.0270134041934191 .
24/10/09 12:55:44 INFO SparkContext: SparkContext is stopping with exitCode 0.
24/10/09 12:55:44 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-6a5e3d9271584856-driver-svc.default.svc:4040
24/10/09 12:55:44 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
24/10/09 12:55:44 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
24/10/09 12:55:44 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
24/10/09 12:55:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
24/10/09 12:55:45 INFO MemoryStore: MemoryStore cleared
24/10/09 12:55:45 INFO BlockManager: BlockManager stopped
24/10/09 12:55:45 INFO BlockManagerMaster: BlockManagerMaster stopped
24/10/09 12:55:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
24/10/09 12:55:45 INFO SparkContext: Successfully stopped SparkContext
24/10/09 12:55:45 INFO ShutdownHookManager: Shutdown hook called
24/10/09 12:55:45 INFO ShutdownHookManager: Deleting directory /var/data/spark-87e8406e-06a7-4b4a-b18f-2193da299d35/spark-093a1b71-121a-4367-9d22-ad4e397c9815
24/10/09 12:55:45 INFO ShutdownHookManager: Deleting directory /tmp/spark-723e2039-a493-49e8-b86d-fff5fd1bb168
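If you automate submission, you can poll kubectl get sparkapplications until the job reaches a terminal state. The parsing step is sketched below against an inline copy of the expected output from this step, so it runs without a cluster; in a real script, pipe the live kubectl output through the same awk filter inside a loop.

```shell
# Extract the STATUS column for a SparkApplication from `kubectl get` output.
# A captured sample stands in for live output so the sketch runs anywhere.
sample='NAME             STATUS      ATTEMPTS   START                  FINISH                 AGE
spark-pagerank   COMPLETED   1          2024-10-09T12:54:25Z   2024-10-09T12:55:46Z   90s'

status=$(printf '%s\n' "$sample" | awk '$1 == "spark-pagerank" { print $2 }')
echo "status: $status"

# In automation, loop until the job reaches a terminal state, for example:
#   while true; do
#     status=$(kubectl get sparkapplications spark-pagerank --no-headers | awk '{ print $2 }')
#     case "$status" in COMPLETED|FAILED) break ;; esac
#     sleep 10
#   done
```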
(Optional) Step 5: Environment cleanup
After you complete this tutorial, you can release the resources that are no longer required.
Run the following command to delete the Spark job:
kubectl delete -f spark-pagerank.yaml

Run the following command to remove the Secret:

Use Hadoop OSS SDK

kubectl delete -f spark-oss-secret.yaml

Use Hadoop S3 SDK

kubectl delete -f spark-s3-secret.yaml

Use JindoSDK

kubectl delete -f spark-oss-secret.yaml

References
For more information about how to use Spark History Server to view information about Spark jobs, see Use Spark History Server to view information about Spark jobs.
For more information about how to use Simple Log Service to collect the logs of Spark jobs, see Use Simple Log Service to collect the logs of Spark jobs.
For more information about how to use elastic container instances to run Spark jobs, see Use elastic container instances to run Spark jobs.
For more information about how to use Celeborn to enable RSS for Spark jobs, see Use Celeborn to enable RSS for Spark jobs.