すべてのプロダクト
Search
ドキュメントセンター

Container Service for Kubernetes:Celebornを使用してSparkジョブのRSSを有効にする

最終更新日:Dec 25, 2024

Apache Celebornは、ビッグデータ計算エンジンのシャッフルデータやスピルデータなどの中間データを処理するために使用されます。 Celebornは、ビッグデータ計算エンジンのパフォーマンス、安定性、柔軟性を効率的に向上させることができます。 RSS (Remote Shuffle Service) は、多数のデータセットをシャッフルする効率的な方法を提供します。 このトピックでは、Container Service for Kubernetes (ACK) クラスターにCelebornをデプロイする方法と、Celebornを使用してSparkジョブのRSSを有効にする方法について説明します。

メリット

Celebornを使用してRSSを有効にすると、MapReduce、Spark、Flinkなどのフレームワークに基づくビッグデータ処理に次の利点があります。

  • プッシュベースのシャッフル書き込み: マッパーはローカルディスクにデータを保存する必要はありません。 この機能は、クラウドで計算ストレージ分離アーキテクチャを使用するサービスに適しています。

  • マージベースのシャッフル書き込み: データはリデューサーではなくワーカーでマージされます。 これにより、小さなファイルや小さなデータ伝送量でのランダムな読み書きによって発生するネットワークのオーバーヘッドがなくなり、データ処理効率が向上します。

  • 高可用性: マスターズは、Raftプロトコルに基づいてシステムの高可用性と安定性を実装します。

  • 高いフォールトトレランス: Celebornは2つのレプリカをサポートし、フェッチ失敗の可能性を大幅に減らします。

前提条件

クラスター環境

次の設定に基づいてクラスター環境を設定します。

  • マスタープロセスをceleborn-masterという名前のノードプールにデプロイします。 ノードプールは次の設定を使用します。

    • ノードプール名: celeborn-master.

    • ノード数量: 3。

    • ECS (Elastic Compute Service) インスタンスタイプ: g8i.2xlarge

    • ラベル: celeborn.apache.org/role=master

    • テインツ: celeborn.apache.org/role=master:NoSchedule

    • 各マスターのデータストレージ: /mnt/celeborn_ratis (1024GB) 。

  • celeborn-workerという名前のノードプールにワーカープロセスをデプロイします。 ノードプールは次の設定を使用します。

    • ノードプール名: celeborn-worker.

    • ノード数量: 5。

    • ECSインスタンスタイプ: g8i.4xlarge

    • ラベル: celeborn.apache.org/role=worker.

    • テインツ: celeborn.apache.org/role=worker:NoSchedule.

    • 各ワーカーのデータストレージ:

      • /mnt/disk1 (1024GB)

      • /mnt/disk2 (1024GB)

      • /mnt/disk3 (1024GB)

      • /mnt/disk4 (1024GB)

手順の概要

このセクションでは、ACKクラスターにCelebornをデプロイするための主な手順について説明します。

  1. Celebornコンテナイメージを作成する

    ビジネス要件に基づいてCelebornリリースバージョンのパッケージをダウンロードします。 次に、パッケージに基づいてコンテナイメージをビルドし、そのイメージをcontainer Registryのイメージリポジトリにプッシュします。 このイメージは、ack-celebornコンポーネントをデプロイするために使用されます。

  2. ack-celebornコンポーネントのデプロイ

    ackコンソールのMarketplaceページからACK-celebornという名前のHelmチャートをインストールし、構築したイメージを使用してCelebornを展開します。 Celebornをデプロイすると、Celebornクラスターがデプロイされます。

  3. Sparkコンテナーイメージのビルド

    Object Storage Service (OSS) にアクセスするために必要なCelebornとJARパッケージを含むSparkコンテナイメージを構築し、そのイメージをcontainer Registryのイメージリポジトリにプッシュします。

  4. テストデータの準備とOSSへのアップロード

    PageRankジョブのテストデータセットを生成し、データセットをOSSにアップロードします。

  5. サンプルSparkジョブの実行

    サンプルPageRankジョブを実行し、Celebornを使用してRSSを有効にします。

  6. (オプション) 環境をクリアする

    このトピックのすべての手順を実行した後、Sparkジョブを削除し、関連するリソースが不要になった場合はリリースします。

ステップ1: Celebornコンテナイメージを作成する

ビジネス要件に基づいて、0.5.2などのCelebornリリースバージョンのパッケージをCeleborn公式Webサイトからダウンロードします。 イメージをビルドするときは、<image-REGISTRY><IMAGE-REPOSITORY> を、使用するイメージリポジトリの名前とイメージ名に置き換えます。 さらに、PLATFORMS変数を変更して、イメージのアーキテクチャを指定することもできます。 詳細については、「KubernetesへのCelebornのデプロイ」をご参照ください。 docker buildxコマンドは、Docker 19.03以降でサポートされています。 Dockerバージョンの更新方法の詳細については、「Dockerのインストール」をご参照ください。

CELEBORN_VERSION=0.5.2               # The Celeborn release version. 

IMAGE_REGISTRY=<IMAGE-REGISTRY>      # The image repository name, such as docker.io. 

IMAGE_REPOSITORY=<IMAGE-REPOSITORY>  # The image name, such as apache/celeborn. 

IMAGE_TAG=${CELEBORN_VERSION}        # The image tag. In this example, a label that indicates the Celeborn release version is added. 

# Download the package. 
wget https://downloads.apache.org/celeborn/celeborn-${CELEBORN_VERSION}/apache-celeborn-${CELEBORN_VERSION}-bin.tgz

# Decompress the package. 
tar -zxvf apache-celeborn-${CELEBORN_VERSION}-bin.tgz

# Switch the working directory. 
cd apache-celeborn-${CELEBORN_VERSION}-bin

# Use Docker Buildkit to build the image and push the image to your image repository. 
docker buildx build \
    --output=type=registry \
    --push \
    --platform=${PLATFORMS} \
    --tag=${IMAGE_REGISTRY}/${IMAGE_REPOSITORY}:${IMAGE_TAG} \
    -f docker/Dockerfile \
    .

ステップ2: ack-celebornコンポーネントのデプロイ

  1. ACKコンソールにログインします。 左側のナビゲーションウィンドウで、[Marketplace] > [Marketplace] を選択します。

  2. [マーケットプレイス] ページで、[アプリカタログ] タブをクリックします。 ack-celebornを見つけてクリックします。 ack-celebornページで、[デプロイ] をクリックします。

  3. [デプロイ] パネルで、クラスターと名前空間を選択し、[次へ] をクリックします。

  1. [パラメーター] ステップでパラメーターを設定し、[OK] をクリックします。

    image:                         # Replace the value with the address of the Celeborn image you built in Step 1. 
      registry: docker.io          # The image repository name. 
      repository: apache/celeborn  # The image name. 
      tag: 0.5.2                   # The image tag. 
    
    celeborn:
      celeborn.client.push.stageEnd.timeout: 120s
      celeborn.master.ha.enabled: true
      celeborn.master.ha.ratis.raft.server.storage.dir: /mnt/celeborn_ratis
      celeborn.master.heartbeat.application.timeout: 300s
      celeborn.master.heartbeat.worker.timeout: 120s
      celeborn.master.http.port: 9098
      celeborn.metrics.enabled: true
      celeborn.metrics.prometheus.path: /metrics/prometheus
      celeborn.rpc.dispatcher.numThreads: 4
      celeborn.rpc.io.clientThreads: 64
      celeborn.rpc.io.numConnectionsPerPeer: 2
      celeborn.rpc.io.serverThreads: 64
      celeborn.shuffle.chunk.size: 8m
      celeborn.worker.fetch.io.threads: 32
      celeborn.worker.flusher.buffer.size: 256K
      celeborn.worker.http.port: 9096
      celeborn.worker.monitor.disk.enabled: false
      celeborn.worker.push.io.threads: 32
      celeborn.worker.storage.dirs: /mnt/disk1:disktype=SSD:capacity=1024Gi,/mnt/disk2:disktype=SSD:capacity=1024Gi,/mnt/disk3:disktype=SSD:capacity=1024Gi,/mnt/disk4:disktype=SSD:capacity=1024Gi
    
    master:
      replicas: 3
      env:
      - name: CELEBORN_MASTER_MEMORY
        value: 28g
      - name: CELEBORN_MASTER_JAVA_OPTS
        value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-master.out -Dio.netty.leakDetectionLevel=advanced
      - name: CELEBORN_NO_DAEMONIZE
        value: "1"
      - name: TZ
        value: Asia/Shanghai
      volumeMounts:
      - name: celeborn-ratis
        mountPath: /mnt/celeborn_ratis
      resources:
        requests:
          cpu: 7                
          memory: 28Gi        
        limits:
          cpu: 7
          memory: 28Gi
      volumes:
      - name: celeborn-ratis
        hostPath:
          path: /mnt/celeborn_ratis
          type: DirectoryOrCreate
      nodeSelector:
        celeborn.apache.org/role: master
      tolerations:
      - key: celeborn.apache.org/role
        operator: Equal
        value: master
        effect: NoSchedule
    
    worker:
      replicas: 5
      env:
      - name: CELEBORN_WORKER_MEMORY
        value: 28g
      - name: CELEBORN_WORKER_OFFHEAP_MEMORY
        value: 28g
      - name: CELEBORN_WORKER_JAVA_OPTS
        value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-worker.out -Dio.netty.leakDetectionLevel=advanced
      - name: CELEBORN_NO_DAEMONIZE
        value: "1"
      - name: TZ
        value: Asia/Shanghai
      volumeMounts:
      - name: disk1
        mountPath: /mnt/disk1
      - name: disk2
        mountPath: /mnt/disk2
      - name: disk3
        mountPath: /mnt/disk3
      - name: disk4
        mountPath: /mnt/disk4
      resources:
        requests:
          cpu: 14
          memory: 56Gi
        limits:
          cpu: 14
          memory: 56Gi
      volumes:
      - name: disk1
        hostPath:
          path: /mnt/disk1
          type: DirectoryOrCreate
      - name: disk2
        hostPath:
          path: /mnt/disk2
          type: DirectoryOrCreate
      - name: disk3
        hostPath:
          path: /mnt/disk3
          type: DirectoryOrCreate
      - name: disk4
        hostPath:
          path: /mnt/disk4
          type: DirectoryOrCreate
      nodeSelector:
        celeborn.apache.org/role: worker
      tolerations:
      - key: celeborn.apache.org/role
        operator: Equal
        value: worker
        effect: NoSchedule

    下表に一部のパラメーターを示します。 パラメーター設定は、ack-celebornページの [パラメーター] セクションにあります。

    Parameters

    パラメーター

    説明

    image.registry

    イメージリポジトリのアドレス。

    docker.io」

    image.repository

    The image name.

    「apache/celeborn」

    image.tag

    画像タグ。

    "0.5.1"

    image.pullPolicy

    イメージ取得ポリシー。

    "IfNotPresent"

    セレボーン

    セレボーンの構成。

    {
      "celeborn.client.push.stageEnd.timeout": "120s",
      "celeborn.master.ha.enabled": true,
      "celeborn.master.ha.ratis.raft.server.storage.dir": "/mnt/celeborn_ratis",
      "celeborn.master.heartbeat.application.timeout": "300s",
      "celeborn.master.heartbeat.worker.timeout": "120s",
      "celeborn.master.http.port": 9098,
      "celeborn.metrics.enabled": true,
      "celeborn.metrics.prometheus.path": "/metrics/prometheus",
      "celeborn.rpc.dispatcher.numThreads": 4,
      "celeborn.rpc.io.clientThreads": 64,
      "celeborn.rpc.io.numConnectionsPerPeer": 2,
      "celeborn.rpc.io.serverThreads": 64,
      "celeborn.shuffle.chunk.size": "8m",
      "celeborn.worker.fetch.io.threads": 32,
      "celeborn.worker.flusher.buffer.size": "256K",
      "celeborn.worker.http.port": 9096,
      "celeborn.worker.monitor.disk.enabled": false,
      "celeborn.worker.push.io.threads": 32,
      "celeborn.worker.storage.dirs": "/mnt/disk1:disktype=SSD:capacity=1024Gi,/mnt/disk2:disktype=SSD:capacity=1024Gi,/mnt/disk3:disktype=SSD:capacity=1024Gi,/mnt/disk4:disktype=SSD:capacity=1024Gi"
    }

    master.replicas

    マスターポッドの数。

    3

    master.volumeMounts

    宣言されたボリュームをコンテナー内のマスターポッドにマウントする場所を指定します。

    [
      {
        "mountPath": "/mnt/celeborn_ratis",
        "name": "celeborn-ratis"
      }
    ]

    master.volumes

    マスターポッドにマウントされたボリュームの宣言。

    マスターポッドにマウントできるのは、hostPathボリュームとemptyDirボリュームのみです。

    [
      {
        "hostPath": {
          "path": "/mnt/celeborn_ratis",
          "type": "DirectoryOrCreate"
        },
        "name": "celeborn-ratis"
      }
    ]

    master.nodeSelector

    マスターポッドのノードセレクタ。

    {}

    master.affinity

    マスターポッドのアフィニティルール。

    {
      "podAntiAffinity": {
        "requiredDuringSchedulingIgnoredDuringExecution": [
          {
            "labelSelector": {
              "matchExpressions": [
                {
                  "key": "app.kubernetes.io/name",
                  "operator": "In",
                  "values": [
                    "celeborn"
                  ]
                },
                {
                  "key": "app.kubernetes.io/role",
                  "operator": "In",
                  "values": [
                    "master"
                  ]
                }
              ]
            },
            "topologyKey": "kubernetes.io/hostname"
          }
        ]
      }
    }

    master.tolerations

    マスターポッドの許容範囲。

    []

    worker.replicas

    ワーカーポッドの数。

    5

    worker.volumeMounts

    宣言されたボリュームをコンテナー内のワーカーポッドにマウントする場所を指定します。

    [
      {
        "mountPath": "/mnt/disk1",
        "name": "disk1"
      },
      {
        "mountPath": "/mnt/disk2",
        "name": "disk2"
      },
      {
        "mountPath": "/mnt/disk3",
        "name": "disk3"
      },
      {
        "mountPath": "/mnt/disk4",
        "name": "disk4"
      }
    ]

    worker.volumes

    ワーカーポッドにマウントされたボリュームの宣言。

    ワーカーポッドにマウントできるのは、hostPathボリュームとemptyDirボリュームのみです。

    [
      {
        "capacity": "100Gi",
        "diskType": "SSD",
        "hostPath": "/mnt/disk1",
        "mountPath": "/mnt/disk1",
        "type": "hostPath"
      },
      {
        "capacity": "100Gi",
        "diskType": "SSD",
        "hostPath": "/mnt/disk2",
        "mountPath": "/mnt/disk2",
        "type": "hostPath"
      },
      {
        "capacity": "100Gi",
        "diskType": "SSD",
        "hostPath": "/mnt/disk3",
        "mountPath": "/mnt/disk3",
        "type": "hostPath"
      },
      {
        "capacity": "100Gi",
        "diskType": "SSD",
        "hostPath": "/mnt/disk4",
        "mountPath": "/mnt/disk4",
        "type": "hostPath"
      }
    ]

    worker.nodeSelector

    ワーカーポッドのノードセレクタ。

    {}

    worker.affinity

    ワーカーポッドのアフィニティルール。

    {
      "podAntiAffinity": {
        "requiredDuringSchedulingIgnoredDuringExecution": [
          {
            "labelSelector": {
              "matchExpressions": [
                {
                  "key": "app.kubernetes.io/name",
                  "operator": "In",
                  "values": [
                    "celeborn"
                  ]
                },
                {
                  "key": "app.kubernetes.io/role",
                  "operator": "In",
                  "values": [
                    "worker"
                  ]
                }
              ]
            },
            "topologyKey": "kubernetes.io/hostname"
          }
        ]
      }
    }

    worker.tolerations

    ワーカーポッドの許容範囲。

    []

  2. 以下を実行してCelebornをデプロイします。 展開プロセス中に問題が発生した場合は、ポッドのトラブルシューティングを参照して問題を解決します。

    kubectl get -n celeborn statefulset 

    期待される出力:

    NAME              READY   AGE
    celeborn-master   3/3     68s
    celeborn-worker   5/5     68s

ステップ3: Sparkコンテナイメージを構築する

この例では、Spark 3.5.3が使用されます。 次のコンテンツに基づいてDockerfileを作成します。 次に、Dockerfileを使用してイメージを作成し、そのイメージをイメージリポジトリにプッシュします。

ARG SPARK_IMAGE=<SPARK_IMAGE>  # Replace <SPARK_IMAGE> with your Spark base image. 

FROM ${SPARK_IMAGE}

# Add dependency for Hadoop Aliyun OSS support
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars

# Add dependency for Celeborn
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/celeborn/celeborn-client-spark-3-shaded_2.12/0.5.1/celeborn-client-spark-3-shaded_2.12-0.5.1.jar ${SPARK_HOME}/jars

手順4: テストデータの準備とOSSへのアップロード

テストデータを準備してOSSにアップロードする方法の詳細については、「手順1: テストデータを準備してOSSバケットにアップロードする」をご参照ください。

ステップ5: OSSへのアクセスに使用される資格情報を保存するシークレットを作成する

OSSへのアクセスに使用される資格情報を保存するためのシークレットの作成方法の詳細については、「手順3: OSSアクセス資格情報を保存するシークレットの作成」をご参照ください。

ステップ6: サンプルSparkジョブを送信する

spark-pagerank.yamlという名前のファイルを作成し、次の内容をファイルにコピーします。 ファイルはSparkApplicationを作成するために使用されます。 <SPARK_IMAGE> を、手順3: Sparkコンテナーイメージの構築で指定したイメージリポジトリのアドレスに置き換えます。 <OSS_BUCKET> および <OSS_ENDPOINT> を、使用するOSSバケットの名前およびバケットのエンドポイントに置き換えます。 SparkジョブでCelebornを設定する方法の詳細については、「Celebornドキュメント」をご参照ください。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: <SPARK_IMAGE>                                     # The Spark image. Replace <SPARK_IMAGE> with the name of the Spark image.
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.3.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt           # The test dataset. Replace <OSS_BUCKET> with the name of the OSS bucket you use. 
  - "10"                                                   # The number of iterations. 
  sparkVersion: 3.5.3
  hadoopConf:
    fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    fs.oss.endpoint: <OSS_ENDPOINT>                        # The endpoint of the OSS bucket. The internal endpoint for the China (Beijing) region is oss-cn-beijing-internal.aliyuncs.com.  
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  sparkConf:
    spark.shuffle.manager: org.apache.spark.shuffle.celeborn.SparkShuffleManager
    spark.serializer: org.apache.spark.serializer.KryoSerializer
    spark.celeborn.master.endpoints: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
    spark.celeborn.client.spark.shuffle.writer: hash
    spark.celeborn.client.push.replicate.enabled: "false"
    spark.sql.adaptive.localShuffleReader.enabled: "false"
    spark.sql.adaptive.enabled: "true"
    spark.sql.adaptive.skewJoin.enabled: "true"
    spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
    spark.dynamicAllocation.shuffleTracking.enabled: "false"
    spark.executor.userClassPathFirst: "false"
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
    envFrom:
    - secretRef:
        name: spark-oss-secret
  executor:
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef:
        name: spark-oss-secret
  restartPolicy:
    type: Never

(オプション) ステップ7: 環境をクリアする

このトピックのすべての手順を実行した後、次のコマンドを実行してSparkジョブを削除し、関連するリソースが不要になった場合は関連するリソースを解放します。

次のコマンドを実行して、Sparkジョブを削除します。

kubectl delete sparkapplication spark-pagerank

次のコマンドを実行して、シークレットを削除します。

kubectl delete secret spark-oss-secret

関連ドキュメント