このトピックでは、Container Service for Kubernetes (ACK) クラスターで Spark ジョブを実行し、組み込みの PageRank ジョブを例として使用して、これらのジョブが Object Storage Service (OSS) バケットからデータを読み書きするように構成する方法を示します。
前提条件
Kubernetes 1.24 以降を実行する ACK Pro マネージドクラスター または Serverless Kubernetes Pro クラスター が作成されていること。詳細については、「ACK マネージドクラスターを作成する」、「Serverless Kubernetes クラスターを作成する」、および「ACK クラスターを手動でアップグレードする」をご参照ください。
ack-spark-operator コンポーネントがインストールされていること。詳細については、「手順 1: ack-spark-operator コンポーネントをインストールする」をご参照ください。
kubectl クライアントが ACK クラスターに接続されていること。詳細については、「クラスターの kubeconfig ファイルを取得し、kubectl を使用してクラスターに接続する」をご参照ください。
OSS バケットが作成されていること。詳細については、「バケットを作成する」をご参照ください。
ossutil がインストールおよび構成されていること。 ossutil コマンドの詳細については、「ossutil コマンドリファレンス」をご参照ください。
手順の概要
このトピックでは、ACK クラスターで Spark ジョブを実行し、OSS データの読み取りおよび書き込み操作を構成するための次の手順を説明します。
テストデータを準備して OSS バケットにアップロードする: PageRank ジョブのテストデータセットを生成し、OSS バケットにアップロードします。
Spark コンテナイメージをビルドする: OSS アクセスに必要な JAR 依存関係を Spark コンテナイメージに含めます。
OSS アクセス認証情報を保存するシークレットを作成する: データを不正アクセスから保護するために、Spark ジョブが OSS にアクセスするために使用する認証情報を保存するシークレット YAML ファイルを作成します。
Spark ジョブを送信する: OSS データ処理タスクを実行するために、Spark ジョブの構成ファイルを作成して送信します。
(オプション) 環境のクリーンアップ: コストを削減するために、完了した Spark ジョブと不要になったリソースを削除します。
手順 1: テストデータを準備して OSS バケットにアップロードする
PageRank のテストデータセットを生成し、指定された OSS バケットにアップロードします。
generate_pagerank_dataset.shという名前のファイルを作成するために、次の YAML テンプレートを使用します。このファイルは、テストデータセットの作成に使用されます。#!/bin/bash # 引数の数をチェックします if [ "$#" -ne 2 ]; then echo "使用方法: $0 M N" echo "M: Web ページの数" //日本語コメント echo "N: 生成するレコードの数" //日本語コメント exit 1 fi M=$1 N=$2 # M と N が正の整数であることを確認します if ! [[ "$M" =~ ^[0-9]+$ ]] || ! [[ "$N" =~ ^[0-9]+$ ]]; then echo "M と N は両方とも正の整数でなければなりません。" //日本語コメント exit 1 fi # データセットを生成します for ((i=1; i<=$N; i++)); do # ソースページとターゲットページが異なることを確認します while true; do src=$((RANDOM % M + 1)) dst=$((RANDOM % M + 1)) if [ "$src" -ne "$dst" ]; then echo "$src $dst" break fi done doneテストデータセットを作成するには、次のコマンドを実行します。
M=100000 # Web ページの数 N=10000000 # レコードの数 # データセットをランダムに生成し、pagerank_dataset.txt として保存します bash generate_pagerank_dataset.sh $M $N > pagerank_dataset.txt生成されたデータセットを OSS バケットの
data/ディレクトリにアップロードするには、次のコマンドを実行します。ossutil cp pagerank_dataset.txt oss://<BUCKET_NAME>/data/
手順 2: Spark コンテナイメージをビルドする
OSS アクセスに必要な JAR 依存関係を持つコンテナイメージをビルドします。Hadoop OSS SDK、Hadoop S3 SDK、または JindoSDK を選択できます。Container Registry を使用してイメージをビルドする方法の詳細については、「Container Registry Enterprise Edition インスタンスを使用してイメージをビルドする」をご参照ください。
サンプル Dockerfile で使用されている Spark イメージは、オープンソースコミュニティからのものです。必要に応じて、独自の Spark イメージに置き換えることができます。
Spark のバージョンに基づいて、適切な Hadoop OSS SDK、Hadoop S3 SDK、または JindoSDK のバージョンを選択してください。
Hadoop OSS SDK を使用する
この例では、Spark 3.5.5 と Hadoop OSS SDK 3.3.4 を使用します。次のサンプルコードを使用して Dockerfile を作成します。
ARG SPARK_IMAGE=spark:3.5.5
FROM ${SPARK_IMAGE}
# Hadoop Aliyun OSS サポートの依存関係を追加します //日本語コメント
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jarsHadoop S3 SDK を使用する
この例では、Spark 3.5.5 と Hadoop S3 SDK 3.3.4 を使用します。次のサンプルコードを使用して Dockerfile を作成します。
ARG SPARK_IMAGE=spark:3.5.5
FROM ${SPARK_IMAGE}
# Hadoop AWS S3 サポートの依存関係を追加します //日本語コメント
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.4/hadoop-aws-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.367/aws-java-sdk-bundle-1.12.367.jar ${SPARK_HOME}/jarsJindoSDK を使用する
この例では、Spark 3.5.5 と JindoSDK 6.8.0 を使用します。次のサンプルコードを使用して Dockerfile を作成します。
ARG SPARK_IMAGE=spark:3.5.5
FROM ${SPARK_IMAGE}
# JindoSDK サポートの依存関係を追加します //日本語コメント
ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-core/6.8.0/jindo-core-6.8.0.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/com/aliyun/jindodata/jindo-sdk/6.8.0/jindo-sdk-6.8.0.jar ${SPARK_HOME}/jars手順 3: OSS アクセス認証情報を保存するシークレットを作成する
Spark ジョブで安全な OSS データアクセスを実現するために、OSS アクセス認証情報を構成し、ジョブにハードコーディングするのを避けるために Kubernetes シークレットに保存します。シークレットをコンテナの環境変数として保存します。
Hadoop OSS SDK を使用する
spark-oss-secret.yamlという名前のシークレット YAML ファイルを作成して、OSS にアクセスするために使用される認証情報を保存します。apiVersion: v1 kind: Secret metadata: name: spark-oss-secret namespace: default stringData: # <ACCESS_KEY_ID> を Alibaba Cloud アカウントの AccessKey ID に置き換えます。 OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID> # <ACCESS_KEY_SECRET> を Alibaba Cloud アカウントの AccessKey Secret に置き換えます。 OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET>シークレットを作成するには、次のコマンドを実行します。
kubectl apply -f spark-oss-secret.yaml予期される出力:
secret/spark-oss-secret created
Hadoop S3 SDK を使用する
spark-s3-secret.yamlという名前のシークレット YAML ファイルを作成して、OSS にアクセスするために使用される認証情報を保存します。apiVersion: v1 kind: Secret metadata: name: spark-s3-secret namespace: default stringData: # <ACCESS_KEY_ID> を Alibaba Cloud アカウントの AccessKey ID に置き換えます。 AWS_ACCESS_KEY_ID: <ACCESS_KEY_ID> # <ACCESS_KEY_SECRET> を Alibaba Cloud アカウントの AccessKey Secret に置き換えます。 AWS_SECRET_ACCESS_KEY: <ACCESS_KEY_SECRET>シークレットを作成するには、次のコマンドを実行します。
kubectl apply -f spark-s3-secret.yaml予期される出力:
secret/spark-s3-secret created
JindoSDK を使用する
spark-oss-secret.yamlという名前のシークレット YAML ファイルを作成して、OSS にアクセスするために使用される認証情報を保存します。apiVersion: v1 kind: Secret metadata: name: spark-oss-secret namespace: default stringData: # <ACCESS_KEY_ID> を Alibaba Cloud アカウントの AccessKey ID に置き換えます。 OSS_ACCESS_KEY_ID: <ACCESS_KEY_ID> # <ACCESS_KEY_SECRET> を Alibaba Cloud アカウントの AccessKey Secret に置き換えます。 OSS_ACCESS_KEY_SECRET: <ACCESS_KEY_SECRET>シークレットを作成するには、次のコマンドを実行します。
kubectl apply -f spark-oss-secret.yaml予期される出力:
secret/spark-oss-secret created
手順 4: Spark ジョブを実行する
OSS バケットからデータを読み取り、OSS バケットにデータを書き込むために、ACK クラスターで Spark ジョブを送信します。
Hadoop OSS SDK を使用する
spark-pagerank.yaml という名前の Spark アプリケーション YAML ファイルを作成します。OSS 構成パラメーターの完全なリストについては、Hadoop-Aliyun モジュールを参照してください。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pagerank
namespace: default
spec:
type: Scala
mode: cluster
# <SPARK_IMAGE> を手順 2 でビルドした Spark コンテナイメージに置き換えます。
image: <SPARK_IMAGE>
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.5.jar
mainClass: org.apache.spark.examples.SparkPageRank
arguments:
- oss://<OSS_BUCKET>/data/pagerank_dataset.txt # 入力テストデータセットを指定します。 <OSS_BUCKET> を OSS バケット名に置き換えます。
- "10" # 反復回数。
sparkVersion: 3.5.5
hadoopConf:
fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
# OSS エンドポイント。 <OSS_ENDPOINT> を OSS エンドポイントに置き換えます。
# たとえば、北京リージョンの内部エンドポイントは oss-cn-beijing-internal.aliyuncs.com です。
fs.oss.endpoint: <OSS_ENDPOINT>
fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
driver:
cores: 1
coreLimit: 1200m
memory: 512m
envFrom:
- secretRef:
name: spark-oss-secret # OSS にアクセスするために使用されるシークレットを指定します。
serviceAccount: spark-operator-spark
executor:
instances: 2
cores: 1
coreLimit: "2"
memory: 8g
envFrom:
- secretRef:
name: spark-oss-secret # OSS にアクセスするために使用されるシークレットを指定します。
restartPolicy:
type: NeverHadoop S3 SDK を使用する
spark-pagerank.yaml という名前の Spark アプリケーション YAML ファイルを作成します。S3 構成パラメーターの完全なリストについては、Hadoop-AWS モジュールを参照してください。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pagerank
namespace: default
spec:
type: Scala
mode: cluster
# <SPARK_IMAGE> を手順 2 でビルドした Spark コンテナイメージに置き換えます。
image: <SPARK_IMAGE>
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.5.jar
mainClass: org.apache.spark.examples.SparkPageRank
arguments:
- s3a://<OSS_BUCKET>/data/pagerank_dataset.txt # 入力テストデータセットを指定します。 <OSS_BUCKET> を OSS バケット名に置き換えます。
- "10" # 反復回数。
sparkVersion: 3.5.5
hadoopConf:
fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
# OSS エンドポイント。 <OSS_ENDPOINT> を OSS エンドポイントに置き換えます。
# たとえば、北京リージョンの内部エンドポイントは oss-cn-beijing-internal.aliyuncs.com です。
fs.s3a.endpoint: <OSS_ENDPOINT>
# OSS エンドポイントが配置されているリージョン。たとえば、北京リージョンの場合は cn-beijing です。
fs.s3a.endpoint.region: <OSS_REGION>
driver:
cores: 1
coreLimit: 1200m
memory: 512m
envFrom:
- secretRef:
name: spark-s3-secret # OSS にアクセスするために使用されるシークレットを指定します。
serviceAccount: spark-operator-spark
executor:
instances: 2
cores: 1
coreLimit: "2"
memory: 8g
envFrom:
- secretRef:
name: spark-s3-secret # OSS にアクセスするために使用されるシークレットを指定します。
restartPolicy:
type: NeverJindoSDK を使用する
次の SparkApplication マニフェストを spark-pagerank.yaml として保存します。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pagerank
namespace: default
spec:
type: Scala
mode: cluster
# <SPARK_IMAGE> を手順 2 でビルドした Spark コンテナイメージに置き換えます。
image: <SPARK_IMAGE>
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.5.jar
mainClass: org.apache.spark.examples.SparkPageRank
arguments:
- oss://<OSS_BUCKET>/data/pagerank_dataset.txt # 入力テストデータセットを指定します。 <OSS_BUCKET> を OSS バケット名に置き換えます。
- "10" # 反復回数。
sparkVersion: 3.5.5
hadoopConf:
fs.AbstractFileSystem.oss.impl: com.aliyun.jindodata.oss.JindoOSS
fs.oss.impl: com.aliyun.jindodata.oss.JindoOssFileSystem
fs.oss.endpoint: <OSS_ENDPOINT> # 中国 (北京) リージョンの内部エンドポイントは oss-cn-beijing-internal.aliyuncs.com です。
fs.oss.credentials.provider: com.aliyun.jindodata.oss.auth.EnvironmentVariableCredentialsProvider
driver:
cores: 1
coreLimit: 1200m
memory: 512m
serviceAccount: spark-operator-spark
envFrom:
- secretRef:
name: spark-oss-secret # OSS にアクセスするために使用されるシークレットを指定します。
executor:
instances: 2
cores: 1
coreLimit: "2"
memory: 8g
envFrom:
- secretRef:
name: spark-oss-secret # OSS にアクセスするために使用されるシークレットを指定します。
restartPolicy:
type: NeverSpark ジョブを送信するには、次のコマンドを実行します。
kubectl apply -f spark-pagerank.yamlSpark ジョブのステータスを監視するには、次のコマンドを実行します。
kubectl get sparkapplications spark-pagerank予期される出力:
NAME STATUS ATTEMPTS START FINISH AGE spark-pagerank COMPLETED 1 2024-10-09T12:54:25Z 2024-10-09T12:55:46Z 90sドライバー ポッドの最後の 20 個のログエントリを表示するには、次のコマンドを実行します。
kubectl logs spark-pagerank-driver --tail=20予期される出力:
Hadoop OSS SDK を使用する
ログは、Spark ジョブが正常に実行されたことを示しています。
30024 has rank: 1.0709659078941967 . 21390 has rank: 0.9933356174074005 . 28500 has rank: 1.0404018494028928 . 2137 has rank: 0.9931000490520374 . 3406 has rank: 0.9562543137167121 . 20904 has rank: 0.8827028621652337 . 25604 has rank: 1.0270134041934191 . 24/10/09 12:48:36 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-dd0d4d927151c9d0-driver-svc.default.svc:4040 24/10/09 12:48:36 INFO KubernetesClusterSchedulerBackend: Shutting down all executors 24/10/09 12:48:36 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down 24/10/09 12:48:36 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed. 24/10/09 12:48:36 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 24/10/09 12:48:36 INFO MemoryStore: MemoryStore cleared 24/10/09 12:48:36 INFO BlockManager: BlockManager stopped 24/10/09 12:48:36 INFO BlockManagerMaster: BlockManagerMaster stopped 24/10/09 12:48:36 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 24/10/09 12:48:36 INFO SparkContext: Successfully stopped SparkContext 24/10/09 12:48:36 INFO ShutdownHookManager: Shutdown hook called 24/10/09 12:48:36 INFO ShutdownHookManager: Deleting directory /tmp/spark-e8b8c2ab-c916-4f84-b60f-f54c0de3a7f0 24/10/09 12:48:36 INFO ShutdownHookManager: Deleting directory /var/data/spark-c5917d98-06fb-46fe-85bc-199b839cb885/spark-23e2c2ae-4754-43ae-854d-2752eb83b2c5Hadoop S3 SDK を使用する
3406 has rank: 0.9562543137167121 . 20904 has rank: 0.8827028621652337 . 25604 has rank: 1.0270134041934191 . 25/04/07 03:54:11 INFO SparkContext: SparkContext is stopping with exitCode 0. 25/04/07 03:54:11 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-0f7dec960e615617-driver-svc.spark.svc:4040 25/04/07 03:54:11 INFO KubernetesClusterSchedulerBackend: Shutting down all executors 25/04/07 03:54:11 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down 25/04/07 03:54:11 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed. 25/04/07 03:54:11 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 25/04/07 03:54:11 INFO MemoryStore: MemoryStore cleared 25/04/07 03:54:11 INFO BlockManager: BlockManager stopped 25/04/07 03:54:11 INFO BlockManagerMaster: BlockManagerMaster stopped 25/04/07 03:54:11 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 25/04/07 03:54:11 INFO SparkContext: Successfully stopped SparkContext 25/04/07 03:54:11 INFO ShutdownHookManager: Shutdown hook called 25/04/07 03:54:11 INFO ShutdownHookManager: Deleting directory /var/data/spark-20d425bb-f442-4b0a-83e2-5a0202959a54/spark-ff5bbf08-4343-4a7a-9ce0-3f7c127cf4a9 25/04/07 03:54:11 INFO ShutdownHookManager: Deleting directory /tmp/spark-a421839a-07af-49c0-b637-f15f76c3e752 25/04/07 03:54:11 INFO MetricsSystemImpl: Stopping s3a-file-system metrics system... 25/04/07 03:54:11 INFO MetricsSystemImpl: s3a-file-system metrics system stopped. 25/04/07 03:54:11 INFO MetricsSystemImpl: s3a-file-system metrics system shutdown complete.JindoSDK を使用する
ログは、Spark ジョブが正常に実行されたことを示しています。
21390 has rank: 0.9933356174074005 . 28500 has rank: 1.0404018494028928 . 2137 has rank: 0.9931000490520374 . 3406 has rank: 0.9562543137167121 . 20904 has rank: 0.8827028621652337 . 25604 has rank: 1.0270134041934191 . 24/10/09 12:55:44 INFO SparkContext: SparkContext is stopping with exitCode 0. 24/10/09 12:55:44 INFO SparkUI: Stopped Spark web UI at http://spark-pagerank-6a5e3d9271584856-driver-svc.default.svc:4040 24/10/09 12:55:44 INFO KubernetesClusterSchedulerBackend: Shutting down all executors 24/10/09 12:55:44 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down 24/10/09 12:55:44 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed. 24/10/09 12:55:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 24/10/09 12:55:45 INFO MemoryStore: MemoryStore cleared 24/10/09 12:55:45 INFO BlockManager: BlockManager stopped 24/10/09 12:55:45 INFO BlockManagerMaster: BlockManagerMaster stopped 24/10/09 12:55:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 24/10/09 12:55:45 INFO SparkContext: Successfully stopped SparkContext 24/10/09 12:55:45 INFO ShutdownHookManager: Shutdown hook called 24/10/09 12:55:45 INFO ShutdownHookManager: Deleting directory /var/data/spark-87e8406e-06a7-4b4a-b18f-2193da299d35/spark-093a1b71-121a-4367-9d22-ad4e397c9815 24/10/09 12:55:45 INFO ShutdownHookManager: Deleting directory /tmp/spark-723e2039-a493-49e8-b86d-fff5fd1bb168
(オプション) 手順 5: 環境のクリーンアップ
このチュートリアルを完了したら、不要になったリソースを解放できます。
Spark ジョブを削除するには、次のコマンドを実行します。
kubectl delete -f spark-pagerank.yamlシークレットを削除するには、次のコマンドを実行します。
Hadoop OSS SDK を使用する
kubectl delete -f spark-oss-secret.yamlHadoop S3 SDK を使用する
kubectl delete -f spark-s3-secret.yamlJindoSDK を使用する
kubectl delete -f spark-oss-secret.yaml参考資料
Spark History Server を使用して Spark ジョブに関する情報を表示する方法の詳細については、「Spark History Server を使用して Spark ジョブに関する情報を表示する」をご参照ください。
Simple Log Service を使用して Spark ジョブのログを収集する方法の詳細については、「Simple Log Service を使用して Spark ジョブのログを収集する」をご参照ください。
エラスティックコンテナインスタンスを使用して Spark ジョブを実行する方法の詳細については、「エラスティックコンテナインスタンスを使用して Spark ジョブを実行する」をご参照ください。
Celeborn を使用して Spark ジョブの RSS を有効にする方法の詳細については、「Celeborn を使用して Spark ジョブの RSS を有効にする」をご参照ください。