本文以Spark內建的PageRank作業為例,介紹如何在ACK叢集中運行Spark作業,並配置讀寫位於阿里雲OSS(Object Storage Service服務)中的資料。
前提條件
已建立1.24及以上的ACK託管叢集Pro版、ACK Serverless叢集Pro版。相關操作,請參見建立ACK託管叢集、建立ACK Serverless叢集、手動升級叢集。
已部署ack-spark-operator組件,請參見部署ack-spark-operator組件。
已通過kubectl工具串連叢集。具體操作,請參見擷取叢集KubeConfig並通過kubectl工具串連叢集。
已建立OSS儲存空間。具體操作請參見建立儲存空間。
已安裝ossutil並配置ossutil。關於ossutil命令參考請參見命令列工具ossutil命令參考。
流程概述
本文將引導您完成如下步驟,協助您瞭解如何在ACK叢集中運行Spark作業並配置讀寫OSS資料。
準備測試資料並上傳至OSS:產生用於PageRank的測試資料集並將其上傳至OSS。
構建Spark容器鏡像:構建包含了訪問OSS相關Jar包依賴的Spark容器鏡像。
建立Secret儲存OSS訪問憑據:為Spark作業建立指定的OSS訪問憑據,以確保安全訪問OSS。
提交樣本Spark作業:建立並提交一個Spark作業的設定檔,實現對OSS資料的讀寫。
(可選)環境清理:在完成測試後,清理無需使用的Spark作業和資源,避免產生預期外的費用。
步驟一:準備測試資料並上傳至OSS
首先,您需產生測試資料集並將其上傳到指定的OSS中,便於後續的Spark作業使用。以下是產生測試資料集的指令碼和上傳OSS的操作步驟。
根據以下內容建立名為
generate_pagerank_dataset.sh的指令碼,用於產生測試資料集。#!/bin/bash # 檢查參數數量 if [ "$#" -ne 2 ]; then echo "Usage: $0 M N" echo "M: Number of web pages" echo "N: Number of records to generate" exit 1 fi M=$1 N=$2 # 檢查 M 和 N 是否為正整數 if ! [[ "$M" =~ ^[0-9]+$ ]] || ! [[ "$N" =~ ^[0-9]+$ ]]; then echo "Both M and N must be positive integers." exit 1 fi # 產生資料集 for ((i=1; i<=$N; i++)); do # 保證源頁面和目標頁面不相同 while true; do src=$((RANDOM % M + 1)) dst=$((RANDOM % M + 1)) if [ "$src" -ne "$dst" ]; then echo "$src $dst" break fi done done執行如下命令,產生測試資料集。
M=100000 # 網頁數量 N=10000000 # 記錄數量 # 隨機產生資料集並儲存至 pagerank_dataset.txt bash generate_pagerank_dataset.sh $M $N > pagerank_dataset.txt執行如下命令,將產生的資料集上傳至OSS Bucket中的
data/路徑下:ossutil cp pagerank_dataset.txt oss://<BUCKET_NAME>/data/
步驟二:構建Spark容器鏡像
為了在Spark作業中訪問資料,首先需要構建一個包含了訪問OSS相關Jar包依賴的容器鏡像。您可以選擇使用Hadoop OSS SDK、Hadoop S3 SDK或JindoSDK來訪問OSS,本文示範用的容器鏡像根據如下樣本Dockerfile構建。關於Container Registry構建鏡像請參見使用企業版執行個體構建鏡像。
樣本Dockerfile檔案中使用的Spark基礎鏡像來自於開源社區,您可按需求自行替換成自己的Spark鏡像。
您需要根據使用的Spark版本,選擇相應的Hadoop OSS SDK、Hadoop S3 SDK或者JindoSDK版本。
使用Hadoop OSS SDK
以Spark 3.5.5版本和Hadoop OSS SDK 3.3.4版本為例,建立如下樣本Dockerfile檔案。
ARG SPARK_IMAGE=spark:3.5.5
FROM ${SPARK_IMAGE}
# Add dependency for Hadoop Aliyun OSS support
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars使用Hadoop S3 SDK
以Spark 3.5.5版本和Hadoop S3 SDK 3.3.4版本為例,建立如下樣本Dockerfile檔案。
ARG SPARK_IMAGE=spark:3.5.5
FROM ${SPARK_IMAGE}
# Add dependency for Hadoop AWS S3 support
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.367/aws-java-sdk-bundle-1.12.367.jar ${SPARK_HOME}/jars使用JindoSDK
以Spark 3.5.5版本和JindoSDK 6.8.0版本為例,建立如下樣本Dockerfile檔案。
ARG SPARK_IMAGE=spark:3.5.5
FROM ${SPARK_IMAGE}
# Add dependency for JindoSDK support
ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-core/6.8.0/jindo-core-6.8.0.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-sdk/6.8.0/jindo-sdk-6.8.0.jar ${SPARK_HOME}/jars步驟三:建立Secret儲存OSS訪問憑據
在Spark作業中訪問OSS資料時,需要配置OSS訪問憑證,為了避免在作業中寫入程式碼訪問憑據,需要建立一個Secret用於儲存敏感資訊,並以環境變數的形式注入到容器中。
使用Hadoop OSS SDK
根據以下內容建立Secret資訊清單檔,並儲存為
spark-oss-secret.yaml。apiVersion: v1 kind: Secret metadata: name: spark-oss-secret namespace: default stringData: # 需將 <ACCESS_KEY_ID> 替換成阿里雲 AccessKey ID。 OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID> # 需將 <ACCESS_KEY_SECRET> 替換成阿里雲 AccessKey Secret。 OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET>執行如下命令建立Secret。
kubectl apply -f spark-oss-secret.yaml預期輸出如下。
secret/spark-oss-secret created
使用Hadoop S3 SDK
根據以下內容建立Secret資訊清單檔,並儲存為
spark-s3-secret.yaml。apiVersion: v1 kind: Secret metadata: name: spark-s3-secret namespace: default stringData: # 需將 <ACCESS_KEY_ID> 替換成阿里雲 AccessKey ID。 AWS_ACCESS_KEY_ID: <ACCESS_KEY_ID> # 需將 <ACCESS_KEY_SECRET> 替換成阿里雲 AccessKey Secret。 AWS_SECRET_ACCESS_KEY: <ACCESS_KEY_SECRET>執行如下命令建立Secret。
kubectl apply -f spark-s3-secret.yaml預期輸出如下。
secret/spark-s3-secret created
使用JindoSDK
根據以下內容建立Secret資訊清單檔,並儲存為
spark-oss-secret.yaml。apiVersion: v1 kind: Secret metadata: name: spark-oss-secret namespace: default stringData: # 需將 <ACCESS_KEY_ID> 替換成阿里雲 AccessKey ID。 OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID> # 需將 <ACCESS_KEY_SECRET> 替換成阿里雲 AccessKey Secret。 OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET>執行如下命令建立Secret。
kubectl apply -f spark-oss-secret.yaml預期輸出如下。
secret/spark-oss-secret created
步驟四:運行樣本Spark作業
在ACK叢集上提交Spark作業,實現OSS資料的讀寫。
使用Hadoop OSS SDK
建立如下SparkApplication資訊清單檔並儲存為spark-pagerank.yaml。關於OSS完整的配置參數列表,請參見Hadoop-Aliyun module。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pagerank
namespace: default
spec:
type: Scala
mode: cluster
# 需將 <SPARK_IMAGE> 替換成步驟二中構建得到的 Spark 容器鏡像。
image: <SPARK_IMAGE>
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.5.jar
mainClass: org.apache.spark.examples.SparkPageRank
arguments:
- oss://<OSS_BUCKET>/data/pagerank_dataset.txt # 指定輸入測試資料集,將<OSS_BUCKET>替換成OSS Bucket名稱。
- "10" # 迭代次數。
sparkVersion: 3.5.5
hadoopConf:
fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
# OSS 訪問端點,需將 <OSS_ENDPOINT> 替換成您的 OSS 訪問端點,
# 例如,北京地區 OSS 的內網訪問端點為 oss-cn-beijing-internal.aliyuncs.com。
fs.oss.endpoint: <OSS_ENDPOINT>
fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
driver:
cores: 1
coreLimit: 1200m
memory: 512m
envFrom:
- secretRef:
name: spark-oss-secret #指定訪問OSS的安全憑據。
serviceAccount: spark-operator-spark
executor:
instances: 2
cores: 1
coreLimit: "2"
memory: 8g
envFrom:
- secretRef:
name: spark-oss-secret #指定訪問OSS的安全憑據。
restartPolicy:
type: Never使用Hadoop S3 SDK
建立如下SparkApplication資訊清單檔並儲存為spark-pagerank.yaml。關於S3完整的配置參數列表,請參見Hadoop-AWS module。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pagerank
namespace: default
spec:
type: Scala
mode: cluster
# 需將 <SPARK_IMAGE> 替換成步驟二中構建得到的 Spark 容器鏡像。
image: <SPARK_IMAGE>
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.5.jar
mainClass: org.apache.spark.examples.SparkPageRank
arguments:
- s3a://<OSS_BUCKET>/data/pagerank_dataset.txt # 指定輸入測試資料集,將<OSS_BUCKET>替換成OSS Bucket名稱。
- "10" # 迭代次數。
sparkVersion: 3.5.5
hadoopConf:
fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
# OSS 訪問端點,需將 <OSS_ENDPOINT> 替換成您的 OSS 訪問端點,
# 例如,北京地區 OSS 的內網訪問端點為 oss-cn-beijing-internal.aliyuncs.com。
fs.s3a.endpoint: <OSS_ENDPOINT>
# OSS 訪問端點所在地區,例如北京地區為 cn-beijing。
fs.s3a.endpoint.region: <OSS_REGION>
driver:
cores: 1
coreLimit: 1200m
memory: 512m
envFrom:
- secretRef:
name: spark-s3-secret #指定訪問OSS的安全憑據。
serviceAccount: spark-operator-spark
executor:
instances: 2
cores: 1
coreLimit: "2"
memory: 8g
envFrom:
- secretRef:
name: spark-s3-secret #指定訪問OSS的安全憑據。
restartPolicy:
type: Never使用JindoSDK
建立如下SparkApplication資訊清單檔並儲存為spark-pagerank.yaml。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pagerank
namespace: default
spec:
type: Scala
mode: cluster
# 需將 <SPARK_IMAGE> 替換成步驟二中構建得到的 Spark 容器鏡像。
image: <SPARK_IMAGE>
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.5.jar
mainClass: org.apache.spark.examples.SparkPageRank
arguments:
- oss://<OSS_BUCKET>/data/pagerank_dataset.txt # 指定輸入測試資料集,將<OSS_BUCKET>替換成OSS Bucket名稱。
- "10" # 迭代次數。
sparkVersion: 3.5.5
hadoopConf:
fs.AbstractFileSystem.oss.impl: com.aliyun.jindodata.oss.JindoOSS
fs.oss.impl: com.aliyun.jindodata.oss.JindoOssFileSystem
fs.oss.endpoint: <OSS_ENDPOINT> # OSS訪問端點。例如北京地區OSS的內網訪問地址為oss-cn-beijing-internal.aliyuncs.com。
fs.oss.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
driver:
cores: 1
coreLimit: 1200m
memory: 512m
serviceAccount: spark-operator-spark
envFrom:
- secretRef:
name: spark-oss-secret #指定訪問OSS的安全憑據。
executor:
instances: 2
cores: 1
coreLimit: "2"
memory: 8g
envFrom:
- secretRef:
name: spark-oss-secret #指定訪問OSS的安全憑據。
restartPolicy:
type: Never執行如下命令提交Spark作業。
kubectl apply -f spark-pagerank.yaml執行如下命令查看Spark作業執行狀態並等待作業執行結果。
kubectl get sparkapplications spark-pagerank預期輸出。
NAME STATUS ATTEMPTS START FINISH AGE spark-pagerank COMPLETED 1 2024-10-09T12:54:25Z 2024-10-09T12:55:46Z 90s執行如下命令查看Driver pod日誌輸出的最後20行。
kubectl logs spark-pagerank-driver --tail=20預期輸出。
使用Hadoop OSS SDK
從日誌中可以看到,Spark作業已經成功運行結束。
30024 has rank: 1.0709659078941967 . 21390 has rank: 0.9933356174074005 . 28500 has rank: 1.0404018494028928 . 2137 has rank: 0.9931000490520374 . 3406 has rank: 0.9562543137167121 . 20904 has rank: 0.8827028621652337 . 25604 has rank: 1.0270134041934191 . 24/10/09 12:48:36 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-dd0d4d927151c9d0-driver-svc.default.svc:4040 24/10/09 12:48:36 INFO KubernetesClusterSchedulerBackend: Shutting down all executors 24/10/09 12:48:36 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down 24/10/09 12:48:36 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed. 24/10/09 12:48:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 24/10/09 12:48:36 INFO MemoryStore: MemoryStore cleared 24/10/09 12:48:36 INFO BlockManager: BlockManager stopped 24/10/09 12:48:36 INFO BlockManagerMaster: BlockManagerMaster stopped 24/10/09 12:48:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 24/10/09 12:48:36 INFO SparkContext: Successfully stopped SparkContext 24/10/09 12:48:36 INFO ShutdownHookManager: Shutdown hook called 24/10/09 12:48:36 INFO ShutdownHookManager: Deleting directory /tmp/spark-e8b8c2ab-c916-4f84-b60f-f54c0de3a7f0 24/10/09 12:48:36 INFO ShutdownHookManager: Deleting directory /var/data/spark-c5917d98-06fb-46fe-85bc-199b839cb885/spark-23e2c2ae-4754-43ae-854d-2752eb83b2c5使用Hadoop S3 SDK
3406 has rank: 0.9562543137167121 . 20904 has rank: 0.8827028621652337 . 25604 has rank: 1.0270134041934191 . 25/04/07 03:54:11 INFO SparkContext: SparkContext is stopping with exitCode 0. 25/04/07 03:54:11 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-0f7dec960e615617-driver-svc.spark.svc:4040 25/04/07 03:54:11 INFO KubernetesClusterSchedulerBackend: Shutting down all executors 25/04/07 03:54:11 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down 25/04/07 03:54:11 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed. 25/04/07 03:54:11 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 25/04/07 03:54:11 INFO MemoryStore: MemoryStore cleared 25/04/07 03:54:11 INFO BlockManager: BlockManager stopped 25/04/07 03:54:11 INFO BlockManagerMaster: BlockManagerMaster stopped 25/04/07 03:54:11 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 25/04/07 03:54:11 INFO SparkContext: Successfully stopped SparkContext 25/04/07 03:54:11 INFO ShutdownHookManager: Shutdown hook called 25/04/07 03:54:11 INFO ShutdownHookManager: Deleting directory /var/data/spark-20d425bb-f442-4b0a-83e2-5a0202959a54/spark-ff5bbf08-4343-4a7a-9ce0-3f7c127cf4a9 25/04/07 03:54:11 INFO ShutdownHookManager: Deleting directory /tmp/spark-a421839a-07af-49c0-b637-f15f76c3e752 25/04/07 03:54:11 INFO MetricsSystemImpl: Stopping s3a-file-system metrics system... 25/04/07 03:54:11 INFO MetricsSystemImpl: s3a-file-system metrics system stopped. 25/04/07 03:54:11 INFO MetricsSystemImpl: s3a-file-system metrics system shutdown complete.使用JindoSDK
從日誌中可以看到,Spark作業已經成功運行結束。
21390 has rank: 0.9933356174074005 . 28500 has rank: 1.0404018494028928 . 2137 has rank: 0.9931000490520374 . 3406 has rank: 0.9562543137167121 . 20904 has rank: 0.8827028621652337 . 25604 has rank: 1.0270134041934191 . 24/10/09 12:55:44 INFO SparkContext: SparkContext is stopping with exitCode 0. 24/10/09 12:55:44 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-6a5e3d9271584856-driver-svc.default.svc:4040 24/10/09 12:55:44 INFO KubernetesClusterSchedulerBackend: Shutting down all executors 24/10/09 12:55:44 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down 24/10/09 12:55:44 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed. 24/10/09 12:55:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 24/10/09 12:55:45 INFO MemoryStore: MemoryStore cleared 24/10/09 12:55:45 INFO BlockManager: BlockManager stopped 24/10/09 12:55:45 INFO BlockManagerMaster: BlockManagerMaster stopped 24/10/09 12:55:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 24/10/09 12:55:45 INFO SparkContext: Successfully stopped SparkContext 24/10/09 12:55:45 INFO ShutdownHookManager: Shutdown hook called 24/10/09 12:55:45 INFO ShutdownHookManager: Deleting directory /var/data/spark-87e8406e-06a7-4b4a-b18f-2193da299d35/spark-093a1b71-121a-4367-9d22-ad4e397c9815 24/10/09 12:55:45 INFO ShutdownHookManager: Deleting directory /tmp/spark-723e2039-a493-49e8-b86d-fff5fd1bb168
(可選)步驟五:環境清理
如果您已體驗完本教程,相關資源如不再需要,可以通過執行以下命令進行刪除。
執行如下命令刪除Spark作業。
kubectl delete -f spark-pagerank.yaml執行如下命令刪除Secret資源。
使用Hadoop OSS SDK
kubectl delete -f spark-oss-secret.yaml使用Hadoop S3 SDK
kubectl delete -f spark-s3-secret.yaml使用JindoSDK
kubectl delete -f spark-oss-secret.yaml相關文檔
關於如何使用 Spark History Server 查看 Spark 作業資訊,請參見使用Spark History Server查看Spark作業資訊。
關於如何使用Log Service收集 Spark 作業日誌,請參見使用Log Service收集Spark作業日誌。
關於如何使用彈性資源運行 Spark 作業,請參見使用ECI彈性資源運行Spark作業。
關於如何在 Spark 作業中使用 Celeborn 作為 RSS,請參見Spark作業使用Celeborn作為RSS。