Apache Celeborn は、ビッグデータ処理エンジン向けの中間データ(例: shuffle data や spill data)を管理するためのサービスです。リモートシャッフルサービス (RSS) として機能し、大規模データセットにおけるシャッフル処理を効率的に実行することで、これらのエンジンのパフォーマンス、安定性、および柔軟性を向上させます。本トピックでは、Container Service for Kubernetes (ACK) クラスター内に Celeborn コンポーネントをデプロイし、Spark ジョブの RSS として Celeborn を使用する方法について説明します。
メリット
MapReduce、Spark、Flink などのビッグデータ処理フレームワークにおいて、Celeborn を RSS として使用すると、以下のメリットがあります:
-
プッシュベースのシャッフル書き込み:マッパー(Mapper)ノードがローカルディスク上にデータを保存する必要がなく、
ストレージとコンピューティングの分離を採用したクラウドネイティブアーキテクチャに最適です。 -
マージベースのシャッフル読み込み:データはリデューサー(Reducer)ノードではなく、
workerノード上でマージされます。これにより、ランダムな小規模ファイル I/O や小規模データ転送に伴うネットワークオーバーヘッドが回避され、データ処理効率が向上します。 -
高可用性:Celeborn のmasterノードは Raft 合意プロトコルを採用しており、高可用性を実現し、システムの安定性を確保します。 -
フォールトトレランス:Celeborn はデュアルレプリカをサポートしており、フェッチ失敗の発生確率を大幅に低減します。
前提条件
ack-spark-operator コンポーネントがインストール済みであること。詳細については、「ステップ 1:ack-spark-operator コンポーネントのデプロイ」をご参照ください。
kubectl クライアントが ACK クラスターに接続済みであること。詳細については、「kubectl を使用した ACK クラスターへの接続」をご参照ください。
-
次のセクションで説明するクラスター環境に基づき、
ノードプールを作成および構成済みであること。詳細については、「ノードプールの作成と管理」をご参照ください。
クラスター環境
本例では、以下の ACK クラスター構成を使用します:
-
masterプロセスを celeborn-masterノードプールにデプロイし、以下の構成を適用します:-
ノードプール名:celeborn-master -
ノード数:3
-
ECS インスタンスタイプ:g8i.2xlarge -
ラベル:celeborn.apache.org/role=master -
Taint:celeborn.apache.org/role=master:NoSchedule -
ノードあたりのデータストレージ:/mnt/celeborn_ratis(1024 GB)
-
-
workerプロセスを celeborn-workerノードプールにデプロイし、以下の構成を適用します:-
ノードプール名:celeborn-worker -
ノード数:5
-
ECS インスタンスタイプ:g8i.4xlarge -
ラベル:celeborn.apache.org/role=worker -
Taint:celeborn.apache.org/role=worker:NoSchedule -
ノードあたりのデータストレージ:
-
/mnt/disk1(1024 GB)
-
/mnt/disk2(1024 GB)
-
/mnt/disk3(1024 GB)
-
/mnt/disk4(1024 GB)
-
-
操作手順の概要
本ガイドでは、ACK クラスター内に Celeborn をデプロイする手順について説明します。
-
Celeborn
コンテナイメージの構築Celeborn
リリースをダウンロードし、コンテナイメージを構築してから、ご利用のイメージリポジトリにプッシュし、ack-celeborn コンポーネントをデプロイします。 -
ack-celeborn コンポーネントのデプロイ
ACKMarketplaceから提供される ack-celebornHelm chartを使用し、前述で構築したコンテナイメージを基に Celeborn クラスターをデプロイします。 -
Sparkコンテナイメージの構築Celeborn および OSS へのアクセスに必要な依存関係を含む
Sparkコンテナイメージを構築し、そのイメージをイメージリポジトリにプッシュします。 -
テストデータの OSS への準備およびアップロード
PageRank ジョブのテストデータセットを生成し、
OSSにアップロードします。 -
サンプル
Sparkジョブの実行サンプル PageRank ジョブを実行し、Celeborn を RSS として使用するよう設定します。
-
(任意)リソースのクリーンアップ
チュートリアルの完了後、不要となった
Sparkジョブおよびその他のリソースをクリーンアップし、不要な課金を回避します。
ステップ 1:Celeborn コンテナイメージの構築
Celeborn 公式ウェブサイト から必要な リリース(例:バージョン 0.5.2)をダウンロードします。構成時に、<IMAGE-REGISTRY> および <IMAGE-REPOSITORY> を、ご利用のイメージレジストリおよびイメージ名に置き換えます。また、PLATFORMS 変数を変更して、必要なイメージアーキテクチャを設定することもできます。詳細については、「Kubernetes への Celeborn のデプロイ」をご参照ください。docker buildx コマンドは Docker 19.03 以降を必要とします。Docker のアップグレード方法については、「Docker および Docker Compose のインストールと使用」をご参照ください。
CELEBORN_VERSION=0.5.2 # Celeborn のバージョン。
IMAGE_REGISTRY=<IMAGE-REGISTRY> # イメージレジストリ(例:docker.io)。
IMAGE_REPOSITORY=<IMAGE-REPOSITORY> # イメージ名(例:apache/celeborn)。
IMAGE_TAG=${CELEBORN_VERSION} # イメージタグ。本例では Celeborn のバージョンをタグとして使用します。
PLATFORMS=linux/amd64 # イメージプラットフォームアーキテクチャ。複数のプラットフォームをサポートする場合は、カンマで区切ります(例:linux/amd64,linux/arm64)。
# リリースパッケージをダウンロードします。
wget https://downloads.apache.org/celeborn/celeborn-${CELEBORN_VERSION}/apache-celeborn-${CELEBORN_VERSION}-bin.tgz
# パッケージを展開します。
tar -zxvf apache-celeborn-${CELEBORN_VERSION}-bin.tgz
# 作業ディレクトリに移動します。
cd apache-celeborn-${CELEBORN_VERSION}-bin
# Docker Buildx を使用してイメージを構築し、イメージリポジトリにプッシュします。
docker buildx build \
--output=type=registry \
--push \
--platform=${PLATFORMS} \
--tag=${IMAGE_REGISTRY}/${IMAGE_REPOSITORY}:${IMAGE_TAG} \
-f docker/Dockerfile \
.
ステップ 2:ack-celeborn コンポーネントのデプロイ
ACK コンソール にログインします。左側のナビゲーションウィンドウで、 をクリックします。
-
Marketplace ページで、アプリカタログ タブをクリックし、ack-celeborn を検索・選択して、ack-celeborn ページで デプロイ をクリックします。
-
作成する パネルで、クラスターおよび名前空間を選択し、次へ をクリックします。
-
パラメーター ページで、パラメーターを設定し、OK をクリックします。
image: # ステップ 1 で構築した Celeborn イメージのアドレスに置き換えてください。 registry: docker.io # イメージレジストリ。 repository: apache/celeborn # イメージ名。 tag: 0.5.2 # イメージタグ。 celeborn: celeborn.client.push.stageEnd.timeout: 120s celeborn.master.ha.enabled: true celeborn.master.ha.ratis.raft.server.storage.dir: /mnt/celeborn_ratis celeborn.master.heartbeat.application.timeout: 300s celeborn.master.heartbeat.worker.timeout: 120s celeborn.master.http.port: 9098 celeborn.metrics.enabled: true celeborn.metrics.prometheus.path: /metrics/prometheus celeborn.rpc.dispatcher.numThreads: 4 celeborn.rpc.io.clientThreads: 64 celeborn.rpc.io.numConnectionsPerPeer: 2 celeborn.rpc.io.serverThreads: 64 celeborn.shuffle.chunk.size: 8m celeborn.worker.fetch.io.threads: 32 celeborn.worker.flusher.buffer.size: 256K celeborn.worker.http.port: 9096 celeborn.worker.monitor.disk.enabled: false celeborn.worker.push.io.threads: 32 celeborn.worker.storage.dirs: /mnt/disk1:disktype=SSD:capacity=1024Gi,/mnt/disk2:disktype=SSD:capacity=1024Gi,/mnt/disk3:disktype=SSD:capacity=1024Gi,/mnt/disk4:disktype=SSD:capacity=1024Gi master: replicas: 3 env: - name: CELEBORN_MASTER_MEMORY value: 28g - name: CELEBORN_MASTER_JAVA_OPTS value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-master.out -Dio.netty.leakDetectionLevel=advanced - name: CELEBORN_NO_DAEMONIZE value: "1" - name: TZ value: Asia/Shanghai volumeMounts: - name: celeborn-ratis mountPath: /mnt/celeborn_ratis resources: requests: cpu: 7 memory: 28Gi limits: cpu: 7 memory: 28Gi volumes: - name: celeborn-ratis hostPath: path: /mnt/celeborn_ratis type: DirectoryOrCreate nodeSelector: celeborn.apache.org/role: master tolerations: - key: celeborn.apache.org/role operator: Equal value: master effect: NoSchedule worker: replicas: 5 env: - name: CELEBORN_WORKER_MEMORY value: 28g - name: CELEBORN_WORKER_OFFHEAP_MEMORY value: 28g - name: CELEBORN_WORKER_JAVA_OPTS value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-worker.out -Dio.netty.leakDetectionLevel=advanced - name: CELEBORN_NO_DAEMONIZE value: "1" - name: TZ value: Asia/Shanghai volumeMounts: - name: disk1 mountPath: /mnt/disk1 - name: disk2 mountPath: /mnt/disk2 - name: disk3 mountPath: /mnt/disk3 - name: disk4 mountPath: /mnt/disk4 resources: requests: cpu: 14 memory: 56Gi limits: cpu: 14 memory: 56Gi volumes: - name: disk1 hostPath: path: /mnt/disk1 type: DirectoryOrCreate - name: disk2 hostPath: path: /mnt/disk2 type: DirectoryOrCreate - name: disk3 hostPath: path: /mnt/disk3 type: DirectoryOrCreate - name: disk4 hostPath: path: /mnt/disk4 type: DirectoryOrCreate nodeSelector: celeborn.apache.org/role: worker tolerations: - key: celeborn.apache.org/role operator: Equal value: worker effect: NoSchedule以下の表では、主なパラメーターについて説明します。すべてのパラメーターの一覧については、ack-celeborn ページの ConfigMap セクションをご参照ください。
-
以下のコマンドを実行し、Celeborn のデプロイが完了するまで待ちます。コンポーネントのデプロイ中に Pod の問題が発生した場合は、「Pod のトラブルシューティング」をご参照ください。
kubectl get -n celeborn statefulset期待される出力:
NAME READY AGE celeborn-master 3/3 68s celeborn-worker 5/5 68s
ステップ 3:Spark コンテナイメージの構築
本例では Spark 3.5.3 を使用します。以下の内容を含む Dockerfile を作成し、コンテナイメージ を構築してから、ご利用の イメージリポジトリ にプッシュします。
ARG SPARK_IMAGE=<SPARK_IMAGE> # <SPARK_IMAGE> をご利用の Spark ベースイメージに置き換えます。
FROM ${SPARK_IMAGE}
# Hadoop Aliyun OSS サポートの依存関係を追加
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars
# Celeborn の依存関係を追加
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/celeborn/celeborn-client-spark-3-shaded_2.12/0.5.1/celeborn-client-spark-3-shaded_2.12-0.5.1.jar ${SPARK_HOME}/jars
ステップ 4:テストデータを OSS にアップロード
テストデータの準備および OSS へのアップロード方法については、「ステップ 1:テストデータの準備および OSS へのアップロード」をご参照ください。
ステップ 5:OSS シークレットの作成
OSS の アクセス認証情報 を格納する シークレット の作成方法については、「ステップ 3:OSS アクセス認証情報を格納するシークレットの作成」をご参照ください。
ステップ 6:サンプル Spark ジョブの送信
以下の内容で spark-pagerank.yaml という名前の SparkApplication マニフェストファイルを作成します。ステップ 3:Spark コンテナイメージの構築 で構築したイメージの名前で <SPARK_IMAGE> を置き換え、<OSS_BUCKET> および <OSS_ENDPOINT> をご利用の OSS バケット および エンドポイント に置き換えます。Spark ジョブにおける Celeborn の構成方法については、「Celeborn ドキュメント」をご参照ください。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pagerank
namespace: default
spec:
type: Scala
mode: cluster
image: <SPARK_IMAGE> # Spark イメージ。<SPARK_IMAGE> をご利用の Spark イメージ名に置き換えます。
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.3.jar
mainClass: org.apache.spark.examples.SparkPageRank
arguments:
- oss://<OSS_BUCKET>/data/pagerank_dataset.txt # 入力テストデータセット。<OSS_BUCKET> をご利用の OSS バケット名に置き換えます。
- "10" # 反復回数。
sparkVersion: 3.5.3
hadoopConf:
fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
fs.oss.endpoint: <OSS_ENDPOINT> # OSS エンドポイント。例:中国 (北京) リージョンの OSS 内部エンドポイントは oss-cn-beijing-internal.aliyuncs.com です。
fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
sparkConf:
spark.shuffle.manager: org.apache.spark.shuffle.celeborn.SparkShuffleManager
spark.serializer: org.apache.spark.serializer.KryoSerializer
spark.celeborn.master.endpoints: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
spark.celeborn.client.spark.shuffle.writer: hash
spark.celeborn.client.push.replicate.enabled: "false"
spark.sql.adaptive.localShuffleReader.enabled: "false"
spark.sql.adaptive.enabled: "true"
spark.sql.adaptive.skewJoin.enabled: "true"
spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
spark.dynamicAllocation.shuffleTracking.enabled: "false"
spark.executor.userClassPathFirst: "false"
driver:
cores: 1
coreLimit: 1200m
memory: 512m
serviceAccount: spark-operator-spark
envFrom:
- secretRef:
name: spark-oss-secret
executor:
instances: 2
cores: 1
coreLimit: "2"
memory: 8g
envFrom:
- secretRef:
name: spark-oss-secret
restartPolicy:
type: Never
(任意)ステップ 7:リソースのクリーンアップ
本チュートリアルを完了した後、以下のコマンドを実行してリソースを削除し、不要な課金を回避します。
Spark ジョブを削除するには、以下のコマンドを実行します:
kubectl delete sparkapplication spark-pagerank
シークレット リソースを削除するには、以下のコマンドを実行します:
kubectl delete secret spark-oss-secret
参考情報
-
Spark Operator を使用した
Sparkジョブの送信方法については、「Spark Operator を使用した Spark ジョブの実行」をご参照ください。 -
Spark History Server を使用した
Sparkジョブ情報の表示方法については、「Spark History Server を使用した Spark ジョブ情報の表示」をご参照ください。 -
Celeborn の使用方法については、「Apache Celeborn ドキュメント」をご参照ください。