すべてのプロダクト
Search
ドキュメントセンター

Container Service for Kubernetes:Celeborn を Spark ジョブのリモートシャッフルサービス (RSS) として使用する

最終更新日:Mar 25, 2026

Apache Celeborn は、ビッグデータ処理エンジン向けの中間データ(例: shuffle dataspill data)を管理するためのサービスです。リモートシャッフルサービス (RSS) として機能し、大規模データセットにおけるシャッフル処理を効率的に実行することで、これらのエンジンのパフォーマンス、安定性、および柔軟性を向上させます。本トピックでは、Container Service for Kubernetes (ACK) クラスター内に Celeborn コンポーネントをデプロイし、Spark ジョブの RSS として Celeborn を使用する方法について説明します。

メリット

MapReduce、Spark、Flink などのビッグデータ処理フレームワークにおいて、Celeborn を RSS として使用すると、以下のメリットがあります:

  • プッシュベースのシャッフル書き込み:マッパー(Mapper)ノードがローカルディスク上にデータを保存する必要がなく、ストレージとコンピューティングの分離 を採用したクラウドネイティブアーキテクチャに最適です。

  • マージベースのシャッフル読み込み:データはリデューサー(Reducer)ノードではなく、worker ノード上でマージされます。これにより、ランダムな小規模ファイル I/O や小規模データ転送に伴うネットワークオーバーヘッドが回避され、データ処理効率が向上します。

  • 高可用性:Celeborn の master ノードは Raft 合意プロトコルを採用しており、高可用性 を実現し、システムの安定性を確保します。

  • フォールトトレランス:Celeborn は デュアルレプリカ をサポートしており、フェッチ失敗の発生確率を大幅に低減します。

前提条件

クラスター環境

本例では、以下の ACK クラスター構成を使用します:

  • master プロセスを celeborn-master ノードプール にデプロイし、以下の構成を適用します:

    • ノードプール 名:celeborn-master

    • ノード数:3

    • ECS インスタンスタイプ:g8i.2xlarge

    • ラベル:celeborn.apache.org/role=master

    • Taint:celeborn.apache.org/role=master:NoSchedule

    • ノードあたりのデータストレージ:/mnt/celeborn_ratis(1024 GB)

  • worker プロセスを celeborn-worker ノードプール にデプロイし、以下の構成を適用します:

    • ノードプール 名:celeborn-worker

    • ノード数:5

    • ECS インスタンスタイプ:g8i.4xlarge

    • ラベル:celeborn.apache.org/role=worker

    • Taint:celeborn.apache.org/role=worker:NoSchedule

    • ノードあたりのデータストレージ:

      • /mnt/disk1(1024 GB)

      • /mnt/disk2(1024 GB)

      • /mnt/disk3(1024 GB)

      • /mnt/disk4(1024 GB)

操作手順の概要

本ガイドでは、ACK クラスター内に Celeborn をデプロイする手順について説明します。

  1. Celeborn コンテナイメージ の構築

    Celeborn リリース をダウンロードし、コンテナイメージ を構築してから、ご利用の イメージリポジトリ にプッシュし、ack-celeborn コンポーネントをデプロイします。

  2. ack-celeborn コンポーネントのデプロイ

    ACK Marketplace から提供される ack-celeborn Helm chart を使用し、前述で構築した コンテナイメージ を基に Celeborn クラスターをデプロイします。

  3. Spark コンテナイメージ の構築

    Celeborn および OSS へのアクセスに必要な依存関係を含む Spark コンテナイメージ を構築し、そのイメージを イメージリポジトリ にプッシュします。

  4. テストデータの OSS への準備およびアップロード

    PageRank ジョブのテストデータセットを生成し、OSS にアップロードします。

  5. サンプル Spark ジョブの実行

    サンプル PageRank ジョブを実行し、Celeborn を RSS として使用するよう設定します。

  6. (任意)リソースのクリーンアップ

    チュートリアルの完了後、不要となった Spark ジョブおよびその他のリソースをクリーンアップし、不要な課金を回避します。

ステップ 1:Celeborn コンテナイメージの構築

Celeborn 公式ウェブサイト から必要な リリース(例:バージョン 0.5.2)をダウンロードします。構成時に、<IMAGE-REGISTRY> および <IMAGE-REPOSITORY> を、ご利用のイメージレジストリおよびイメージ名に置き換えます。また、PLATFORMS 変数を変更して、必要なイメージアーキテクチャを設定することもできます。詳細については、「Kubernetes への Celeborn のデプロイ」をご参照ください。docker buildx コマンドは Docker 19.03 以降を必要とします。Docker のアップグレード方法については、「Docker および Docker Compose のインストールと使用」をご参照ください。

CELEBORN_VERSION=0.5.2               # Celeborn のバージョン。

IMAGE_REGISTRY=<IMAGE-REGISTRY>      # イメージレジストリ(例:docker.io)。

IMAGE_REPOSITORY=<IMAGE-REPOSITORY>  # イメージ名(例:apache/celeborn)。

IMAGE_TAG=${CELEBORN_VERSION}        # イメージタグ。本例では Celeborn のバージョンをタグとして使用します。

PLATFORMS=linux/amd64                # イメージプラットフォームアーキテクチャ。複数のプラットフォームをサポートする場合は、カンマで区切ります(例:linux/amd64,linux/arm64)。

# リリースパッケージをダウンロードします。
wget https://downloads.apache.org/celeborn/celeborn-${CELEBORN_VERSION}/apache-celeborn-${CELEBORN_VERSION}-bin.tgz

# パッケージを展開します。
tar -zxvf apache-celeborn-${CELEBORN_VERSION}-bin.tgz

# 作業ディレクトリに移動します。
cd apache-celeborn-${CELEBORN_VERSION}-bin

# Docker Buildx を使用してイメージを構築し、イメージリポジトリにプッシュします。
docker buildx build \
    --output=type=registry \
    --push \
    --platform=${PLATFORMS} \
    --tag=${IMAGE_REGISTRY}/${IMAGE_REPOSITORY}:${IMAGE_TAG} \
    -f docker/Dockerfile \
    .

ステップ 2:ack-celeborn コンポーネントのデプロイ

  1. ACK コンソール にログインします。左側のナビゲーションウィンドウで、ストア > Marketplace をクリックします。

  2. Marketplace ページで、アプリカタログ タブをクリックし、ack-celeborn を検索・選択して、ack-celeborn ページで デプロイ をクリックします。

  3. 作成する パネルで、クラスターおよび名前空間を選択し、次へ をクリックします。

  4. パラメーター ページで、パラメーターを設定し、OK をクリックします。

    image:                         # ステップ 1 で構築した Celeborn イメージのアドレスに置き換えてください。
      registry: docker.io          # イメージレジストリ。
      repository: apache/celeborn  # イメージ名。
      tag: 0.5.2                   # イメージタグ。
    
    celeborn:
      celeborn.client.push.stageEnd.timeout: 120s
      celeborn.master.ha.enabled: true
      celeborn.master.ha.ratis.raft.server.storage.dir: /mnt/celeborn_ratis
      celeborn.master.heartbeat.application.timeout: 300s
      celeborn.master.heartbeat.worker.timeout: 120s
      celeborn.master.http.port: 9098
      celeborn.metrics.enabled: true
      celeborn.metrics.prometheus.path: /metrics/prometheus
      celeborn.rpc.dispatcher.numThreads: 4
      celeborn.rpc.io.clientThreads: 64
      celeborn.rpc.io.numConnectionsPerPeer: 2
      celeborn.rpc.io.serverThreads: 64
      celeborn.shuffle.chunk.size: 8m
      celeborn.worker.fetch.io.threads: 32
      celeborn.worker.flusher.buffer.size: 256K
      celeborn.worker.http.port: 9096
      celeborn.worker.monitor.disk.enabled: false
      celeborn.worker.push.io.threads: 32
      celeborn.worker.storage.dirs: /mnt/disk1:disktype=SSD:capacity=1024Gi,/mnt/disk2:disktype=SSD:capacity=1024Gi,/mnt/disk3:disktype=SSD:capacity=1024Gi,/mnt/disk4:disktype=SSD:capacity=1024Gi
    
    master:
      replicas: 3
      env:
      - name: CELEBORN_MASTER_MEMORY
        value: 28g
      - name: CELEBORN_MASTER_JAVA_OPTS
        value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-master.out -Dio.netty.leakDetectionLevel=advanced
      - name: CELEBORN_NO_DAEMONIZE
        value: "1"
      - name: TZ
        value: Asia/Shanghai
      volumeMounts:
      - name: celeborn-ratis
        mountPath: /mnt/celeborn_ratis
      resources:
        requests:
          cpu: 7                
          memory: 28Gi        
        limits:
          cpu: 7
          memory: 28Gi
      volumes:
      - name: celeborn-ratis
        hostPath:
          path: /mnt/celeborn_ratis
          type: DirectoryOrCreate
      nodeSelector:
        celeborn.apache.org/role: master
      tolerations:
      - key: celeborn.apache.org/role
        operator: Equal
        value: master
        effect: NoSchedule
    
    worker:
      replicas: 5
      env:
      - name: CELEBORN_WORKER_MEMORY
        value: 28g
      - name: CELEBORN_WORKER_OFFHEAP_MEMORY
        value: 28g
      - name: CELEBORN_WORKER_JAVA_OPTS
        value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-worker.out -Dio.netty.leakDetectionLevel=advanced
      - name: CELEBORN_NO_DAEMONIZE
        value: "1"
      - name: TZ
        value: Asia/Shanghai
      volumeMounts:
      - name: disk1
        mountPath: /mnt/disk1
      - name: disk2
        mountPath: /mnt/disk2
      - name: disk3
        mountPath: /mnt/disk3
      - name: disk4
        mountPath: /mnt/disk4
      resources:
        requests:
          cpu: 14
          memory: 56Gi
        limits:
          cpu: 14
          memory: 56Gi
      volumes:
      - name: disk1
        hostPath:
          path: /mnt/disk1
          type: DirectoryOrCreate
      - name: disk2
        hostPath:
          path: /mnt/disk2
          type: DirectoryOrCreate
      - name: disk3
        hostPath:
          path: /mnt/disk3
          type: DirectoryOrCreate
      - name: disk4
        hostPath:
          path: /mnt/disk4
          type: DirectoryOrCreate
      nodeSelector:
        celeborn.apache.org/role: worker
      tolerations:
      - key: celeborn.apache.org/role
        operator: Equal
        value: worker
        effect: NoSchedule

    以下の表では、主なパラメーターについて説明します。すべてのパラメーターの一覧については、ack-celeborn ページの ConfigMap セクションをご参照ください。

    パラメーター設定

    パラメーター

    説明

    image.registry

    イメージレジストリ のアドレスです。

    "docker.io"

    image.repository

    イメージ名です。

    "apache/celeborn"

    image.tag

    イメージタグです。

    "0.5.2"

    image.pullPolicy

    イメージプルポリシーです。

    "IfNotPresent"

    celeborn

    Celeborn の構成プロパティです。

    {
      "celeborn.client.push.stageEnd.timeout": "120s",
      "celeborn.master.ha.enabled": true,
      "celeborn.master.ha.ratis.raft.server.storage.dir": "/mnt/celeborn_ratis",
      "celeborn.master.heartbeat.application.timeout": "300s",
      "celeborn.master.heartbeat.worker.timeout": "120s",
      "celeborn.master.http.port": 9098,
      "celeborn.metrics.enabled": true,
      "celeborn.metrics.prometheus.path": "/metrics/prometheus",
      "celeborn.rpc.dispatcher.numThreads": 4,
      "celeborn.rpc.io.clientThreads": 64,
      "celeborn.rpc.io.numConnectionsPerPeer": 2,
      "celeborn.rpc.io.serverThreads": 64,
      "celeborn.shuffle.chunk.size": "8m",
      "celeborn.worker.fetch.io.threads": 32,
      "celeborn.worker.flusher.buffer.size": "256K",
      "celeborn.worker.http.port": 9096,
      "celeborn.worker.monitor.disk.enabled": false,
      "celeborn.worker.push.io.threads": 32,
      "celeborn.worker.storage.dirs": "/mnt/disk1:disktype=SSD:capacity=1024Gi,/mnt/disk2:disktype=SSD:capacity=1024Gi,/mnt/disk3:disktype=SSD:capacity=1024Gi,/mnt/disk4:disktype=SSD:capacity=1024Gi"
    }

    master.replicas

    master Pod のレプリカ数です。

    3

    master.volumeMounts

    master コンテナの volumeMounts です。

    [
      {
        "mountPath": "/mnt/celeborn_ratis",
        "name": "celeborn-ratis"
      }
    ]

    master.volumes

    マスター Pod のデータ量。

    hostPathemptyDir のボリュームタイプのみがサポートされています。

    [
      {
        "hostPath": {
          "path": "/mnt/celeborn_ratis",
          "type": "DirectoryOrCreate"
        },
        "name": "celeborn-ratis"
      }
    ]

    master.nodeSelector

    master Podnode selector です。

    {}

    master.affinity

    master Podaffinity ルールです。

    {
      "podAntiAffinity": {
        "requiredDuringSchedulingIgnoredDuringExecution": [
          {
            "labelSelector": {
              "matchExpressions": [
                {
                  "key": "app.kubernetes.io/name",
                  "operator": "In",
                  "values": [
                    "celeborn"
                  ]
                },
                {
                  "key": "app.kubernetes.io/role",
                  "operator": "In",
                  "values": [
                    "master"
                  ]
                }
              ]
            },
            "topologyKey": "kubernetes.io/hostname"
          }
        ]
      }
    }

    master.tolerations

    master Podtolerations です。

    []

    worker.replicas

    worker Pod のレプリカ数です。

    5

    worker.volumeMounts

    worker コンテナの volumeMounts です。

    [
      {
        "mountPath": "/mnt/disk1",
        "name": "disk1"
      },
      {
        "mountPath": "/mnt/disk2",
        "name": "disk2"
      },
      {
        "mountPath": "/mnt/disk3",
        "name": "disk3"
      },
      {
        "mountPath": "/mnt/disk4",
        "name": "disk4"
      }
    ]

    worker.volumes

    worker Pod のデータ量。

    サポートされているボリュームタイプは hostPathemptyDir のみです。

    [
      {
        "capacity": "100Gi",
        "diskType": "SSD",
        "hostPath": "/mnt/disk1",
        "mountPath": "/mnt/disk1",
        "type": "hostPath"
      },
      {
        "capacity": "100Gi",
        "diskType": "SSD",
        "hostPath": "/mnt/disk2",
        "mountPath": "/mnt/disk2",
        "type": "hostPath"
      },
      {
        "capacity": "100Gi",
        "diskType": "SSD",
        "hostPath": "/mnt/disk3",
        "mountPath": "/mnt/disk3",
        "type": "hostPath"
      },
      {
        "capacity": "100Gi",
        "diskType": "SSD",
        "hostPath": "/mnt/disk4",
        "mountPath": "/mnt/disk4",
        "type": "hostPath"
      }
    ]

    worker.nodeSelector

    worker Podnode selector です。

    {}

    worker.affinity

    worker Podaffinity ルールです。

    {
      "podAntiAffinity": {
        "requiredDuringSchedulingIgnoredDuringExecution": [
          {
            "labelSelector": {
              "matchExpressions": [
                {
                  "key": "app.kubernetes.io/name",
                  "operator": "In",
                  "values": [
                    "celeborn"
                  ]
                },
                {
                  "key": "app.kubernetes.io/role",
                  "operator": "In",
                  "values": [
                    "worker"
                  ]
                }
              ]
            },
            "topologyKey": "kubernetes.io/hostname"
          }
        ]
      }
    }

    worker.tolerations

    worker Podtolerations です。

    []

  1. 以下のコマンドを実行し、Celeborn のデプロイが完了するまで待ちます。コンポーネントのデプロイ中に Pod の問題が発生した場合は、「Pod のトラブルシューティング」をご参照ください。

    kubectl get -n celeborn statefulset 

    期待される出力:

    NAME              READY   AGE
    celeborn-master   3/3     68s
    celeborn-worker   5/5     68s

ステップ 3:Spark コンテナイメージの構築

本例では Spark 3.5.3 を使用します。以下の内容を含む Dockerfile を作成し、コンテナイメージ を構築してから、ご利用の イメージリポジトリ にプッシュします。

ARG SPARK_IMAGE=<SPARK_IMAGE>  # <SPARK_IMAGE> をご利用の Spark ベースイメージに置き換えます。

FROM ${SPARK_IMAGE}

# Hadoop Aliyun OSS サポートの依存関係を追加
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars

# Celeborn の依存関係を追加
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/celeborn/celeborn-client-spark-3-shaded_2.12/0.5.1/celeborn-client-spark-3-shaded_2.12-0.5.1.jar ${SPARK_HOME}/jars

ステップ 4:テストデータを OSS にアップロード

テストデータの準備および OSS へのアップロード方法については、「ステップ 1:テストデータの準備および OSS へのアップロード」をご参照ください。

ステップ 5:OSS シークレットの作成

OSS の アクセス認証情報 を格納する シークレット の作成方法については、「ステップ 3:OSS アクセス認証情報を格納するシークレットの作成」をご参照ください。

ステップ 6:サンプル Spark ジョブの送信

以下の内容で spark-pagerank.yaml という名前の SparkApplication マニフェストファイルを作成します。ステップ 3:Spark コンテナイメージの構築 で構築したイメージの名前で <SPARK_IMAGE> を置き換え、<OSS_BUCKET> および <OSS_ENDPOINT> をご利用の OSS バケット および エンドポイント に置き換えます。Spark ジョブにおける Celeborn の構成方法については、「Celeborn ドキュメント」をご参照ください。

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pagerank
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: <SPARK_IMAGE>                                     # Spark イメージ。<SPARK_IMAGE> をご利用の Spark イメージ名に置き換えます。
  mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.3.jar
  mainClass: org.apache.spark.examples.SparkPageRank
  arguments:
  - oss://<OSS_BUCKET>/data/pagerank_dataset.txt           # 入力テストデータセット。<OSS_BUCKET> をご利用の OSS バケット名に置き換えます。
  - "10"                                                   # 反復回数。
  sparkVersion: 3.5.3
  hadoopConf:
    fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
    fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
    fs.oss.endpoint: <OSS_ENDPOINT>                        # OSS エンドポイント。例:中国 (北京) リージョンの OSS 内部エンドポイントは oss-cn-beijing-internal.aliyuncs.com です。
    fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
  sparkConf:
    spark.shuffle.manager: org.apache.spark.shuffle.celeborn.SparkShuffleManager
    spark.serializer: org.apache.spark.serializer.KryoSerializer
    spark.celeborn.master.endpoints: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
    spark.celeborn.client.spark.shuffle.writer: hash
    spark.celeborn.client.push.replicate.enabled: "false"
    spark.sql.adaptive.localShuffleReader.enabled: "false"
    spark.sql.adaptive.enabled: "true"
    spark.sql.adaptive.skewJoin.enabled: "true"
    spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
    spark.dynamicAllocation.shuffleTracking.enabled: "false"
    spark.executor.userClassPathFirst: "false"
  driver:
    cores: 1
    coreLimit: 1200m
    memory: 512m
    serviceAccount: spark-operator-spark
    envFrom:
    - secretRef:
        name: spark-oss-secret
  executor:
    instances: 2
    cores: 1
    coreLimit: "2"
    memory: 8g
    envFrom:
    - secretRef:
        name: spark-oss-secret
  restartPolicy:
    type: Never

(任意)ステップ 7:リソースのクリーンアップ

本チュートリアルを完了した後、以下のコマンドを実行してリソースを削除し、不要な課金を回避します。

Spark ジョブを削除するには、以下のコマンドを実行します:

kubectl delete sparkapplication spark-pagerank

シークレット リソースを削除するには、以下のコマンドを実行します:

kubectl delete secret spark-oss-secret

参考情報