全部產品
Search
文件中心

Container Service for Kubernetes:Spark作業讀寫OSS資料

更新時間:Apr 09, 2025

本文以Spark內建的PageRank作業為例,介紹如何在ACK叢集中運行Spark作業,並配置讀寫位於阿里雲OSS(Object Storage Service服務)中的資料。

前提條件

流程概述

本文將引導您完成如下步驟,協助您瞭解如何在ACK叢集中運行Spark作業並配置讀寫OSS資料。

  1. 準備測試資料並上傳至OSS:產生用於PageRank的測試資料集並將其上傳至OSS。

  2. 構建Spark容器鏡像:構建包含了訪問OSS相關Jar包依賴的Spark容器鏡像。

  3. 建立Secret儲存OSS訪問憑據:為Spark作業建立指定的OSS訪問憑據,以確保安全訪問OSS。

  4. 提交樣本Spark作業:建立並提交一個Spark作業的設定檔,實現對OSS資料的讀寫。

  5. (可選)環境清理:在完成測試後,清理無需使用的Spark作業和資源,避免產生預期外的費用。

步驟一:準備測試資料並上傳至OSS

首先,您需產生測試資料集並將其上傳到指定的OSS中,便於後續的Spark作業使用。以下是產生測試資料集的指令碼和上傳OSS的操作步驟。

  1. 根據以下內容建立名為 generate_pagerank_dataset.sh 的指令碼,用於產生測試資料集。

    #!/bin/bash
    
    # 檢查參數數量
    if [ "$#" -ne 2 ]; then
        echo "Usage: $0 M N"
        echo "M: Number of web pages"
        echo "N: Number of records to generate"
        exit 1
    fi
    
    M=$1
    N=$2
    
    # 檢查 M 和 N 是否為正整數
    if ! [[ "$M" =~ ^[0-9]+$ ]] || ! [[ "$N" =~ ^[0-9]+$ ]]; then
        echo "Both M and N must be positive integers."
        exit 1
    fi
    
    # 產生資料集
    
    for ((i=1; i<=$N; i++)); do
        # 保證源頁面和目標頁面不相同
        while true; do
            src=$((RANDOM % M + 1))
            dst=$((RANDOM % M + 1))
            if [ "$src" -ne "$dst" ]; then
                echo "$src $dst"
                break
            fi
        done
    done
  2. 執行如下命令,產生測試資料集。

    M=100000    # 網頁數量
    
    N=10000000  # 記錄數量
    
    # 隨機產生資料集並儲存至 pagerank_dataset.txt
    bash generate_pagerank_dataset.sh $M $N > pagerank_dataset.txt
  3. 執行如下命令,將產生的資料集上傳至OSS Bucket中的 data/ 路徑下:

    ossutil cp pagerank_dataset.txt oss://<BUCKET_NAME>/data/

步驟二:構建Spark容器鏡像

為了在Spark作業中訪問資料,首先需要構建一個包含了訪問OSS相關Jar包依賴的容器鏡像。您可以選擇使用Hadoop OSS SDK、Hadoop S3 SDK或JindoSDK來訪問OSS,本文示範用的容器鏡像根據如下樣本Dockerfile構建。關於Container Registry構建鏡像請參見使用企業版執行個體構建鏡像

說明
  • 樣本Dockerfile檔案中使用的Spark基礎鏡像來自於開源社區,您可按需求自行替換成自己的Spark鏡像。

  • 您需要根據使用的Spark版本,選擇相應的Hadoop OSS SDK、Hadoop S3 SDK或者JindoSDK版本。

使用Hadoop OSS SDK

以Spark 3.5.5版本和Hadoop OSS SDK 3.3.4版本為例,建立如下樣本Dockerfile檔案。

ARG SPARK_IMAGE=spark:3.5.5

FROM ${SPARK_IMAGE}

# Add dependency for Hadoop Aliyun OSS support
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars

使用Hadoop S3 SDK

以Spark 3.5.5版本和Hadoop S3 SDK 3.3.4版本為例,建立如下樣本Dockerfile檔案。

ARG SPARK_IMAGE=spark:3.5.5

FROM ${SPARK_IMAGE}

# Add dependency for Hadoop AWS S3 support
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.367/aws-java-sdk-bundle-1.12.367.jar ${SPARK_HOME}/jars

使用JindoSDK

以Spark 3.5.5版本和JindoSDK 6.8.0版本為例,建立如下樣本Dockerfile檔案。

ARG SPARK_IMAGE=spark:3.5.5

FROM ${SPARK_IMAGE}

# Add dependency for JindoSDK support
ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-core/6.8.0/jindo-core-6.8.0.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-sdk/6.8.0/jindo-sdk-6.8.0.jar ${SPARK_HOME}/jars

步驟三:建立Secret儲存OSS訪問憑據

在Spark作業中訪問OSS資料時,需要配置OSS訪問憑證,為了避免在作業中寫入程式碼訪問憑據,需要建立一個Secret用於儲存敏感資訊,並以環境變數的形式注入到容器中。

使用Hadoop OSS SDK

  1. 根據以下內容建立Secret資訊清單檔,並儲存為 spark-oss-secret.yaml

    apiVersion: v1
    kind: Secret
    metadata:
      name: spark-oss-secret
      namespace: default
    stringData:
      # 需將 <ACCESS_KEY_ID> 替換成阿里雲 AccessKey ID。
      OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID>
      # 需將 <ACCESS_KEY_SECRET> 替換成阿里雲 AccessKey Secret。
      OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET>
  2. 執行如下命令建立Secret。

    kubectl apply -f spark-oss-secret.yaml

    預期輸出如下。

    secret/spark-oss-secret created

使用Hadoop S3 SDK

  1. 根據以下內容建立Secret資訊清單檔,並儲存為 spark-s3-secret.yaml

    apiVersion: v1
    kind: Secret
    metadata:
      name: spark-s3-secret
      namespace: default
    stringData:
      # 需將 <ACCESS_KEY_ID> 替換成阿里雲 AccessKey ID。
      AWS_ACCESS_KEY_ID: <ACCESS_KEY_ID> 
      # 需將 <ACCESS_KEY_SECRET> 替換成阿里雲 AccessKey Secret。
      AWS_SECRET_ACCESS_KEY: <ACCESS_KEY_SECRET>
  2. 執行如下命令建立Secret。

    kubectl apply -f spark-s3-secret.yaml

    預期輸出如下。

    secret/spark-s3-secret created

使用JindoSDK

  1. 根據以下內容建立Secret資訊清單檔,並儲存為 spark-oss-secret.yaml

    apiVersion: v1
    kind: Secret
    metadata:
      name: spark-oss-secret
      namespace: default
    stringData:
      # 需將 <ACCESS_KEY_ID> 替換成阿里雲 AccessKey ID。
      OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID>
      # 需將 <ACCESS_KEY_SECRET> 替換成阿里雲 AccessKey Secret。
      OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET>
  2. 執行如下命令建立Secret。

    kubectl apply -f spark-oss-secret.yaml

    預期輸出如下。

    secret/spark-oss-secret created

步驟四:運行樣本Spark作業

在ACK叢集上提交Spark作業,實現OSS資料的讀寫。

使用Hadoop OSS SDK

建立如下SparkApplication資訊清單檔並儲存為spark-pagerank.yaml。關於OSS完整的配置參數列表,請參見Hadoop-Aliyun module

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  # 需將 <SPARK_IMAGE> 替換成步驟二中構建得到的 Spark 容器鏡像。
  image: <SPARK_IMAGE>
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.5.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt           # 指定輸入測試資料集,將<OSS_BUCKET>替換成OSS Bucket名稱。
  - "10"                                                   # 迭代次數。
  sparkVersion: 3.5.5
  hadoopConf:
    fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    # OSS 訪問端點,需將 <OSS_ENDPOINT> 替換成您的 OSS 訪問端點,
    # 例如,北京地區 OSS 的內網訪問端點為 oss-cn-beijing-internal.aliyuncs.com。 
    fs.oss.endpoint: <OSS_ENDPOINT>
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    envFrom:
    - secretRef:
        name: spark-oss-secret               #指定訪問OSS的安全憑據。
    serviceAccount: spark-operator-spark
  executor:
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef: 
        name: spark-oss-secret               #指定訪問OSS的安全憑據。
  restartPolicy:
    type: Never

使用Hadoop S3 SDK

建立如下SparkApplication資訊清單檔並儲存為spark-pagerank.yaml。關於S3完整的配置參數列表,請參見Hadoop-AWS module

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  # 需將 <SPARK_IMAGE> 替換成步驟二中構建得到的 Spark 容器鏡像。
  image: <SPARK_IMAGE>
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.5.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  - s3a://<OSS_BUCKET>/data/pagerank_dataset.txt           # 指定輸入測試資料集,將<OSS_BUCKET>替換成OSS Bucket名稱。
  - "10"                                                   # 迭代次數。
  sparkVersion: 3.5.5
  hadoopConf:
    fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
    # OSS 訪問端點,需將 <OSS_ENDPOINT> 替換成您的 OSS 訪問端點,
    # 例如,北京地區 OSS 的內網訪問端點為 oss-cn-beijing-internal.aliyuncs.com。
    fs.s3a.endpoint: <OSS_ENDPOINT>
    # OSS 訪問端點所在地區,例如北京地區為 cn-beijing。
    fs.s3a.endpoint.region: <OSS_REGION>
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    envFrom:
    - secretRef:
        name: spark-s3-secret               #指定訪問OSS的安全憑據。
    serviceAccount: spark-operator-spark
  executor:
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef: 
        name: spark-s3-secret               #指定訪問OSS的安全憑據。
  restartPolicy:
    type: Never

使用JindoSDK

建立如下SparkApplication資訊清單檔並儲存為spark-pagerank.yaml

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  # 需將 <SPARK_IMAGE> 替換成步驟二中構建得到的 Spark 容器鏡像。
  image: <SPARK_IMAGE>
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.5.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt    # 指定輸入測試資料集,將<OSS_BUCKET>替換成OSS Bucket名稱。
  - "10"                                            # 迭代次數。
  sparkVersion: 3.5.5
  hadoopConf:
    fs.AbstractFileSystem.oss.impl: com.aliyun.jindodata.oss.JindoOSS
    fs.oss.impl: com.aliyun.jindodata.oss.JindoOssFileSystem
    fs.oss.endpoint: <OSS_ENDPOINT>                 # OSS訪問端點。例如北京地區OSS的內網訪問地址為oss-cn-beijing-internal.aliyuncs.com。
    fs.oss.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
    envFrom:
    - secretRef:
        name: spark-oss-secret                    #指定訪問OSS的安全憑據。
  executor: 
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef:
        name: spark-oss-secret                    #指定訪問OSS的安全憑據。
  restartPolicy:
    type: Never
  1. 執行如下命令提交Spark作業。

    kubectl apply -f spark-pagerank.yaml
  2. 執行如下命令查看Spark作業執行狀態並等待作業執行結果。

    kubectl get sparkapplications spark-pagerank

    預期輸出。

    NAME             STATUS      ATTEMPTS   START                  FINISH                 AGE
    spark-pagerank   COMPLETED   1          2024-10-09T12:54:25Z   2024-10-09T12:55:46Z   90s
  3. 執行如下命令查看Driver pod日誌輸出的最後20行。

    kubectl logs spark-pagerank-driver --tail=20

    預期輸出。

    使用Hadoop OSS SDK

    從日誌中可以看到,Spark作業已經成功運行結束。

    30024 has rank:  1.0709659078941967 .
    21390 has rank:  0.9933356174074005 .
    28500 has rank:  1.0404018494028928 .
    2137 has rank:  0.9931000490520374 .
    3406 has rank:  0.9562543137167121 .
    20904 has rank:  0.8827028621652337 .
    25604 has rank:  1.0270134041934191 .
    24/10/09 12:48:36 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-dd0d4d927151c9d0-driver-svc.default.svc:4040
    24/10/09 12:48:36 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
    24/10/09 12:48:36 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
    24/10/09 12:48:36 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
    24/10/09 12:48:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    24/10/09 12:48:36 INFO MemoryStore: MemoryStore cleared
    24/10/09 12:48:36 INFO BlockManager: BlockManager stopped
    24/10/09 12:48:36 INFO BlockManagerMaster: BlockManagerMaster stopped
    24/10/09 12:48:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    24/10/09 12:48:36 INFO SparkContext: Successfully stopped SparkContext
    24/10/09 12:48:36 INFO ShutdownHookManager: Shutdown hook called
    24/10/09 12:48:36 INFO ShutdownHookManager: Deleting directory /tmp/spark-e8b8c2ab-c916-4f84-b60f-f54c0de3a7f0
    24/10/09 12:48:36 INFO ShutdownHookManager: Deleting directory /var/data/spark-c5917d98-06fb-46fe-85bc-199b839cb885/spark-23e2c2ae-4754-43ae-854d-2752eb83b2c5

    使用Hadoop S3 SDK

    3406 has rank:  0.9562543137167121 .
    20904 has rank:  0.8827028621652337 .
    25604 has rank:  1.0270134041934191 .
    25/04/07 03:54:11 INFO SparkContext: SparkContext is stopping with exitCode 0.
    25/04/07 03:54:11 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-0f7dec960e615617-driver-svc.spark.svc:4040
    25/04/07 03:54:11 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
    25/04/07 03:54:11 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
    25/04/07 03:54:11 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
    25/04/07 03:54:11 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    25/04/07 03:54:11 INFO MemoryStore: MemoryStore cleared
    25/04/07 03:54:11 INFO BlockManager: BlockManager stopped
    25/04/07 03:54:11 INFO BlockManagerMaster: BlockManagerMaster stopped
    25/04/07 03:54:11 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    25/04/07 03:54:11 INFO SparkContext: Successfully stopped SparkContext
    25/04/07 03:54:11 INFO ShutdownHookManager: Shutdown hook called
    25/04/07 03:54:11 INFO ShutdownHookManager: Deleting directory /var/data/spark-20d425bb-f442-4b0a-83e2-5a0202959a54/spark-ff5bbf08-4343-4a7a-9ce0-3f7c127cf4a9
    25/04/07 03:54:11 INFO ShutdownHookManager: Deleting directory /tmp/spark-a421839a-07af-49c0-b637-f15f76c3e752
    25/04/07 03:54:11 INFO MetricsSystemImpl: Stopping s3a-file-system metrics system...
    25/04/07 03:54:11 INFO MetricsSystemImpl: s3a-file-system metrics system stopped.
    25/04/07 03:54:11 INFO MetricsSystemImpl: s3a-file-system metrics system shutdown complete.

    使用JindoSDK

    從日誌中可以看到,Spark作業已經成功運行結束。

    21390 has rank:  0.9933356174074005 .
    28500 has rank:  1.0404018494028928 .
    2137 has rank:  0.9931000490520374 .
    3406 has rank:  0.9562543137167121 .
    20904 has rank:  0.8827028621652337 .
    25604 has rank:  1.0270134041934191 .
    24/10/09 12:55:44 INFO SparkContext: SparkContext is stopping with exitCode 0.
    24/10/09 12:55:44 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-6a5e3d9271584856-driver-svc.default.svc:4040
    24/10/09 12:55:44 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
    24/10/09 12:55:44 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
    24/10/09 12:55:44 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
    24/10/09 12:55:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    24/10/09 12:55:45 INFO MemoryStore: MemoryStore cleared
    24/10/09 12:55:45 INFO BlockManager: BlockManager stopped
    24/10/09 12:55:45 INFO BlockManagerMaster: BlockManagerMaster stopped
    24/10/09 12:55:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    24/10/09 12:55:45 INFO SparkContext: Successfully stopped SparkContext
    24/10/09 12:55:45 INFO ShutdownHookManager: Shutdown hook called
    24/10/09 12:55:45 INFO ShutdownHookManager: Deleting directory /var/data/spark-87e8406e-06a7-4b4a-b18f-2193da299d35/spark-093a1b71-121a-4367-9d22-ad4e397c9815
    24/10/09 12:55:45 INFO ShutdownHookManager: Deleting directory /tmp/spark-723e2039-a493-49e8-b86d-fff5fd1bb168

(可選)步驟五:環境清理

如果您已體驗完本教程,相關資源如不再需要,可以通過執行以下命令進行刪除。

執行如下命令刪除Spark作業。

kubectl delete -f spark-pagerank.yaml

執行如下命令刪除Secret資源。

使用Hadoop OSS SDK

kubectl delete -f spark-oss-secret.yaml

使用Hadoop S3 SDK

kubectl delete -f spark-s3-secret.yaml

使用JindoSDK

kubectl delete -f spark-oss-secret.yaml

相關文檔