すべてのプロダクト
Search
ドキュメントセンター

Container Service for Kubernetes:elasticコンテナーインスタンスを使用したSparkジョブの実行

最終更新日:Nov 12, 2024

このトピックでは、エラスティックコンテナインスタンスを使用して、container Service for Kubernetes (ACK) クラスターでSparkジョブを実行する方法について説明します。 スケジューリングポリシーを設定して、ポッドをエラスティックコンテナインスタンスにスケジュールすることができます。 このようにして、Elastic Container Instanceベースのポッドを作成し、ポッドによって使用されるリソースに対してのみ支払うことができます。 これにより、アイドルリソースが削減され、予期しないコストが発生します。 さらに、Sparkジョブの費用対効果と効率が向上します。

前提条件

利点

Sparkジョブのドライバポッドとエグゼキュータポッドを、ACKクラスター内のエラスティックコンテナインスタンスに動的にスケジュールできます。 これにより、ジョブはサーバーレスモードで実行できます。 各エラスティックコンテナインスタンスの基盤となるコンピューティングリソースは、軽量仮想サンドボックスを使用して分離されます。 Elasticコンテナインスタンスは相互に影響を与えません。

Elasticコンテナインスタンスは、Sparkジョブに次の利点をもたらします。

  • 超大規模: 構成を追加したり、クラスターのサイズを計画したりすることなく、ACKサーバーレスクラスターに50,000を超えるポッドを作成できます。

  • 第2レベルのスケーリング: トラフィックの急増を処理するために、短時間で数千のポッドを作成できます。

  • 費用対効果: エラスティックコンテナインスタンスに対しては従量課金制で課金されるため、予期しないコストが発生するのを防ぎます。 さらに、複数のインスタンスタイプを設定して、プリエンプティブル弾性コンテナインスタンスを使用できます。 これにより、コンピューティングリソースのコストがさらに削減されます。

ECSインスタンスとエラスティックコンテナインスタンスに基づくリソーススケジューリングの設定

テイント許容範囲、およびノードアフィニティルールを使用して、Elastic Compute Service (ECS) インスタンスとelasticコンテナインスタンスに基づいてリソーススケジューリングを設定できます。 たとえば、上記の設定を使用して、スケジューラがECSインスタンスまたはエラスティックコンテナインスタンスのみを使用できるようにしたり、ECSインスタンスが不足している場合にスケジューラがエラスティックコンテナインスタンスを自動的に申請できるようにしたりできます。 詳細については、「ECSインスタンスとエラスティックコンテナインスタンスに基づくリソース割り当ての設定」をご参照ください。 例:

ECSインスタンスのみ使用

デフォルトでは、virtual-kubelet.io/provider=alibabacloud:NoScheduleテイントがACKクラスター内の各エラスティックコンテナインスタンスに追加されます。 したがって、スケジューラはデフォルトで弾性コンテナインスタンスを使用しません。 次のコードブロックは、ECSインスタンスにのみスケジュールされるSparkApplicationを作成するために使用されます。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi-ecs-only
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
  arguments: 
  - "5000"
  sparkVersion: 3.5.2
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
  executor:
    instances: 2
    cores: 2
    memory: 4g

弾性コンテナインスタンスのみを使用する

特定の許容範囲をSparkジョブに追加して、一致するテイントを持つエラスティックコンテナインスタンスにスケジューラがジョブをスケジュールできるようにすることができます。 スケジューラがエラスティックコンテナインスタンスのみにジョブをスケジュールできるようにするには、ジョブにノードアフィニティルールを追加する必要があります。 エラスティックコンテナインスタンスのvirtual-kubelet.io/provider=alibabacloud:NoScheduleのテイントを許容する許容範囲を追加します。 このようにして、スケジューラはSparkジョブをエラスティックコンテナインスタンスにのみスケジュールします。 次のコードブロックは、エラスティックコンテナインスタンスにのみスケジュールされるSparkApplicationを作成するために使用されます。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi-eci-only
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
  arguments: 
  - "5000"
  sparkVersion: 3.5.2
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
    affinity:
      nodeAffinity:
        # Add an affinity rule to allow the job to be scheduled to elastic container instances.
        requiredDuringSchedulingIgnoredDuringExecution:
          nodeSelectorTerms:
          - matchExpressions:
            - key: type
              operator: In
              values:
              - virtual-kubelet
    tolerations:
    # Add a toleration to tolerate a specific taint on elastic container instances.
    - key: virtual-kubelet.io/provider
      operator: Equal
      value: alibabacloud
      effect: NoSchedule
  executor:
    instances: 2
    cores: 2
    memory: 4g
    affinity:
      nodeAffinity:
        # Add an affinity rule to allow the job to be scheduled to elastic container instances. 
        requiredDuringSchedulingIgnoredDuringExecution:
          nodeSelectorTerms:
          - matchExpressions:
            - key: type
              operator: In
              values:
              - virtual-kubelet
    tolerations:
    # Add a toleration to tolerate a specific taint on elastic container instances. 
    - key: virtual-kubelet.io/provider
      operator: Equal
      value: alibabacloud
      effect: NoSchedule

ECSインスタンスが不足している場合のelastic containerインスタンスの申請

一般的なスケジューリングソリューションは、ECSインスタンスが不足している場合にスケジューラがエラスティックコンテナインスタンスを申請できるようにすることです。 これにより、リソースの無駄を防ぎます。 さらに、このソリューションでは、ピーク時のトラフィックの急増を処理するようにアプリケーションをスケーリングできます。 次のコードブロックは、ECSインスタンスが不足している場合にエラスティックコンテナインスタンスにスケジュールできるSparkApplicationを作成するために使用されます。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi-ecs-first
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
  arguments: 
  - "5000"
  sparkVersion: 3.5.2
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
    affinity:
      nodeAffinity:
        # Add an affinity rule to enable the scheduler to apply for elastic container instances when ECS instances are insufficient. 
        preferredDuringSchedulingIgnoredDuringExecution:
        - weight: 1
          preference:
            matchExpressions:
            - key: type
              operator: NotIn
              values:
              - virtual-kubelet
    tolerations:
    # Add a toleration to tolerate a specific taint on elastic container instances. 
    - key: virtual-kubelet.io/provider
      operator: Equal
      value: alibabacloud
      effect: NoSchedule
  executor:
    instances: 2
    cores: 2
    memory: 4g
    affinity:
      nodeAffinity:
        # Add an affinity rule to enable the scheduler to apply for elastic container instances when ECS instances are insufficient.
        preferredDuringSchedulingIgnoredDuringExecution:
        - weight: 1
          preference:
            matchExpressions:
            - key: type
              operator: NotIn
              values:
              - virtual-kubelet
    tolerations:
    # Add a toleration to tolerate a specific taint on elastic container instances.
    - key: virtual-kubelet.io/provider
      operator: Equal
      value: alibabacloud
      effect: NoSchedule

優先度ベースのリソーススケジューリングの設定

ACKスケジューラは、優先度ベースのリソーススケジューリングをサポートする。 ResourcePolicyを使用して、ポッドスケジューリングのためにさまざまなタイプのノードの優先順位を指定できます。 詳細については、「優先度ベースのリソーススケジューリングの設定」をご参照ください。

  1. resourcepolicy.yamlという名前のファイルを作成し、次の内容をファイルにコピーします。 このファイルは、SparkジョブのポッドをスケジュールするためのECSインスタンスとエラスティックコンテナインスタンスの優先順位を指定するResourcePolicyを作成するために使用されます。 次のコードブロックは、resourcepolicy.yamlファイルの例です。

    apiVersion: scheduling.alibabacloud.com/v1alpha1
    kind: ResourcePolicy
    metadata:
      name: sparkapplication-resource-policy
      namespace: default                      # The ResourcePolicy takes effect only on pods in the default namespace. 
    spec:
      ignorePreviousPod: true     
      ignoreTerminatingPod: false     
      matchLabelKeys:
      - sparkoperator.k8s.io/submission-id    # Count pods based on the submission ID of the Spark job. 
      preemptPolicy: AfterAllUnits            # The preemption policy. It specifies that the scheduler attempts to preempt nodes only after it fails to schedule pods to all schedulable units contained in the ResourcePolicy. 
      selector:                               # The label used to select pods on which the ResourcePolicy takes effect. 
        sparkoperator.k8s.io/launched-by-spark-operator: "true"  # The ResourcePolicy takes effect only on pods launched by Spark Operator. 
      strategy: prefer              
      units:                                 # The ResourcePolicy contains two schedulable units. During a scale-out activity, pods are scheduled to nodes based on the priorities of the listed schedulable units in descending order. During a scale-in activity, pods are deleted from the nodes based on the priorities of the listed schedulable units in ascending order. 
      - max: 2                               # A maximum of two pods can be scheduled to the current unit. The current unit includes ECS nodes that have the kubernetes.io/arch=amd64 label. 
        resource: ecs               
        nodeSelector:
          kubernetes.io/arch: amd64  
      - max: 3                               # A maximum of three pods can be scheduled to the current unit. The current unit includes elastic container instances. 
        resource: eci      
  2. 次のコマンドを実行して、Sparkジョブにのみ有効なResourcePolicyを作成します。

    kubectl apply -f resourcepolicy.yaml
  3. spark-pi. YAMLという名前のSparkApplication yamlファイルを作成します。

    SparkApplicationは、1つのドライバーポッドと5つのエグゼキューターポッドをプロビジョニングします。 リソースが十分な場合、ドライバポッドと1つのエグゼキュータポッドはAMD64ノードにスケジュールされ、3つのエグゼキュータポッドはエラスティックコンテナインスタンスにスケジュールされます。 スケジュール可能なユニットにスケジュールされたポッドの数が上限に達したため、1つのエグゼキュータポッドはPending状態のままです。

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pi
      namespace: default
    spec:
      type: Scala
      mode: cluster
      image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
      mainClass: org.apache.spark.examples.SparkPi
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
      arguments: 
      - "5000"
      sparkVersion: 3.5.2
      driver:
        cores: 1
        coreLimit: 1200m
        memory: 512m
        serviceAccount: spark-operator-spark
        tolerations:
        - key: virtual-kubelet.io/provider       # Add a toleration to tolerate a specific taint on elastic container instances. 
          operator: Equal
          value: alibabacloud
          effect: NoSchedule
      executor:
        instances: 5
        cores: 1
        coreLimit: 1200m
        memory: 512m
        tolerations: 
        - key: virtual-kubelet.io/provider         # Add a toleration to tolerate a specific taint on elastic container instances. 
          operator: Equal
          value: alibabacloud
          effect: NoSchedule
  4. 次のコマンドを実行して、Sparkジョブを送信します。

    kubectl apply -f spark-pi.yaml
  5. 次のコマンドを実行して、ジョブ用に作成されたポッドのスケジューリング結果を確認します。

    kubectl get pods  -o wide -l sparkoperator.k8s.io/app-name=spark-pi

    NAME                                        READY   STATUS      RESTARTS   AGE       IP                  NODE                          
    spark-pi-34c0998f9f832e61-exec-1            1/1     Running     0          28s       192.XXX.XX.34       cn-beijing.192.XXX.XX.250       
    spark-pi-34c0998f9f832e61-exec-2            1/1     Running     0          28s       192.XXX.XX.87       virtual-kubelet-cn-beijing-i   
    spark-pi-34c0998f9f832e61-exec-3            1/1     Running     0          28s       192.XXX.XX.88       virtual-kubelet-cn-beijing-i   
    spark-pi-34c0998f9f832e61-exec-4            1/1     Running     0          28s       192.XXX.XX.86       virtual-kubelet-cn-beijing-i   
    spark-pi-34c0998f9f832e61-exec-5            0/1     Pending     0          28s       <none>              <none>                         
    spark-pi-driver                             1/1     Running     0          34s       192.XXX.XX.37       cn-beijing.192.XXX.XXX.250       

ImageCache機能を使用したイメージプルの高速化

エラスティックコンテナインスタンスにポッドをデプロイするときに、イメージキャッシュを使用してイメージプルを高速化できます。 詳細については、「ImageCacheを使用したエラスティックコンテナインスタンスの作成の高速化」をご参照ください。

このセクションでは、イメージキャッシュの有無にかかわらずSparkApplicationがデプロイされたときのイメージの取得速度を比較します。 このセクションでは、イメージキャッシュの自動作成と照合を有効にする方法についても説明します。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
  arguments: 
  - "5000"
  sparkVersion: 3.5.2
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
  executor:
    instances: 2
    cores: 2
    memory: 4g
  • イメージキャッシュを使用せずにSparkジョブを送信

    イメージキャッシュを使用せずにSparkジョブを送信し、ドライバポッドのイベントを確認します。

    kubectl describe pod spark-pi-driver
    Events:
      ...
      Warning  ImageCacheMissed       24m   EciService         [eci.imagecache]Missed image cache.
      Normal   ImageCacheAutoCreated  24m   EciService         [eci.imagecache]Image cache imc-2zeXXXXXXXXXXXXXXXXX is auto created
      Normal   Pulling                24m   kubelet            Pulling image "registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2"
      Normal   Pulled                 23m   kubelet            Successfully pulled image "registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2" in 1m41.289s (1m41.289s including waiting)
      ...

    イベントは、イメージキャッシュミスが発生し、イメージキャッシュが作成されたことを示します。 画像を引き出すのに約100秒かかりました。

  • イメージキャッシュを使用するSparkジョブの送信

    イメージキャッシュを指定するには、ドライバー設定とエグゼキュータ設定に次の注釈を追加します。

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pi-eci-only
      namespace: default
    spec:
      type: Scala
      mode: cluster
      image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
      mainClass: org.apache.spark.examples.SparkPi
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
      arguments: 
      - "5000"
      sparkVersion: 3.5.2
      driver:
        annotations:
          # Specify the ID of the image cache that you want to use.
          k8s.aliyun.com/eci-image-snapshot-id: imc-2zeXXXXXXXXXXXXXXXXX
        cores: 1
        coreLimit: 1200m
        memory: 512m
        serviceAccount: spark-operator-spark
        affinity:
          nodeAffinity:
            # Add an affinity rule to allow the job to be scheduled to elastic container instances.
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
              - matchExpressions:
                - key: type
                  operator: In
                  values:
                  - virtual-kubelet
        tolerations:
        # Add a toleration to tolerate a specific taint on elastic container instances.
        - key: virtual-kubelet.io/provider
          operator: Equal
          value: alibabacloud
          effect: NoSchedule
      executor:
        annotations:
          # Specify the ID of the image cache that you want to use.
          k8s.aliyun.com/eci-image-snapshot-id: imc-2zeXXXXXXXXXXXXXXXXX
        instances: 2
        cores: 2
        memory: 4g
        affinity:
          nodeAffinity:
            # Add an affinity rule to allow the job to be scheduled to elastic container instances.
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
              - matchExpressions:
                - key: type
                  operator: In
                  values:
                  - virtual-kubelet
        tolerations:
        # Add a toleration to tolerate a specific taint on elastic container instances.
        - key: virtual-kubelet.io/provider
          operator: Equal
          value: alibabacloud
          effect: NoSchedule

    Sparkジョブを送信し、ドライバポッドのイベントを確認します。

     kubectl describe pod spark-pi-driver
    Events:
      ...
      Normal  SuccessfulHitImageCache  23s   EciService         [eci.imagecache]Successfully hit image cache imc-2zeXXXXXXXXXXXXXXXXX, eci will be scheduled with this image cache.
      Normal  Pulled                   4s    kubelet            Container image "registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2" already present on machine
      ...

    イベントは、イメージキャッシュヒットが発生し、イメージがプルされなかったことを示します。

  • イメージキャッシュの自動作成と一致の有効化

    イメージキャッシュの自動作成と照合を有効にするには、k8s.aliyun.com/eci-image-cache: 「真」への注釈. spec.[ドライバー | executor].annotationsパラメーターを使用します。 この場合、イメージキャッシュIDを指定する必要はありません。

    apiVersion: sparkoperator.k8s.io/v1beta2
    kind: SparkApplication
    metadata:
      name: spark-pi-eci-only
      namespace: default
    spec:
      type: Scala
      mode: cluster
      image: registry-cn-hangzhou.ack.aliyuncs.com/ack-demo/spark:3.5.2
      mainClass: org.apache.spark.examples.SparkPi
      mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.2.jar
      arguments: 
      - "5000"
      sparkVersion: 3.5.2
      driver:
        annotations:
          # Enable automatic image cache creation and match.
          k8s.aliyun.com/eci-image-cache: "true"
        cores: 1
        coreLimit: 1200m
        memory: 512m
        serviceAccount: spark-operator-spark
        affinity:
          nodeAffinity:
            # Add an affinity rule to allow the job to be scheduled to elastic container instances.
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
              - matchExpressions:
                - key: type
                  operator: In
                  values:
                  - virtual-kubelet
        tolerations:
        # Add a toleration to tolerate a specific taint on elastic container instances.
        - key: virtual-kubelet.io/provider
          operator: Equal
          value: alibabacloud
          effect: NoSchedule
      executor:
        annotations:
          # Enable automatic image cache creation and match.
          k8s.aliyun.com/eci-image-cache: "true"
        instances: 2
        cores: 2
        memory: 4g
        affinity:
          nodeAffinity:
            # Add an affinity rule to allow the job to be scheduled to elastic container instances.
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
              - matchExpressions:
                - key: type
                  operator: In
                  values:
                  - virtual-kubelet
        tolerations:
        # Add a toleration to tolerate a specific taint on elastic container instances.
        - key: virtual-kubelet.io/provider
          operator: Equal
          value: alibabacloud
          effect: NoSchedule