すべてのプロダクト
Search
ドキュメントセンター

Container Service for Kubernetes:ACK 上の Spark の概要

最終更新日:Apr 09, 2025

Container Service for Kubernetes (ACK) 上の Spark は、ACK が提供するソリューションです。Kubernetes 上の Spark を活用し、ACK が提供するエンタープライズレベルのコンテナアプリケーション管理機能を利用して、効率的で柔軟性があり、スケーラブルな Spark ビッグデータ処理プラットフォームを迅速に構築できます。

ACK 上の Spark の紹介

Apache Spark は、大規模データ処理向けに設計された計算エンジンであり、データ分析や機械学習などのシナリオで広く利用されています。バージョン 2.3 以降、Spark は Kubernetes クラスタへのジョブ送信を可能にしました (Kubernetes 上での Spark の実行)。

Spark Operator は、Kubernetes クラスタ上で Spark ワークロードを実行するために設計された Operator です。構成、送信、再試行プロセスなど、Kubernetes ネイティブの方法で Spark ジョブライフサイクルの管理を自動化します。

ACK 上の Spark ソリューションは、Spark Operator のようなコンポーネントをカスタマイズおよび強化し、オープンソースバージョンとの互換性を確保し、機能を拡張します。Alibaba Cloud エコシステムとシームレスに統合し、ログ保持、Object Storage Service、可観測性などの機能を提供することで、柔軟で効率的、かつスケーラブルなビッグデータ処理プラットフォームを迅速に構築できます。

機能と利点

  • 開発と運用の簡素化

    • 移植性: Spark アプリケーションとその依存関係をコンテナイメージにパッケージ化し、Kubernetes クラスタ間で簡単に移行できます。

    • 可観測性: Spark History Server を介したジョブステータスの監視を可能にし、Simple Log Service および Managed Service for Prometheus と統合してジョブの可観測性を向上させます。

    • ワークフローオーケストレーション: Apache AirflowArgo Workflows などのワークフローオーケストレーションエンジンを使用して Spark ジョブを管理し、データパイプラインのスケジューリングを自動化し、さまざまな環境で一貫したデプロイを保証します。これにより、運用効率が向上し、移行コストが削減されます。

    • マルチバージョンサポート: 単一の ACK クラスタで複数のバージョンの Spark ジョブを同時に実行できます。

  • ジョブスケジューリングとリソース管理

    • ジョブキュー管理: ack-kube-queue とシームレスに統合されたこの機能は、ジョブキューとリソースクォータの柔軟な管理を提供し、ワークロードのリソース割り当てを自動的に最適化し、クラスタリソースの利用率を向上させます。

    • 複数のスケジューリング戦略: ACK スケジューラの既存の スケジューリング機能 を活用して、Gang Scheduling や Capacity Scheduling など、さまざまなバッチスケジューリング戦略をサポートします。

    • マルチアーキテクチャスケジューリング: x86 および Arm アーキテクチャの Elastic Compute Service (ECS) リソースのハイブリッド使用をサポートし、効率を向上させ、コストを削減します。

    • マルチクラスタスケジューリング: ACK One マルチクラスタフリートを利用して、Spark ジョブをさまざまなクラスタに分散 し、複数のクラスタにわたるリソース利用率を向上させます。

    • 弾力的な計算能力の供給: カスタマイズ可能なリソース優先度スケジューリングと、ノードの自動スケーリングインスタントエラステイシティ などのさまざまな弾力性ソリューションの統合を提供します。また、Elastic Compute Service インスタンスを維持することなく、Elastic Container Instance および Alibaba Cloud Container Compute Service (ACS) の計算リソースを使用できるため、オンデマンドで柔軟なスケーリングオプションが可能になります。

    • 複数タイプのワークロードのコロケーション: ack-koordinator とシームレスに統合されたこの機能は、さまざまなタイプのワークロードのコロケーションをサポートし、それによってクラスタリソースの利用率を向上させます。

  • パフォーマンスと安定性の最適化

    • シャッフルパフォーマンスの最適化: Spark ジョブが Celeborn を Remote Shuffle Service として使用するように構成し、ストレージと計算の分離を実現し、シャッフルのパフォーマンスと安定性を向上させます。

    • データアクセス高速化: Fluid が提供するデータオーケストレーションとアクセス高速化機能を利用して、Spark ジョブのデータアクセスを高速化し、パフォーマンスを向上させます。

全体アーキテクチャ

ACK 上の Spark のアーキテクチャにより、Spark Operator を介してジョブを迅速に送信でき、ACK と Alibaba Cloud プロダクトの可観測性、スケジューリング、リソースの弾力性機能を活用できます。

  • クライアント: kubectl や Arena などのコマンドラインツールを使用して、Spark ジョブを ACK クラスタに送信します。

  • ワークフロー: Apache Airflow や Argo Workflows などのフレームワークを使用して、Spark ジョブをオーケストレーションし、ACK クラスタに送信します。

  • 可観測性: Spark History Server、Simple Log ServiceManaged Service for Prometheus を使用して、システムの可観測性を向上させます。これには、ジョブステータスの監視、ジョブログとメトリックの収集と分析が含まれます。

  • Spark Operator: 構成、送信、再試行など、Spark ジョブライフサイクル管理を自動化します。

  • Remote Shuffle Service (RSS): Apache Celeborn を RSS として使用して、Spark ジョブのパフォーマンスと安定性を向上させます。

  • キャッシュ: Fluid をデータアクセスと高速化のための分散キャッシュシステムとして採用します。

  • クラウドインフラストラクチャ: ジョブの実行中に、ECS インスタンスElastic Container InstanceACS クラスタ などの計算リソース、ディスクNAS ファイルシステム (NAS)Object Storage Service (OSS) バケット などのストレージリソース、Elastic Network Interface (ENI)Virtual Private Cloud (VPC)Server Load Balancer (SLB) インスタンス などのネットワークリソースを含む、Alibaba Cloud インフラストラクチャを利用します。

課金概要

ACK クラスタで Spark ジョブを実行するためのコンポーネントのインストールは無料です。ただし、クラスタ管理料金や関連クラウドプロダクト料金など、ACK クラスタ自体の費用は通常どおり課金されます。詳細については、「課金概要」をご参照ください。

Simple Log Service でログを収集したり、Spark ジョブで OSS/NAS のデータを読み書きしたりするための追加のクラウドプロダクト料金は、各クラウドプロダクトによって課金されます。詳細については、以下の操作ドキュメントを参照してください。

はじめに

ACK クラスタで Spark ジョブを実行するには、通常、基本的な使用方法、可観測性、詳細設定など、一連のステップが含まれます。ニーズに応じて選択および構成できます。

基本的な使用方法

プロセス

説明

Spark コンテナイメージの構築

オープンソースコミュニティが提供する Spark コンテナイメージを直接使用するか、オープンソースコンテナイメージに基づいてカスタマイズし、独自のイメージリポジトリにプッシュすることができます。以下は Dockerfile の例です。Spark ベースイメージの置き換えや依存 JAR パッケージの追加など、必要に応じてこの Dockerfile を変更し、イメージをビルドしてイメージリポジトリにプッシュできます。

展開して Dockerfile の例を表示

ARG SPARK_IMAGE=spark:3.5.4

FROM ${SPARK_IMAGE}

# Hadoop Aliyun OSS サポートの依存関係を追加します
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars

# log4j-layout-template-json の依存関係を追加します
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/logging/log4j/log4j-layout-template-json/2.24.1/log4j-layout-template-json-2.24.1.jar ${SPARK_HOME}/jars

# Celeborn の依存関係を追加します
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/celeborn/celeborn-client-spark-3-shaded_2.12/0.5.3/celeborn-client-spark-3-shaded_2.12-0.5.3.jar ${SPARK_HOME}/jars

専用の namespace の作成

Spark ジョブ用に 1 つ以上の専用の namespace (このチュートリアルでは spark を使用) を作成して、リソースの分離とリソースクォータを実現します。後続の Spark ジョブはこの namespace で実行されます。作成コマンドは次のとおりです。

kubectl create namespace spark

Spark Operator を使用した Spark ジョブの実行

ack-spark-operator コンポーネントをデプロイし、spark.jobNamespaces=["spark"] を構成します (spark namespace に送信された Spark ジョブのみをリッスンします)。デプロイ後、次の Spark ジョブの例を実行できます。

展開して Spark ジョブの例を表示

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark # この namespace が spark.jobNamespaces で指定された namespace リストに含まれていることを確認します。
spec:
  type: Scala
  mode: cluster
  # <SPARK_IMAGE> を独自の Spark コンテナイメージに置き換えます。
  image: <SPARK_IMAGE>
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
  arguments:
  - "5000"
  sparkVersion: 3.5.4
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    template:
      spec:
        containers:
        - name: spark-kubernetes-driver
        serviceAccount: spark-operator-spark
  executor:
    instances: 1
    cores: 1
    coreLimit: 1200m
    memory: 512m
    template:
      spec:
        containers:
        - name: spark-kubernetes-executor
  restartPolicy:
    type: Never
詳細については、「Spark Operator を使用して Spark ジョブを実行する」をご参照ください。

OSS データの読み取りと書き込み

Spark ジョブが Alibaba Cloud OSS データにアクセスするには、Hadoop Aliyun SDKHadoop AWS SDKJindoSDK など、複数の方法があります。選択した SDK に応じて、Spark コンテナイメージに該当する依存関係を含め、Spark ジョブで Hadoop 関連のパラメータを構成する必要があります。

展開してコード例を表示

Spark ジョブで OSS データを読み書きする」を参照して、テストデータセットを OSS にアップロードします。その後、次の Spark ジョブの例を実行できます。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: spark
spec:
  type: Scala
  mode: cluster
  # <SPARK_IMAGE> を独自の Spark イメージに置き換えます
  image: <SPARK_IMAGE>
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPageRank
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
  arguments:
  # 入力テストデータセットを指定します。<OSS_BUCKET> を OSS バケット名に置き換えます
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt
  # 反復回数
  - "10"
  sparkVersion: 3.5.4
  hadoopConf:
    # OSS への Spark ジョブアクセスを構成します
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    # <OSS_ENDPOINT> を OSS アクセスエンドポイントに置き換えます。たとえば、北京の OSS の内部ネットワークアクセスエンドポイントは oss-cn-beijing-internal.aliyuncs.com です
    fs.oss.endpoint: <OSS_ENDPOINT>
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  driver:
    cores: 1
    coreLimit: "1"
    memory: 4g
    template:
      spec:
        containers:
        - name: spark-kubernetes-driver
          envFrom:
          # 指定された Secret から環境変数を読み取ります
          - secretRef:
              name: spark-oss-secret
        serviceAccount: spark-operator-spark
  executor:
    instances: 2
    cores: 1
    coreLimit: "1"
    memory: 4g
    template:
      spec:
        containers:
        - name: spark-kubernetes-executor
          envFrom:
          # 指定された Secret から環境変数を読み取ります
          - secretRef:
              name: spark-oss-secret
  restartPolicy:
    type: Never
詳細については、「Spark ジョブで OSS データを読み書きする」をご参照ください。

可観測性

プロセス

説明

Spark History Server のデプロイ

spark namespace に ack-spark-history-server をデプロイし、ログストレージバックエンド (PVC、OSS/OSS-HDFS、HDFS をサポート) とその他の情報を構成して、指定されたストレージシステムから Spark イベントログを読み取り、ユーザーが表示できる Web UI に解析します。次の構成例は、指定された NAS ファイルシステムの /spark/event-logs パスからイベントログを読み取るように Spark History Server を構成する方法を示しています。

展開して構成例を表示

# Spark 構成
sparkConf:
  spark.history.fs.logDirectory: file:///mnt/nas/spark/event-logs

# 環境変数
env:
- name: SPARK_DAEMON_MEMORY
  value: 7g

# データボリューム
volumes:
- name: nas
  persistentVolumeClaim:
    claimName: nas-pvc

# データボリュームマウント
volumeMounts:
- name: nas
  subPath: spark/event-logs
  mountPath: /mnt/nas/spark/event-logs

# Spark ジョブの数と規模に基づいてリソースサイズを調整します
resources:
  requests:
    cpu: 2
    memory: 8Gi
  limits:
    cpu: 2
    memory: 8Gi

次に、Spark ジョブを送信するときに同じ NAS ファイルシステムをマウントし、同じパスにイベントログを書き込むように Spark を構成します。その後、Spark History Server からジョブを表示できるようになります。以下はジョブの例です。

展開して Spark ジョブの例を表示

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark
spec:
  type: Scala
  mode: cluster
  # <SPARK_IMAGE> を Spark イメージに置き換えます
  image: <SPARK_IMAGE>
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
  arguments:
  - "5000"
  sparkVersion: 3.5.4
  sparkConf:
    # イベントログ
    spark.eventLog.enabled: "true"
    spark.eventLog.dir: file:///mnt/nas/spark/event-logs
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    template:
      spec:
        containers:
        - name: spark-kubernetes-driver
          volumeMounts:
          - name: nas
            subPath: spark/event-logs
            mountPath: /mnt/nas/spark/event-logs
        volumes:
        - name: nas
          persistentVolumeClaim:
            claimName: nas-pvc
        serviceAccount: spark-operator-spark
  executor:
    instances: 1
    cores: 1
    coreLimit: 1200m
    memory: 512m
    template:
      spec:
        containers:
        - name: spark-kubernetes-executor
  restartPolicy:
    type: Never
詳細については、「Spark History Server を使用して Spark ジョブに関する情報を表示する」をご参照ください。

Spark ログを収集するための Simple Log Service の構成

クラスタで多数の Spark ジョブを実行する場合、Simple Log Service を使用してすべての Spark ジョブログを一元的に収集し、Spark コンテナの stdout および stderr ログをクエリおよび分析することをお勧めします。

展開してジョブの例を表示

このコードは、Spark ジョブで Simple Log Service を使用して、Spark コンテナの /opt/spark/logs/*.log にあるログを収集します。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: spark
spec:
  type: Scala
  mode: cluster
  # <SPARK_IMAGE> を手順 1 でビルドした Spark イメージに置き換えます
  image: <SPARK_IMAGE>
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
  arguments:
  - "5000"
  sparkVersion: 3.5.4
  # 指定された ConfigMap からログ構成ファイル log4j2.properties を読み取ります
  sparkConfigMap: spark-log-conf
  sparkConf:
    # イベントログ
    spark.eventLog.enabled: "true"
    spark.eventLog.dir: file:///mnt/nas/spark/event-logs
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    template:
      spec:
        containers:
        - name: spark-kubernetes-driver
          volumeMounts:
          - name: nas
            subPath: spark/event-logs
            mountPath: /mnt/nas/spark/event-logs
        serviceAccount: spark-operator-spark
        volumes:
        - name: nas
          persistentVolumeClaim:
            claimName: nas-pvc
  executor:
    instances: 1
    cores: 1
    coreLimit: 1200m
    memory: 512m
    template:
      spec:
        containers:
        - name: spark-kubernetes-executor
  restartPolicy:
    type: Never
詳細については、「Simple Log Service を使用して Spark ジョブのログを収集する」をご参照ください。

パフォーマンスの最適化

プロセス

説明

RSS によるシャッフルパフォーマンスの向上

シャッフルは分散コンピューティングにおける重要な操作であり、多くの場合、大量のディスク IO、データシリアル化、ネットワーク IO を伴い、OOM やデータ取得の失敗 (Fetch failures) を引き起こしやすいです。シャッフルのパフォーマンスと安定性を最適化し、コンピューティングサービスの品質を向上させるために、Spark ジョブ構成で Apache Celeborn を Remote Shuffle Service (RSS) として使用できます。

展開してコード例を表示

Celeborn を使用して Spark ジョブの RSS を有効にする」を参照して、クラスタに ack-celeborn コンポーネントをデプロイします。その後、以下のコードに基づいて Spark ジョブを送信し、Celeborn を RSS として使用できます。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: spark
spec:
  type: Scala
  mode: cluster
  # <SPARK_IMAGE> を Spark イメージに置き換えます
  image: <SPARK_IMAGE>
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPageRank
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
  arguments:
  # 入力テストデータセットを指定します。<OSS_BUCKET> を OSS バケット名に置き換えます
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt
  # 反復回数
  - "10"
  sparkVersion: 3.5.4
  hadoopConf:
    # OSS への Spark ジョブアクセスを構成します
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    # <OSS_ENDPOINT> を OSS アクセスエンドポイントに置き換えます。たとえば、北京の OSS の内部ネットワークアクセスエンドポイントは oss-cn-beijing-internal.aliyuncs.com です
    fs.oss.endpoint: <OSS_ENDPOINT>
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  # 指定された ConfigMap からログ構成ファイル log4j2.properties を読み取ります
  sparkConfigMap: spark-log-conf
  sparkConf:
    # イベントログ
    spark.eventLog.enabled: "true"
    spark.eventLog.dir: file:///mnt/nas/spark/event-logs
    
    # Celeborn 関連の構成
    spark.shuffle.manager: org.apache.spark.shuffle.celeborn.SparkShuffleManager
    spark.serializer: org.apache.spark.serializer.KryoSerializer
    # Celeborn マスターレプリカの数に基づいて構成します。
    spark.celeborn.master.endpoints: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
    spark.celeborn.client.spark.shuffle.writer: hash
    spark.celeborn.client.push.replicate.enabled: "false"
    spark.sql.adaptive.localShuffleReader.enabled: "false"
    spark.sql.adaptive.enabled: "true"
    spark.sql.adaptive.skewJoin.enabled: "true"
    spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
    spark.dynamicAllocation.shuffleTracking.enabled: "false"
    spark.executor.userClassPathFirst: "false"
  driver:
    cores: 1
    coreLimit: "1"
    memory: 4g
    template:
      spec:
        containers:
        - name: spark-kubernetes-driver
          envFrom:
          # 指定された Secret から環境変数を読み取ります
          - secretRef:
              name: spark-oss-secret
          volumeMounts:
          - name: nas
            subPath: spark/event-logs
            mountPath: /mnt/nas/spark/event-logs
        volumes:
        - name: nas
          persistentVolumeClaim:
            claimName: nas-pvc
        serviceAccount: spark-operator-spark
  executor:
    instances: 2
    cores: 1
    coreLimit: "1"
    memory: 4g
    template:
      spec:
        containers:
        - name: spark-kubernetes-executor
          envFrom:
          # 指定された Secret から環境変数を読み取ります
          - secretRef:
              name: spark-oss-secret
  restartPolicy:
    type: Never
詳細については、「Spark ジョブで Celeborn を RSS として使用する」をご参照ください。

弾力的なリソーススケジューリング優先度の定義

Elastic Container Instance ベースの Pod を使用し、適切なスケジューリング戦略を構成することで、オンデマンドで作成し、実際の使用量に基づいて課金できるため、アイドル状態のクラスタリソースによるコストの無駄を効果的に削減できます。ECS インスタンスと Elastic Container Instance が混在するシナリオでは、スケジューリングの優先順位を指定することもできます。

SparkApplication でスケジューリング関連の構成を変更する必要はありません。ACK スケジューラは、構成された弾力性戦略に基づいて Pod スケジューリングを自動的に完了します。必要に応じて、さまざまな弾力性リソース (ECS インスタンスや Elastic Container Instance など) の混在使用を柔軟にカスタマイズできます。

展開して弾力性戦略の例を表示

次の例では、弾力性戦略をカスタマイズします。spark namespace の Spark Operator によって起動された Pod の場合、ECS リソースが優先され、最大 10 個の Pod を ECS インスタンスにスケジュールできます。ECS リソースが不足している場合は、Elastic Container Instance が使用され、最大 10 個の Pod を Elastic Container Instance にスケジュールできます。

apiVersion: scheduling.alibabacloud.com/v1alpha1
kind: ResourcePolicy
metadata:
  name: spark
  namespace: spark
spec:
  # ラベルセレクタを介して Pod に適用されるスケジューリング戦略を指定します
  selector:
    # たとえば、Spark Operator を介して送信された Pod に適用されるスケジューリング戦略を指定します
    sparkoperator.k8s.io/launched-by-spark-operator: "true"
  strategy: prefer
  # スケジューリングユニットの構成
  # スケールアウト中は、スケジューリングユニットの順序でスケールアウトが実行されます。スケールイン中は、スケジューリングユニットの逆順でスケールインが実行されます。
  units:
  # 最初のスケジューリングユニットは ECS リソースを使用し、最大 10 個の Pod を ECS インスタンスにスケジュールできます
  - resource: ecs
    max: 10
    # スケジューラはラベル情報を Pod に更新します
    podLabels:
      # これは Pod に更新されない特別なラベルです
      k8s.aliyun.com/resource-policy-wait-for-ecs-scaling: "true"
    # ECS リソースを使用する場合、ノードセレクタを介してスケジュール可能なノードを指定できます
    nodeSelector:
      # たとえば、従量課金タイプの ECS ノードを選択します
      node.alibabacloud.com/instance-charge-type: PostPaid
  # 2 番目のスケジューリングユニットは ECI リソースを使用し、最大 10 個の Pod を ECI インスタンスにスケジュールできます
  - resource: eci
    max: 10
  # Pod の数をカウントするときに、ResourcePolicy が作成される前にすでにスケジュールされていた Pod を無視します
  ignorePreviousPod: false
  # Pod の数をカウントするときに、Terminating 状態の Pod を無視します
  ignoreTerminatingPod: true
  # プリエンプション戦略
  # BeforeNextUnit は、各ユニットのスケジューリングが失敗した場合にスケジューラがプリエンプションを試みることを示します
  # AfterAllUnits は、最後のユニットのスケジューリングが失敗した場合にのみ ResourcePolicy がプリエンプションを試みることを示します
  preemptPolicy: AfterAllUnits
  # どのような状況で Pod が後続のユニットのリソースを使用できるかを指定します
  whenTryNextUnits:
    # 次の 2 つの条件のいずれかが満たされた場合、後続のユニットのリソースの使用が許可されます
    # 1. 現在のユニットの Max が設定されており、そのユニットの Pod の数が設定された Max 値以上である。
    # 2. 現在のユニットの Max が設定されておらず、そのユニットの PodLabels にラベル k8s.aliyun.com/resource-policy-wait-for-ecs-scaling: "true" が含まれており、タイムアウトまで待機した後。
    policy: TimeoutOrExceedMax
    # ポリシーが TimeoutOrExceedMax の場合、現在のユニットのリソースが Pod のスケジュールに不十分な場合、現在のユニットで待機し、最大待機時間はタイムアウトです。
    # この戦略は、自動スケーリングおよび ECI と組み合わせて使用​​して、ノードプール自動スケーリングを優先し、タイムアウト後に自動的に ECI を使用する効果を実現できます。
    timeout: 30s
詳細については、「Elastic Container Instance を使用して Spark ジョブを実行する」をご参照ください。

動的リソース割り当ての構成

動的リソース割り当て (DRA) は、ワークロードのサイズに基づいてジョブが使用する計算リソースを動的に調整できます。Spark ジョブの動的リソース割り当てを有効にすることで、リソース不足によるジョブの実行時間の長期化や、リソース過剰によるリソースの無駄を回避できます。

展開してジョブの例を表示

このジョブの例では、Celeborn RSS と組み合わせて動的リソース割り当てをさらに構成します。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: spark
spec:
  type: Scala
  mode: cluster
  # <SPARK_IMAGE> を Spark イメージに置き換えます
  image: <SPARK_IMAGE>
  imagePullPolicy: IfNotPresent
  mainClass: org.apache.spark.examples.SparkPageRank
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
  arguments:
  # 入力テストデータセットを指定します。<OSS_BUCKET> を OSS バケット名に置き換えます
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt
  # 反復回数
  - "10"
  sparkVersion: 3.5.4
  hadoopConf:
    # OSS への Spark ジョブアクセスを構成します
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    # <OSS_ENDPOINT> を OSS アクセスエンドポイントに置き換えます。たとえば、北京の OSS の内部ネットワークアクセスエンドポイントは oss-cn-beijing-internal.aliyuncs.com です
    fs.oss.endpoint: <OSS_ENDPOINT>
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  # 指定された ConfigMap からログ構成ファイル log4j2.properties を読み取ります
  sparkConfigMap: spark-log-conf
  sparkConf:
    # ====================
    # イベントログ
    # ====================
    spark.eventLog.enabled: "true"
    spark.eventLog.dir: file:///mnt/nas/spark/event-logs

    # ====================
    # Celeborn
    # 参照: https://github.com/apache/celeborn/blob/main/README.md#spark-configuration
    # ====================
    # シャッフルマネージャクラス名は 0.3.0 で変更されました。
    #    0.3.0 より前: `org.apache.spark.shuffle.celeborn.RssShuffleManager`
    #    0.3.0 以降: `org.apache.spark.shuffle.celeborn.SparkShuffleManager`
    spark.shuffle.manager: org.apache.spark.shuffle.celeborn.SparkShuffleManager
    # Java シリアライザは再配置をサポートしていないため、kryo シリアライザを使用する必要があります
    spark.serializer: org.apache.spark.serializer.KryoSerializer
    # Celeborn マスターレプリカの数に基づいて構成します。
    spark.celeborn.master.endpoints: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
    # オプション: hash, sort
    # ハッシュシャッフルライターは、(パーティション数) * (celeborn.push.buffer.max.size) * (spark.executor.cores) メモリを使用します。
    # シャッフルパーティション数が大きい場合は、ソートハッシュライターを使用してみてください。ソートシャッフルライターは、ハッシュシャッフルライターよりも少ないメモリを使用します。
    spark.celeborn.client.spark.shuffle.writer: hash
    # サーバー側のデータレプリケーションを有効にするには、`spark.celeborn.client.push.replicate.enabled` を true に設定することをお勧めします
    # ワーカーが 1 つしかない場合は、この設定を false にする必要があります
    # Celeborn が HDFS を使用している場合は、この設定を false に設定することをお勧めします
    spark.celeborn.client.push.replicate.enabled: "false"
    # Spark AQE のサポートは、Spark 3 でのみテストされています
    spark.sql.adaptive.localShuffleReader.enabled: "false"
    # パフォーマンスを向上させるために、aqe サポートを有効にすることをお勧めします
    spark.sql.adaptive.enabled: "true"
    spark.sql.adaptive.skewJoin.enabled: "true"
    # Spark バージョンが 3.5.0 以降の場合は、動的リソース割り当てをサポートするためにこのパラメータを構成します
    spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
    spark.executor.userClassPathFirst: "false"

    # ====================
    # 動的リソース割り当て
    # 参照: https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
    # ====================
    # 動的リソース割り当てを有効にします
    spark.dynamicAllocation.enabled: "true"
    # シャッフルされたファイルの追跡を有効にし、ESS に依存せずに動的リソース割り当てを実装します。
    # Spark バージョンが 3.4.0 以降の場合は、Celeborn を RSS として使用する場合にこの機能を無効にすることをお勧めします。
    spark.dynamicAllocation.shuffleTracking.enabled: "false"
    # executor 数の初期値。
    spark.dynamicAllocation.initialExecutors: "3"
    # executor の最小数。
    spark.dynamicAllocation.minExecutors: "0"
    # executor の最大数。
    spark.dynamicAllocation.maxExecutors: "10"
    # executor のアイドルタイムアウト期間。タイムアウト期間を超えると、executor は解放されます。
    spark.dynamicAllocation.executorIdleTimeout: 60s
    # データブロックをキャッシュした executor のアイドルタイムアウト期間。タイムアウト期間を超えると、executor は解放されます。デフォルト値は無限大で、データブロックが解放されないことを指定します。
    # spark.dynamicAllocation.cachedExecutorIdleTimeout:
    # ジョブが指定されたスケジュール時間を超えると、追加の executor がリクエストされます。
    spark.dynamicAllocation.schedulerBacklogTimeout: 1s
    # 各時間間隔の後、後続のバッチの executor がリクエストされます。
    spark.dynamicAllocation.sustainedSchedulerBacklogTimeout: 1s
  driver:
    cores: 1
    coreLimit: "1"
    memory: 4g
    template:
      spec:
        containers:
        - name: spark-kubernetes-driver
          envFrom:
          # 指定された Secret から環境変数を読み取ります
          - secretRef:
              name: spark-oss-secret
          volumeMounts:
          - name: nas
            subPath: spark/event-logs
            mountPath: /mnt/nas/spark/event-logs
        volumes:
        - name: nas
          persistentVolumeClaim:
            claimName: nas-pvc
        serviceAccount: spark-operator-spark
  executor:
    cores: 1
    coreLimit: "1"
    memory: 4g
    template:
      spec:
        containers:
        - name: spark-kubernetes-executor
          envFrom:
          # 指定された Secret から環境変数を読み取ります
          - secretRef:
              name: spark-oss-secret
  restartPolicy:
    type: Never
詳細については、「Spark ジョブの動的リソース割り当てを構成する」をご参照ください。

Fluid を使用したデータアクセスの高速化

データがデータセンターにある場合、またはデータアクセス中にパフォーマンスボトルネックが発生した場合、Fluid が提供するデータアクセスと分散キャッシュオーケストレーション機能を使用してデータアクセスを高速化できます。

詳細については、「Fluid を使用して Spark アプリケーションのデータアクセスを高速化する」をご参照ください。

参照