Apache Spark は、大規模データ処理向けの高速コンピューティングエンジンであり、データ分析および機械学習の分野で広く利用されています。Spark Operator は、Kubernetes 上における Spark ジョブのデプロイメントおよびライフサイクル管理を自動化します。本トピックでは、ACK クラスター上で Spark Operator を使用して Spark ジョブを実行・管理する手順を説明し、ビッグデータワークロードを効率的に処理する方法を解説します。
前提条件
-
Kubernetes 1.24 以降を実行している ACK Pro マネージドクラスター または ACK Serverless Pro クラスター。詳細については、「マネージドクラスターの作成」「ACK Serverless クラスターの作成」および「ACK クラスターの手動アップグレード」をご参照ください。
kubectl クライアントが ACK クラスターに接続されていること。詳細については、「kubectl を使用した ACK クラスターへの接続」をご参照ください。
仕組み
Spark Operator は、Kubernetes 上における Spark ジョブのライフサイクルを自動化します。SparkApplication や ScheduledSparkApplication などの カスタムリソース定義 (CRD) を活用し、宣言型構成により Spark ジョブを管理します。Spark Operator は、オートスケーリング、ヘルスチェック、リソース管理といった Kubernetes のネイティブ機能を活用して、ワークロードを効率的に実行・監視します。ACK では、コミュニティの kubeflow/spark-operator を基盤とする ack-spark-operator コンポーネントを提供しています。詳細については、「Spark Operator | Kubeflow」をご参照ください。
メリット:
-
簡易な管理:Kubernetes 上での宣言型構成を活用し、Spark ジョブのデプロイメントおよびライフサイクル管理を自動化します。
-
マルチテナント対応:Kubernetes 名前空間およびリソースクォータを活用して、ユーザー単位のリソース隔離および割り当てを実現します。ノード選択 を使用することで、Spark ワークロードを専用リソース上で実行できます。
-
エラスティックなリソースプロビジョニング:業務ピーク時に迅速に大量のエラスティックリソースを確保できるよう、Elastic Container Instance (ECI) や エラスティックノードプール などのエラスティックリソースを活用します。これにより、パフォーマンスとコストのバランスを最適化できます。
利用シーン:
-
データ分析:データサイエンティストがインタラクティブなデータ分析およびデータクレンジングに Spark を活用できます。
-
バッチ処理:大規模データセットを処理するためのスケジュールされたバッチジョブを実行します。
-
リアルタイム処理:Spark Streaming ライブラリにより、リアルタイムのデータストリーム処理が可能です。
操作手順の概要
本トピックでは、ACK クラスター上で Spark Operator を使用して Spark ジョブを実行・管理する方法について説明します。
-
ack-spark-operator コンポーネントのデプロイ:ACK クラスターに Spark Operator をインストールし、Spark ジョブの管理および実行を有効化します。
-
Spark ジョブの送信:特定のデータ処理タスクを実行するための Spark ジョブ マニフェスト を作成・送信します。
-
Spark ジョブの監視:ジョブの実行状況を監視し、詳細な実行情報およびログを取得します。
-
Spark Web UI へのアクセス:Spark ジョブの実行状況をより直感的に確認できる Web UI にアクセスします。
-
Spark ジョブの更新:要件に応じてジョブ構成を調整し、変更を適用します。
-
Spark ジョブの削除:予期しない課金を回避するために、完了済みまたは不要となった Spark ジョブをクリーンアップします。
ステップ 1:ack-spark-operator コンポーネントのデプロイ
ACK コンソール にログインします。左側のナビゲーションウィンドウで、 をクリックします。
-
Marketplace ページで、アプリカタログ タブをクリックし、ack-spark-operator を検索して選択します。
-
ack-spark-operator ページで、デプロイ をクリックします。
-
作成する パネルで、クラスターおよび名前空間を選択し、次へ をクリックします。
-
パラメーター ページでパラメーターを設定し、OK をクリックします。
以下の表では、主な構成パラメーターについて説明します。完全な一覧については、ack-spark-operator ページの ConfigMap タブをご参照ください。
パラメーター
説明
デフォルト値
controller.replicasコントローラーのレプリカ数です。
1
webhook.replicasWebhook のレプリカ数です。
1
spark.jobNamespacesSpark ジョブを実行できる名前空間の一覧です。空文字列 "" を指定すると、すべての名前空間が許可されます。複数の名前空間を指定する場合は、カンマ (
,) で区切ります。-
["default"](デフォルト) -
[""](すべての名前空間) -
["ns1","ns2","ns3"](複数の名前空間)
spark.serviceAccount.nameSpark Operator は、
spark.jobNamespacesで指定された各名前空間に、spark-operator-sparkという名前の ServiceAccount および必要な RBAC リソースを自動的に作成します。この名前をカスタマイズする場合、Spark ジョブを送信する際に新しい名前を明示的に指定する必要があります。spark-operator-spark -
ステップ 2:Spark ジョブの送信
データ処理のための Spark ジョブを送信するには、SparkApplication マニフェストを作成します。
-
以下の SparkApplication マニフェストを作成し、
spark-pi.yamlとして保存します。apiVersion: sparkoperator.k8s.io/v1beta2 kind: SparkApplication metadata: name: spark-pi namespace: default # 名前空間は、spark.jobNamespaces で指定された名前空間の一覧に含まれている必要があります。 spec: type: Scala mode: cluster image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.4 imagePullPolicy: IfNotPresent mainClass: org.apache.spark.examples.SparkPi mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar arguments: - "1000" sparkVersion: 3.5.4 driver: cores: 1 coreLimit: 1200m memory: 512m serviceAccount: spark-operator-spark # ServiceAccount 名をカスタマイズした場合は、値を適宜変更してください。 executor: instances: 1 cores: 1 coreLimit: 1200m memory: 512m restartPolicy: type: Never -
以下のコマンドを実行して Spark ジョブを送信します。
kubectl apply -f spark-pi.yaml期待される出力:
sparkapplication.sparkoperator.k8s.io/spark-pi created
ステップ 3:Spark ジョブの監視
以下のコマンドを実行して、Spark ジョブのステータス、関連する Pod、およびログを確認します。
-
以下のコマンドを実行して、Spark ジョブのステータスを確認します。
kubectl get sparkapplication spark-pi期待される出力:
NAME STATUS ATTEMPTS START FINISH AGE spark-pi SUBMITTED 1 2024-06-04T03:17:11Z <no value> 15s -
以下のコマンドを実行して、Spark ジョブの Pod のステータスを確認します。このコマンドでは、ラベル
sparkoperator.k8s.io/app-nameの値がspark-piである Pod をフィルターします。kubectl get pod -l sparkoperator.k8s.io/app-name=spark-pi期待される出力:
NAME READY STATUS RESTARTS AGE spark-pi-driver 1/1 Running 0 49s spark-pi-7272428fc8f5f392-exec-1 1/1 Running 0 13sSpark ジョブの実行が完了すると、ドライバーが自動的にすべてのエグゼキュータ Pod を削除します。
-
以下のコマンドを実行して、Spark ジョブの詳細情報を表示します。
kubectl describe sparkapplication spark-pi -
以下のコマンドを実行して、ドライバーポッドからのログの最後の 20 行を表示します。
kubectl logs --tail=20 spark-pi-driver期待される出力:
24/05/30 10:05:30 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 24/05/30 10:05:30 INFO DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 7.942 s 24/05/30 10:05:30 INFO DAGScheduler: Job 0 is finished. Cancelling potential speculative or zombie tasks for this job 24/05/30 10:05:30 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage finished 24/05/30 10:05:30 INFO DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 8.043996 s Pi is roughly 3.1419522314195225 24/05/30 10:05:30 INFO SparkContext: SparkContext is stopping with exitCode 0. 24/05/30 10:05:30 INFO SparkUI: Stopped Spark web UI at http://spark-pi-1e18858fc8f56b14-driver-svc.default.svc:4040 24/05/30 10:05:30 INFO KubernetesClusterSchedulerBackend: Shutting down all executors 24/05/30 10:05:30 INFO KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Asking each executor to shut down 24/05/30 10:05:30 WARN ExecutorPodsWatchSnapshotSource: Kubernetes client has been closed. 24/05/30 10:05:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 24/05/30 10:05:30 INFO MemoryStore: MemoryStore cleared 24/05/30 10:05:30 INFO BlockManager: BlockManager stopped 24/05/30 10:05:30 INFO BlockManagerMaster: BlockManagerMaster stopped 24/05/30 10:05:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 24/05/30 10:05:30 INFO SparkContext: Successfully stopped SparkContext 24/05/30 10:05:30 INFO ShutdownHookManager: Shutdown hook called 24/05/30 10:05:30 INFO ShutdownHookManager: Deleting directory /var/data/spark-14ed60f1-82cd-4a33-b1b3-9e5d975c5b1e/spark-01120c89-5296-4c83-8a20-0799eef4e0ee 24/05/30 10:05:30 INFO ShutdownHookManager: Deleting directory /tmp/spark-5f98ed73-576a-41be-855d-dabdcf7de189
ステップ 4:Spark Web UI へのアクセス
Web UI は、Spark ジョブのドライバーポッドが実行中(Running)の状態でのみアクセス可能です。ジョブの実行が完了すると、UI は利用できなくなります。
デフォルトでは、ack-spark-operator コンポーネントをデプロイすると、controller.uiService.enable パラメーターが true に設定されます。これにより、Web UI を公開する Service が自動的に作成され、ポートフォワーディングを使用してアクセスできます。デプロイ時にこのパラメーターを false に設定した場合、Service は作成されません。その場合は、ドライバーポッドから直接ポートをフォワードする必要があります。
kubectl port-forward を使用したアクセスは、テスト環境での迅速な検証に適していますが、セキュリティリスクがあるため、本番環境では推奨されません。
-
以下のコマンドのいずれかを、ご使用のシナリオに応じて実行して、Web UI のポートをローカルマシンにフォワードします。
-
Service 経由のポートフォワード
kubectl port-forward services/spark-pi-ui-svc 4040 -
Pod 経由のポートフォワード
kubectl port-forward pods/spark-pi-driver 4040期待される出力:
Forwarding from 127.0.0.1:4040 -> 4040 Forwarding from [::1]:4040 -> 4040
-
-
Web ブラウザーで http://127.0.0.1:4040 を開いて、Web UI にアクセスします。
(任意)ステップ 5:Spark ジョブの更新
Spark ジョブのパラメーターを変更する必要がある場合は、マニフェストを更新できます。
-
spark-pi.yamlマニフェストを編集します。たとえば、argumentsの値を10000に変更し、executorのインスタンス数を2に変更します。apiVersion: sparkoperator.k8s.io/v1beta2 kind: SparkApplication metadata: name: spark-pi spec: type: Scala mode: cluster image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.4 imagePullPolicy: IfNotPresent mainClass: org.apache.spark.examples.SparkPi mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar arguments: - "10000" sparkVersion: 3.5.4 driver: cores: 1 coreLimit: 1200m memory: 512m serviceAccount: spark-operator-spark # ServiceAccount 名をカスタマイズした場合は、値を適宜変更してください。 executor: instances: 2 cores: 1 coreLimit: 1200m memory: 512m restartPolicy: type: Never -
以下のコマンドを実行して変更を適用します。
kubectl apply -f spark-pi.yaml -
以下のコマンドを実行してジョブのステータスを確認します。
kubectl get sparkapplication spark-piSpark ジョブが再実行されます。期待される出力には、RUNNING ステータスが表示されます。
NAME STATUS ATTEMPTS START FINISH AGE spark-pi RUNNING 1 2024-06-04T03:37:34Z <no value> 20m
(任意)ステップ 6:Spark ジョブの削除
Spark ジョブが不要になった場合、関連するリソースを解放するために削除します。
作成した Spark ジョブを削除します。
kubectl delete -f spark-pi.yaml
または、以下のコマンドを実行することもできます。
kubectl delete sparkapplication spark-pi
関連ドキュメント
-
Spark History Server を使用して Spark ジョブ情報を表示する方法については、「Spark History Server を使用した Spark ジョブ情報の表示」をご参照ください。
-
Log Service を使用して Spark ジョブのログを収集する方法については、「Log Service を使用した Spark ジョブログの収集」をご参照ください。
-
Spark ジョブで Object Storage Service (OSS) のデータを読み書きするための構成方法については、「Spark ジョブにおける OSS データの読み書き」をご参照ください。
-
エラスティックリソースを使用して Spark ジョブを実行する方法については、「ECI エラスティックリソースを用いた Spark ジョブの実行」をご参照ください。
-
Spark ジョブで Celeborn をリモートシャッフルサービス (RSS) として使用する方法については、「Spark ジョブにおける Celeborn の RSS としての使用」をご参照ください。