すべてのプロダクト
Search
ドキュメントセンター

Container Service for Kubernetes:Spark ジョブのマルチクラスタスケジューリングと分散

最終更新日:Mar 06, 2025

Apache Spark は、大規模データ処理用のコンピューティングエンジンです。 Apache Spark は、ビッグデータコンピューティングと機械学習のシナリオでワークロードを分析するために広く使用されています。 このトピックでは、Kubernetes 用の分散クラウドコンテナプラットフォーム (ACK One) の Fleet インスタンスを使用して、複数のクラスタで Spark ジョブをスケジューリングおよび分散する方法について説明します。 これにより、複数のクラスタでアイドル状態のリソースの活用度を向上させることができます。

しくみ

  1. 関連付けられたクラスタに ack-spark-operator コンポーネントをインストールします。

  2. Fleet インスタンス用に SparkApplicationPropagationPolicy を作成します。

  3. Fleet インスタンスのマルチクラスタスケジューリングコンポーネント (グローバルスケジューラ) は、関連付けられた各サブクラスタの残りのリソースに基づいて、Spark ジョブのリソースリクエストを照合します。

    Kubernetes のバージョンが 1.28 以降のサブクラスタの場合、Fleet インスタンスはリソースのプリエンプションをサポートして、Spark ジョブのスケジューリングの成功率を向上させます。

  4. Fleet インスタンスがジョブをスケジューリングした後、SparkApplication はスケジューリングされ、関連付けられたクラスタに分散されます。

  5. 関連付けられたクラスタでは、ACK Spark Operator が Spark ジョブの driverexecutor を実行します。 同時に、Fleet インスタンスはサブクラスタ内の Spark ジョブの実行ステータスを監視します。 リソース不足のために driver を実行できない場合、Fleet インスタンスは一定期間後に SparkApplication を再要求し、SparkApplication を十分なリソースを持つ他の関連付けられたクラスタに再スケジューリングします。

前提条件

  • Fleet インスタンスは、Kubernetes バージョンが 1.18 以降の複数のクラスタに関連付けられています。 詳細については、「関連付けられたクラスタの管理」をご参照ください。

  • Resource Access Management (RAM) ポリシー AliyunAdcpFullAccess が RAM ユーザーにアタッチされています。 詳細については、「RAM ユーザーへの権限の付与」をご参照ください。

  • AMC コマンドラインツールがインストールされています。 詳細については、「AMC コマンドラインの使用」をご参照ください。

ステップ 1: 関連付けられたクラスタに ack-spark-operator コンポーネントをインストールする

Spark ジョブを実行するサブクラスタに ack-spark-operator コンポーネントをインストールします。

  1. ACK コンソール にログインします。 左側のナビゲーションウィンドウで、[マーケットプレイス] > [マーケットプレイス] を選択します。

  2. [マーケットプレイス] ページで、[アプリカタログ] タブをクリックします。 ack-spark-operator を見つけてクリックします。

  3. [ack-spark-operator] ページで、[デプロイ] をクリックします。

  4. [デプロイ] パネルで、クラスタと名前空間を選択し、[次へ] をクリックします。

  5. [パラメーター] ステップで、パラメーターを構成し、[OK] をクリックします。

    次の表に、いくつかのパラメーターについて説明します。 パラメーターの構成は、[ack-spark-operator] ページの [パラメーター] セクションにあります。

    パラメーター

    説明

    controller.replicas

    コントローラーレプリカの数。

    デフォルト値: 1。

    webhook.replicas

    Webhook レプリカの数。

    デフォルト値: 1。

    spark.jobNamespaces

    Spark ジョブを実行できる名前空間。 このパラメーターを空のままにすると、Spark ジョブはすべての名前空間で実行できます。 複数の名前空間はコンマ (,) で区切ります。

    • デフォルト値: ["default"]

    • [""]: すべての名前空間。

    • ["ns1","ns2","ns3"]: 1 つ以上の名前空間を指定します。

    spark.serviceAccount.name

    Spark ジョブは、spark-operator-spark という名前の ServiceAccount と、対応するロールベースアクセス制御 (RBAC) リソースを、spark.jobNamespaces で指定された各名前空間に自動的に作成します。 ServiceAccount のカスタム名を指定し、Spark ジョブを送信するときにカスタム名を指定できます。

    デフォルト値: spark-operator-spark

ステップ 2: Fleet インスタンスで PriorityClass を作成し、PriorityClass をサブクラスタに分散する

送信された Spark ジョブがオンラインサービスリソースを占有し、オンラインサービスサービスの通常の動作に影響を与えないようにするために、送信された Spark ジョブに低い優先度を指定することをお勧めします。

  1. Fleet インスタンスの kubeconfig ファイルを使用して、低優先度の PriorityClass を作成し、value を負の値に設定します。

    apiVersion: scheduling.k8s.io/v1
    kind: PriorityClass
    metadata:
      name: low-priority
    value: -1000
    globalDefault: false
    description: "Spark アプリケーションの低優先度"
  2. Fleet インスタンスで ClusterPropagationPolicy を作成して、PriorityClass を指定されたクラスタに分散します。 PriorityClass をすべての関連付けられたクラスタに分散する場合は、clusterAffinity パラメーターを削除できます。

    apiVersion: policy.one.alibabacloud.com/v1alpha1
    kind: ClusterPropagationPolicy
    metadata:
      name: priority-policy
    spec:
      preserveResourcesOnDeletion: false
      resourceSelectors:
      - apiVersion: scheduling.k8s.io/v1
        kind: PriorityClass
      placement:
        clusterAffinity:
          clusterNames:
          - ${cluster1-id} # クラスタの ID。
          - ${cluster2-id} # クラスタの ID。
    #      labelSelector:
    #        matchLabels:
    #          key: value
        replicaScheduling:
          replicaSchedulingType: Duplicated

ステップ 3: Fleet インスタンスで Spark ジョブを送信し、Spark ジョブをサブクラスタにスケジューリングする

  1. (オプション) Fleet インスタンスで名前空間を作成し、名前空間をサブクラスタに分散します。

    1. 分散するアプリケーションが配置されている名前空間が Fleet インスタンスに存在しない場合は、最初に Fleet インスタンスに名前空間を作成し、名前空間が ステップ 1 でインストールされたコンポーネントの spark.jobNamespaces パラメーターに含まれていることを確認する必要があります。 名前空間がすでに存在する場合は、このステップをスキップできます。

      Fleet インスタンスの kubeconfig ファイルを使用して、次のコマンドを実行して名前空間を作成します。

      kubectl create ns xxx
    2. サブクラスタにも対応する名前空間が必要です。 名前空間が存在しない場合は、ClusterPropagationPolicy を使用して、Fleet インスタンスの名前空間を各サブクラスタに分散できます。

      apiVersion: policy.one.alibabacloud.com/v1alpha1
      kind: ClusterPropagationPolicy
      metadata:
        name: ns-policy
      spec:
        resourceSelectors:
        - apiVersion: v1
          kind: Namespace
          name: xxx
        placement:
          clusterAffinity:
            clusterNames:
            - ${cluster1-id} # クラスタの ID。
            - ${cluster2-id} # クラスタの ID。
          replicaScheduling:
            replicaSchedulingType: Duplicated
  2. Fleet インスタンスで次の PropagationPolicy を作成して、sparkoperator. Kubernetes. io/v1beta2 のすべての SparkApplication リソースを対応するクラスタに分散します。

    apiVersion: policy.one.alibabacloud.com/v1alpha1
    kind: PropagationPolicy
    metadata:
      name: sparkapp-policy 
      namespace: default
    spec:
      preserveResourcesOnDeletion: false
      propagateDeps: true
      placement:
        clusterAffinity:
          clusterNames:
          - ${cluster1-id} # クラスタの ID。
          - ${cluster2-id} # クラスタの ID。
    #      labelSelector:
    #        matchLabels:
    #          key: value
        replicaScheduling:
          replicaSchedulingType: Divided
          customSchedulingType: Gang
      resourceSelectors:
        - apiVersion: sparkoperator.k8s.io/v1beta2
          kind: SparkApplication
  3. Fleet インスタンスで SparkApplication を作成し、driverexecutorpriorityClassName パラメーターを構成します。 作成後、アプリケーションは ステップ 2PropagationPolicy によって選択されたクラスタに分散されます。

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pi
      namespace: default     # 名前空間が spark.jobNamespaces パラメーターで指定された名前空間リストに含まれていることを確認します。
    spec:
      type: Scala
      mode: cluster
      image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.4
      imagePullPolicy: IfNotPresent
      mainClass: org.apache.spark.examples.SparkPi
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.4.jar
      arguments:
      - "1000"
      sparkVersion: 3.5.4
      driver:
        cores: 1
        memory: 512m
        priorityClassName: low-priority
        serviceAccount: spark-operator-spark   # spark-operator-spark を指定したカスタム名に置き換えます。
      executor:
        instances: 1
        cores: 1
        memory: 512m
        priorityClassName: low-priority
      restartPolicy:
        type: Never

ステップ 4: Spark ジョブを表示する

  1. Fleet インスタンスで次のコマンドを実行して、Spark ジョブのステータスを表示します。

    kubectl get sparkapp

    予期される出力:

    NAME       STATUS    ATTEMPTS   START                  FINISH       AGE
    spark-pi   RUNNING   1          2025-02-24T12:10:34Z   <no value>   11s
  2. Fleet インスタンスで次のコマンドを実行して、Spark ジョブがどの関連付けられたクラスタにスケジューリングされているかを確認します。

    kubectl describe sparkapp spark-pi

    予期される出力:

     Normal   ScheduleBindingSucceed  2m29s                  default-scheduler                   バインディングは正常にスケジューリングされました。 結果: {c6xxxxx:0,[{driver 1} {executor 1}]}
  3. Fleet インスタンスで次のコマンドを実行して、関連付けられたクラスタ内の Spark ジョブのステータスを確認します。

    kubectl amc get sparkapp -M

    予期される出力:

    NAME       CLUSTER     STATUS      ATTEMPTS   START                  FINISH                 AGE   ADOPTION
    spark-pi   c6xxxxxxx   COMPLETED   1          2025-02-24T12:10:34Z   2025-02-24T12:11:20Z   61s   Y
  4. Fleet インスタンスで次のコマンドを実行して、ポッドのステータスをクエリします。

    kubectl amc get pod -M     

    予期される出力:

    NAME              CLUSTER     READY   STATUS      RESTARTS   AGE
    spark-pi-driver   c6xxxxxxx   0/1     Completed   0          68s
  5. Fleet インスタンスで次のコマンドを実行して、関連付けられたクラスタ内の Spark ジョブの詳細を表示します。

    kubectl amc get sparkapp spark-pi -m ${member clusterid} -oyaml