すべてのプロダクト
Search
ドキュメントセンター

Container Service for Kubernetes:Spark ジョブで OSS データを読み書きする

最終更新日:Apr 12, 2025

このトピックでは、Container Service for Kubernetes (ACK) クラスターで Spark ジョブを実行し、組み込みの PageRank ジョブを例として使用して、これらのジョブが Object Storage Service (OSS) バケットからデータを読み書きするように構成する方法を示します。

前提条件

手順の概要

このトピックでは、ACK クラスターで Spark ジョブを実行し、OSS データの読み取りおよび書き込み操作を構成するための次の手順を説明します。

  1. テストデータを準備して OSS バケットにアップロードする: PageRank ジョブのテストデータセットを生成し、OSS バケットにアップロードします。

  2. Spark コンテナイメージをビルドする: OSS アクセスに必要な JAR 依存関係を Spark コンテナイメージに含めます。

  3. OSS アクセス認証情報を保存するシークレットを作成する: データを不正アクセスから保護するために、Spark ジョブが OSS にアクセスするために使用する認証情報を保存するシークレット YAML ファイルを作成します。

  4. Spark ジョブを送信する: OSS データ処理タスクを実行するために、Spark ジョブの構成ファイルを作成して送信します。

  5. (オプション) 環境のクリーンアップ: コストを削減するために、完了した Spark ジョブと不要になったリソースを削除します。

手順 1: テストデータを準備して OSS バケットにアップロードする

PageRank のテストデータセットを生成し、指定された OSS バケットにアップロードします。

  1. generate_pagerank_dataset.sh という名前のファイルを作成するために、次の YAML テンプレートを使用します。このファイルは、テストデータセットの作成に使用されます。

    #!/bin/bash
    
    # 引数の数をチェックします
    if [ "$#" -ne 2 ]; then
        echo "使用方法: $0 M N"
        echo "M: Web ページの数"  //日本語コメント
        echo "N: 生成するレコードの数" //日本語コメント
        exit 1
    fi
    
    M=$1
    N=$2
    
    # M と N が正の整数であることを確認します
    if ! [[ "$M" =~ ^[0-9]+$ ]] || ! [[ "$N" =~ ^[0-9]+$ ]]; then
        echo "M と N は両方とも正の整数でなければなりません。" //日本語コメント
        exit 1
    fi
    
    # データセットを生成します
    for ((i=1; i<=$N; i++)); do
        # ソースページとターゲットページが異なることを確認します
        while true; do
            src=$((RANDOM % M + 1))
            dst=$((RANDOM % M + 1))
            if [ "$src" -ne "$dst" ]; then
                echo "$src $dst"
                break
            fi
        done
    done
  2. テストデータセットを作成するには、次のコマンドを実行します。

    M=100000    # Web ページの数
    
    N=10000000  # レコードの数
    
    # データセットをランダムに生成し、pagerank_dataset.txt として保存します
    bash generate_pagerank_dataset.sh $M $N > pagerank_dataset.txt
  3. 生成されたデータセットを OSS バケットの data/ ディレクトリにアップロードするには、次のコマンドを実行します。

    ossutil cp pagerank_dataset.txt oss://<BUCKET_NAME>/data/

手順 2: Spark コンテナイメージをビルドする

OSS アクセスに必要な JAR 依存関係を持つコンテナイメージをビルドします。Hadoop OSS SDK、Hadoop S3 SDK、または JindoSDK を選択できます。Container Registry を使用してイメージをビルドする方法の詳細については、「Container Registry Enterprise Edition インスタンスを使用してイメージをビルドする」をご参照ください。

説明
  • サンプル Dockerfile で使用されている Spark イメージは、オープンソースコミュニティからのものです。必要に応じて、独自の Spark イメージに置き換えることができます。

  • Spark のバージョンに基づいて、適切な Hadoop OSS SDK、Hadoop S3 SDK、または JindoSDK のバージョンを選択してください。

Hadoop OSS SDK を使用する

この例では、Spark 3.5.5 と Hadoop OSS SDK 3.3.4 を使用します。次のサンプルコードを使用して Dockerfile を作成します。

ARG SPARK_IMAGE=spark:3.5.5

FROM ${SPARK_IMAGE}

# Hadoop Aliyun OSS サポートの依存関係を追加します //日本語コメント
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars

Hadoop S3 SDK を使用する

この例では、Spark 3.5.5 と Hadoop S3 SDK 3.3.4 を使用します。次のサンプルコードを使用して Dockerfile を作成します。

ARG SPARK_IMAGE=spark:3.5.5

FROM ${SPARK_IMAGE}

# Hadoop AWS S3 サポートの依存関係を追加します //日本語コメント
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.367/aws-java-sdk-bundle-1.12.367.jar ${SPARK_HOME}/jars

JindoSDK を使用する

この例では、Spark 3.5.5 と JindoSDK 6.8.0 を使用します。次のサンプルコードを使用して Dockerfile を作成します。

ARG SPARK_IMAGE=spark:3.5.5

FROM ${SPARK_IMAGE}

# JindoSDK サポートの依存関係を追加します //日本語コメント
ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-core/6.8.0/jindo-core-6.8.0.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-sdk/6.8.0/jindo-sdk-6.8.0.jar ${SPARK_HOME}/jars

手順 3: OSS アクセス認証情報を保存するシークレットを作成する

Spark ジョブで安全な OSS データアクセスを実現するために、OSS アクセス認証情報を構成し、ジョブにハードコーディングするのを避けるために Kubernetes シークレットに保存します。シークレットをコンテナの環境変数として保存します。

Hadoop OSS SDK を使用する

  1. spark-oss-secret.yaml という名前のシークレット YAML ファイルを作成して、OSS にアクセスするために使用される認証情報を保存します。

    apiVersion: v1
    kind: Secret
    metadata:
      name: spark-oss-secret
      namespace: default
    stringData:
      # <ACCESS_KEY_ID> を Alibaba Cloud アカウントの AccessKey ID に置き換えます。
      OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID>
      # <ACCESS_KEY_SECRET> を Alibaba Cloud アカウントの AccessKey Secret に置き換えます。
      OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET>
  2. シークレットを作成するには、次のコマンドを実行します。

    kubectl apply -f spark-oss-secret.yaml

    予期される出力:

    secret/spark-oss-secret created

Hadoop S3 SDK を使用する

  1. spark-s3-secret.yaml という名前のシークレット YAML ファイルを作成して、OSS にアクセスするために使用される認証情報を保存します。

    apiVersion: v1
    kind: Secret
    metadata:
      name: spark-s3-secret
      namespace: default
    stringData:
      # <ACCESS_KEY_ID> を Alibaba Cloud アカウントの AccessKey ID に置き換えます。
      AWS_ACCESS_KEY_ID: <ACCESS_KEY_ID>
      # <ACCESS_KEY_SECRET> を Alibaba Cloud アカウントの AccessKey Secret に置き換えます。
      AWS_SECRET_ACCESS_KEY: <ACCESS_KEY_SECRET>
  2. シークレットを作成するには、次のコマンドを実行します。

    kubectl apply -f spark-s3-secret.yaml

    予期される出力:

    secret/spark-s3-secret created

JindoSDK を使用する

  1. spark-oss-secret.yaml という名前のシークレット YAML ファイルを作成して、OSS にアクセスするために使用される認証情報を保存します。

    apiVersion: v1
    kind: Secret
    metadata:
      name: spark-oss-secret
      namespace: default
    stringData:
      # <ACCESS_KEY_ID> を Alibaba Cloud アカウントの AccessKey ID に置き換えます。
      OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID>
      # <ACCESS_KEY_SECRET> を Alibaba Cloud アカウントの AccessKey Secret に置き換えます。
      OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET>
  2. シークレットを作成するには、次のコマンドを実行します。

    kubectl apply -f spark-oss-secret.yaml

    予期される出力:

    secret/spark-oss-secret created

手順 4: Spark ジョブを実行する

OSS バケットからデータを読み取り、OSS バケットにデータを書き込むために、ACK クラスターで Spark ジョブを送信します。

Hadoop OSS SDK を使用する

spark-pagerank.yaml という名前の Spark アプリケーション YAML ファイルを作成します。OSS 構成パラメーターの完全なリストについては、Hadoop-Aliyun モジュールを参照してください。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  # <SPARK_IMAGE> を手順 2 でビルドした Spark コンテナイメージに置き換えます。
  image: <SPARK_IMAGE>
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.5.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt           # 入力テストデータセットを指定します。 <OSS_BUCKET> を OSS バケット名に置き換えます。
  - "10"                                                   # 反復回数。
  sparkVersion: 3.5.5
  hadoopConf:
    fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    # OSS エンドポイント。 <OSS_ENDPOINT> を OSS エンドポイントに置き換えます。
    # たとえば、北京リージョンの内部エンドポイントは oss-cn-beijing-internal.aliyuncs.com です。
    fs.oss.endpoint: <OSS_ENDPOINT>
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    envFrom:
    - secretRef:
        name: spark-oss-secret               # OSS にアクセスするために使用されるシークレットを指定します。
    serviceAccount: spark-operator-spark
  executor:
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef: 
        name: spark-oss-secret               # OSS にアクセスするために使用されるシークレットを指定します。
  restartPolicy:
    type: Never

Hadoop S3 SDK を使用する

spark-pagerank.yaml という名前の Spark アプリケーション YAML ファイルを作成します。S3 構成パラメーターの完全なリストについては、Hadoop-AWS モジュールを参照してください。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  # <SPARK_IMAGE> を手順 2 でビルドした Spark コンテナイメージに置き換えます。
  image: <SPARK_IMAGE>
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.5.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  - s3a://<OSS_BUCKET>/data/pagerank_dataset.txt           # 入力テストデータセットを指定します。 <OSS_BUCKET> を OSS バケット名に置き換えます。
  - "10"                                                   # 反復回数。
  sparkVersion: 3.5.5
  hadoopConf:
    fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
    # OSS エンドポイント。 <OSS_ENDPOINT> を OSS エンドポイントに置き換えます。
    # たとえば、北京リージョンの内部エンドポイントは oss-cn-beijing-internal.aliyuncs.com です。
    fs.s3a.endpoint: <OSS_ENDPOINT>
    # OSS エンドポイントが配置されているリージョン。たとえば、北京リージョンの場合は cn-beijing です。
    fs.s3a.endpoint.region: <OSS_REGION>
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    envFrom:
    - secretRef:
        name: spark-s3-secret               # OSS にアクセスするために使用されるシークレットを指定します。
    serviceAccount: spark-operator-spark
  executor:
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef: 
        name: spark-s3-secret               # OSS にアクセスするために使用されるシークレットを指定します。
  restartPolicy:
    type: Never

JindoSDK を使用する

次の SparkApplication マニフェストを spark-pagerank.yaml として保存します。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  # <SPARK_IMAGE> を手順 2 でビルドした Spark コンテナイメージに置き換えます。
  image: <SPARK_IMAGE>
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.5.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt    # 入力テストデータセットを指定します。 <OSS_BUCKET> を OSS バケット名に置き換えます。
  - "10"                                            # 反復回数。
  sparkVersion: 3.5.5
  hadoopConf:
    fs.AbstractFileSystem.oss.impl: com.aliyun.jindodata.oss.JindoOSS
    fs.oss.impl: com.aliyun.jindodata.oss.JindoOssFileSystem
    fs.oss.endpoint: <OSS_ENDPOINT>                 # 中国 (北京) リージョンの内部エンドポイントは oss-cn-beijing-internal.aliyuncs.com です。
    fs.oss.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
    envFrom:
    - secretRef:
        name: spark-oss-secret                    # OSS にアクセスするために使用されるシークレットを指定します。
  executor: 
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef:
        name: spark-oss-secret                    # OSS にアクセスするために使用されるシークレットを指定します。
  restartPolicy:
    type: Never
  1. Spark ジョブを送信するには、次のコマンドを実行します。

    kubectl apply -f spark-pagerank.yaml
  2. Spark ジョブのステータスを監視するには、次のコマンドを実行します。

    kubectl get sparkapplications spark-pagerank

    予期される出力:

    NAME             STATUS      ATTEMPTS   START                  FINISH                 AGE
    spark-pagerank   COMPLETED   1          2024-10-09T12:54:25Z   2024-10-09T12:55:46Z   90s
  3. ドライバー ポッドの最後の 20 個のログエントリを表示するには、次のコマンドを実行します。

    kubectl logs spark-pagerank-driver --tail=20

    予期される出力:

    Hadoop OSS SDK を使用する

    ログは、Spark ジョブが正常に実行されたことを示しています。

    30024 has rank:  1.0709659078941967 .
    21390 has rank:  0.9933356174074005 .
    28500 has rank:  1.0404018494028928 .
    2137 has rank:  0.9931000490520374 .
    3406 has rank:  0.9562543137167121 .
    20904 has rank:  0.8827028621652337 .
    25604 has rank:  1.0270134041934191 .
    24/10/09 12:48:36 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-dd0d4d927151c9d0-driver-svc.default.svc:4040
    24/10/09 12:48:36 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
    24/10/09 12:48:36 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
    24/10/09 12:48:36 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
    24/10/09 12:48:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    24/10/09 12:48:36 INFO MemoryStore: MemoryStore cleared
    24/10/09 12:48:36 INFO BlockManager: BlockManager stopped
    24/10/09 12:48:36 INFO BlockManagerMaster: BlockManagerMaster stopped
    24/10/09 12:48:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    24/10/09 12:48:36 INFO SparkContext: Successfully stopped SparkContext
    24/10/09 12:48:36 INFO ShutdownHookManager: Shutdown hook called
    24/10/09 12:48:36 INFO ShutdownHookManager: Deleting directory /tmp/spark-e8b8c2ab-c916-4f84-b60f-f54c0de3a7f0
    24/10/09 12:48:36 INFO ShutdownHookManager: Deleting directory /var/data/spark-c5917d98-06fb-46fe-85bc-199b839cb885/spark-23e2c2ae-4754-43ae-854d-2752eb83b2c5

    Hadoop S3 SDK を使用する

    3406 has rank:  0.9562543137167121 .
    20904 has rank:  0.8827028621652337 .
    25604 has rank:  1.0270134041934191 .
    25/04/07 03:54:11 INFO SparkContext: SparkContext is stopping with exitCode 0.
    25/04/07 03:54:11 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-0f7dec960e615617-driver-svc.spark.svc:4040
    25/04/07 03:54:11 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
    25/04/07 03:54:11 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
    25/04/07 03:54:11 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
    25/04/07 03:54:11 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    25/04/07 03:54:11 INFO MemoryStore: MemoryStore cleared
    25/04/07 03:54:11 INFO BlockManager: BlockManager stopped
    25/04/07 03:54:11 INFO BlockManagerMaster: BlockManagerMaster stopped
    25/04/07 03:54:11 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    25/04/07 03:54:11 INFO SparkContext: Successfully stopped SparkContext
    25/04/07 03:54:11 INFO ShutdownHookManager: Shutdown hook called
    25/04/07 03:54:11 INFO ShutdownHookManager: Deleting directory /var/data/spark-20d425bb-f442-4b0a-83e2-5a0202959a54/spark-ff5bbf08-4343-4a7a-9ce0-3f7c127cf4a9
    25/04/07 03:54:11 INFO ShutdownHookManager: Deleting directory /tmp/spark-a421839a-07af-49c0-b637-f15f76c3e752
    25/04/07 03:54:11 INFO MetricsSystemImpl: Stopping s3a-file-system metrics system...
    25/04/07 03:54:11 INFO MetricsSystemImpl: s3a-file-system metrics system stopped.
    25/04/07 03:54:11 INFO MetricsSystemImpl: s3a-file-system metrics system shutdown complete.

    JindoSDK を使用する

    ログは、Spark ジョブが正常に実行されたことを示しています。

    21390 has rank:  0.9933356174074005 .
    28500 has rank:  1.0404018494028928 .
    2137 has rank:  0.9931000490520374 .
    3406 has rank:  0.9562543137167121 .
    20904 has rank:  0.8827028621652337 .
    25604 has rank:  1.0270134041934191 .
    24/10/09 12:55:44 INFO SparkContext: SparkContext is stopping with exitCode 0.
    24/10/09 12:55:44 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-6a5e3d9271584856-driver-svc.default.svc:4040
    24/10/09 12:55:44 INFO KubernetesClusterSchedulerBackend: Shutting down all executors
    24/10/09 12:55:44 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down
    24/10/09 12:55:44 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed.
    24/10/09 12:55:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    24/10/09 12:55:45 INFO MemoryStore: MemoryStore cleared
    24/10/09 12:55:45 INFO BlockManager: BlockManager stopped
    24/10/09 12:55:45 INFO BlockManagerMaster: BlockManagerMaster stopped
    24/10/09 12:55:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    24/10/09 12:55:45 INFO SparkContext: Successfully stopped SparkContext
    24/10/09 12:55:45 INFO ShutdownHookManager: Shutdown hook called
    24/10/09 12:55:45 INFO ShutdownHookManager: Deleting directory /var/data/spark-87e8406e-06a7-4b4a-b18f-2193da299d35/spark-093a1b71-121a-4367-9d22-ad4e397c9815
    24/10/09 12:55:45 INFO ShutdownHookManager: Deleting directory /tmp/spark-723e2039-a493-49e8-b86d-fff5fd1bb168

(オプション) 手順 5: 環境のクリーンアップ

このチュートリアルを完了したら、不要になったリソースを解放できます。

Spark ジョブを削除するには、次のコマンドを実行します。

kubectl delete -f spark-pagerank.yaml

シークレットを削除するには、次のコマンドを実行します。

Hadoop OSS SDK を使用する

kubectl delete -f spark-oss-secret.yaml

Hadoop S3 SDK を使用する

kubectl delete -f spark-s3-secret.yaml

JindoSDK を使用する

kubectl delete -f spark-oss-secret.yaml

参考資料