Apache Celeborn は、ビッグデータコンピュートエンジン向けにシャッフルデータやスピルデータなどの中間データを管理するリモートシャッフルサービス (RSS) です。このトピックでは、Alibaba Cloud Container Service for Kubernetes (ACK) クラスターに Celeborn をデプロイし、Spark ジョブの RSS として使用する方法について説明します。
メリット
Celeborn は、MapReduce、Spark、Flink に基づくビッグデータワークロードに以下のメリットをもたらします。
プッシュベースのシャッフル:マッパーはローカルディスクにデータを保存する必要がありません。これは、ストレージとコンピュートを分離するクラウドネイティブアーキテクチャに適しています。
マージベースのシャッフル:データはレデューサーではなくワーカーでマージされるため、ランダムな小規模ファイルの I/O や小規模なデータ転送によるネットワークオーバーヘッドが解消されます。
高可用性:マスターは Raft コンセンサスプロトコルを使用して高可用性を実現します。
フォールトトレランス:デュアルレプリカにより、フェッチ失敗の可能性が大幅に減少します。
前提条件
開始する前に、以下のタスクが完了していることを確認してください。
ack-spark-operator コンポーネントをインストールしました。詳細については、「手順 1: ack-spark-operator コンポーネントのインストール」をご参照ください。
kubectl クライアントが ACK クラスターに接続されています。詳細については、「クラスターの kubeconfig ファイルを取得し、kubectl を使用してクラスターに接続する」をご参照ください。
Object Storage Service (OSS) バケットを作成しました。詳細については、「バケットの作成」をご参照ください。
ossutil がインストールおよび設定されていること。 詳細については、「ossutil のインストール」、「ossutil の設定」、および「ossutil コマンドリファレンス」をご参照ください。
「クラスター環境」セクションの仕様に基づいてノードプールが作成されていること。詳細については、「ノードプールの作成」をご参照ください。
クラスター環境
以下の設定で 2 つのノードプールを作成します。
celeborn-master ノードプール
パラメーター | 値 |
ノードプール名 | celeborn-master |
ノード数 | 3 |
Elastic Compute Service (ECS) インスタンスタイプ | g8i.2xlarge |
ラベル |
|
テインツ |
|
マスターごとのデータストレージ |
|
celeborn-worker ノードプール
パラメーター | 値 |
ノードプール名 | celeborn-worker |
ノード数 | 5 |
ECS インスタンスタイプ | g8i.4xlarge |
ラベル |
|
テイント |
|
ワーカーごとのデータストレージ |
|
手順の概要
ステップ | 概要 |
Celeborn のリリースをダウンロードし、コンテナイメージをビルドします。イメージをご利用の Container Registry リポジトリにプッシュします。 | |
ACK コンソールのマーケットプレイスから ack-celeborn Helm チャートをインストールして、Celeborn クラスターをデプロイします。 | |
Celeborn クライアントと OSS の依存関係 JAR を含む Spark イメージをビルドします。イメージをご利用の Container Registry リポジトリにプッシュします。 | |
PageRank テストデータセットを生成し、OSS にアップロードします。 | |
OSS へのアクセス認証情報を保存する Kubernetes Secret を作成します。 | |
Celeborn をシャッフルサービスとして使用する PageRank ジョブを実行します。 | |
Spark ジョブを削除し、リソースを解放します。 |
ステップ 1:Celeborn コンテナイメージのビルド
Celeborn の公式サイトから Celeborn のリリースパッケージをダウンロードし、コンテナイメージをビルドして、ご利用の Container Registry リポジトリにプッシュします。Kubernetes 上での Celeborn のデプロイに関する詳細については、「Deploy Celeborn on Kubernetes」をご参照ください。
docker buildx コマンドには Docker 19.03 以降が必要です。詳細については、「Docker のインストール」をご参照ください。<IMAGE-REGISTRY> と <IMAGE-REPOSITORY> を、ご利用の Container Registry のアドレスとイメージ名に置き換えてください。PLATFORMS 変数を変更して、ターゲットアーキテクチャを指定します。
CELEBORN_VERSION=0.5.2 # Celeborn のバージョン。
IMAGE_REGISTRY=<IMAGE-REGISTRY> # Container Registry のアドレス (例:docker.io)。
IMAGE_REPOSITORY=<IMAGE-REPOSITORY> # イメージ名 (例:apache/celeborn)。
IMAGE_TAG=${CELEBORN_VERSION} # イメージタグ。デフォルトでは Celeborn のバージョンが使用されます。
# ディストリビューションパッケージのダウンロード。
wget https://downloads.apache.org/celeborn/celeborn-${CELEBORN_VERSION}/apache-celeborn-${CELEBORN_VERSION}-bin.tgz
# パッケージの展開。
tar -zxvf apache-celeborn-${CELEBORN_VERSION}-bin.tgz
# 作業ディレクトリへの移動。
cd apache-celeborn-${CELEBORN_VERSION}-bin
# イメージのビルドと Container Registry へのプッシュ。
docker buildx build \
--output=type=registry \
--push \
--platform=${PLATFORMS} \
--tag=${IMAGE_REGISTRY}/${IMAGE_REPOSITORY}:${IMAGE_TAG} \
-f docker/Dockerfile \
.ステップ 2:ack-celeborn コンポーネントのデプロイ
Container Service コンソール にログインします。左側のナビゲーションウィンドウで、 をクリックします。
[マーケットプレイス] ページで、[アプリカタログ] タブをクリックします。ack-celeborn を見つけてクリックします。ack-celeborn ページで、[デプロイ] をクリックします。
[デプロイ] パネルで、クラスターと名前空間を選択し、デフォルトのリリース名を維持して、[次へ] をクリックします。
[パラメーター] ステップで、パラメーターを設定し、[OK] をクリックします。以下の YAML は設定例です。ステップ 1 でビルドしたイメージのアドレスに置き換えてください。以下の表は主要なパラメーターを説明しています。完全なリストについては、ack-celeborn ページの [パラメーター] セクションをご参照ください。
image: # ステップ 1 でビルドした Celeborn イメージに置き換えます。 registry: docker.io # Container Registry のアドレス。 repository: apache/celeborn # イメージ名。 tag: 0.5.2 # イメージタグ。 celeborn: celeborn.client.push.stageEnd.timeout: 120s celeborn.master.ha.enabled: true celeborn.master.ha.ratis.raft.server.storage.dir: /mnt/celeborn_ratis celeborn.master.heartbeat.application.timeout: 300s celeborn.master.heartbeat.worker.timeout: 120s celeborn.master.http.port: 9098 celeborn.metrics.enabled: true celeborn.metrics.prometheus.path: /metrics/prometheus celeborn.rpc.dispatcher.numThreads: 4 celeborn.rpc.io.clientThreads: 64 celeborn.rpc.io.numConnectionsPerPeer: 2 celeborn.rpc.io.serverThreads: 64 celeborn.shuffle.chunk.size: 8m celeborn.worker.fetch.io.threads: 32 celeborn.worker.flusher.buffer.size: 256K celeborn.worker.http.port: 9096 celeborn.worker.monitor.disk.enabled: false celeborn.worker.push.io.threads: 32 celeborn.worker.storage.dirs: /mnt/disk1:disktype=SSD:capacity=1024Gi,/mnt/disk2:disktype=SSD:capacity=1024Gi,/mnt/disk3:disktype=SSD:capacity=1024Gi,/mnt/disk4:disktype=SSD:capacity=1024Gi master: replicas: 3 env: - name: CELEBORN_MASTER_MEMORY value: 28g - name: CELEBORN_MASTER_JAVA_OPTS value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-master.out -Dio.netty.leakDetectionLevel=advanced - name: CELEBORN_NO_DAEMONIZE value: "1" - name: TZ value: Asia/Shanghai volumeMounts: - name: celeborn-ratis mountPath: /mnt/celeborn_ratis resources: requests: cpu: 7 memory: 28Gi limits: cpu: 7 memory: 28Gi volumes: - name: celeborn-ratis hostPath: path: /mnt/celeborn_ratis type: DirectoryOrCreate nodeSelector: celeborn.apache.org/role: master tolerations: - key: celeborn.apache.org/role operator: Equal value: master effect: NoSchedule worker: replicas: 5 env: - name: CELEBORN_WORKER_MEMORY value: 28g - name: CELEBORN_WORKER_OFFHEAP_MEMORY value: 28g - name: CELEBORN_WORKER_JAVA_OPTS value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-worker.out -Dio.netty.leakDetectionLevel=advanced - name: CELEBORN_NO_DAEMONIZE value: "1" - name: TZ value: Asia/Shanghai volumeMounts: - name: disk1 mountPath: /mnt/disk1 - name: disk2 mountPath: /mnt/disk2 - name: disk3 mountPath: /mnt/disk3 - name: disk4 mountPath: /mnt/disk4 resources: requests: cpu: 14 memory: 56Gi limits: cpu: 14 memory: 56Gi volumes: - name: disk1 hostPath: path: /mnt/disk1 type: DirectoryOrCreate - name: disk2 hostPath: path: /mnt/disk2 type: DirectoryOrCreate - name: disk3 hostPath: path: /mnt/disk3 type: DirectoryOrCreate - name: disk4 hostPath: path: /mnt/disk4 type: DirectoryOrCreate nodeSelector: celeborn.apache.org/role: worker tolerations: - key: celeborn.apache.org/role operator: Equal value: worker effect: NoScheduleCeleborn クラスターが実行中であることを確認します。Pod の起動に失敗した場合は、「Pod のトラブルシューティング」をご参照ください。期待される出力:
kubectl get -n celeborn statefulsetNAME READY AGE celeborn-master 3/3 68s celeborn-worker 5/5 68s
ステップ 3:Spark コンテナイメージのビルド
Celeborn クライアント JAR と OSS へのアクセスに必要な JAR を含む Spark イメージをビルドします。その後、イメージをご利用の Container Registry リポジトリにプッシュします。
この例では Spark 3.5.3 を使用します。以下の内容で Dockerfile を作成し、<SPARK_IMAGE> をご利用の Spark ベースイメージに置き換えます。
ARG SPARK_IMAGE=<SPARK_IMAGE> # <SPARK_IMAGE> をご利用の Spark ベースイメージに置き換えます。
FROM ${SPARK_IMAGE}
# Hadoop Aliyun OSS サポートの依存関係を追加
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars
# Celeborn クライアントの依存関係を追加
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/celeborn/celeborn-client-spark-3-shaded_2.12/0.5.1/celeborn-client-spark-3-shaded_2.12-0.5.1.jar ${SPARK_HOME}/jarsCeleborn クライアントの JAR バージョン (0.5.1) は、ステップ 2 でデプロイした Celeborn サーバーのバージョン (0.5.2) とは異なります。0.5.1 のクライアントは 0.5.2 のサーバーと互換性があります。上記のクライアント JAR バージョンを使用してください。
ステップ 4:テストデータの準備と OSS へのアップロード
PageRank テストデータセットを生成し、ご利用の OSS バケットにアップロードします。詳細な手順については、「テストデータの準備と OSS バケットへのアップロード」をご参照ください。
ステップ 5:OSS アクセス認証情報を保存する Secret の作成
OSS へのアクセス認証情報を保存する Kubernetes Secret を作成します。詳細な手順については、「OSS アクセス認証情報を保存する Secret の作成」をご参照ください。
ステップ 6:サンプル Spark ジョブのサブミット
spark-pagerank.yaml という名前のファイルを以下の内容で作成します。
次のプレースホルダーを置き換えます:
プレースホルダー | 説明 | 例 |
| ステップ 3 でビルドした Spark イメージのアドレス。 |
|
| ご利用の OSS バケットの名前。 |
|
| ご利用の OSS バケットのエンドポイント。 |
|
Celeborn の Spark 設定に関する詳細については、「Celeborn ドキュメント」をご参照ください。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pagerank
namespace: default
spec:
type: Scala
mode: cluster
image: <SPARK_IMAGE> # Spark イメージ。
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.3.jar
mainClass: org.apache.spark.examples.SparkPageRank
arguments:
- oss://<OSS_BUCKET>/data/pagerank_dataset.txt # テストデータセット。
- "10" # 反復回数。
sparkVersion: 3.5.3
hadoopConf:
fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
fs.oss.endpoint: <OSS_ENDPOINT> # OSS エンドポイント。
fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
sparkConf:
spark.shuffle.manager: org.apache.spark.shuffle.celeborn.SparkShuffleManager
spark.serializer: org.apache.spark.serializer.KryoSerializer
spark.celeborn.master.endpoints: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
spark.celeborn.client.spark.shuffle.writer: hash
spark.celeborn.client.push.replicate.enabled: "false"
spark.sql.adaptive.localShuffleReader.enabled: "false"
spark.sql.adaptive.enabled: "true"
spark.sql.adaptive.skewJoin.enabled: "true"
spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
spark.dynamicAllocation.shuffleTracking.enabled: "false"
spark.executor.userClassPathFirst: "false"
driver:
cores: 1
coreLimit: 1200m
memory: 512m
serviceAccount: spark-operator-spark
envFrom:
- secretRef:
name: spark-oss-secret
executor:
instances: 2
cores: 1
coreLimit: "2"
memory: 8g
envFrom:
- secretRef:
name: spark-oss-secret
restartPolicy:
type: Never(オプション) ステップ 7:環境のクリーンアップ
すべてのステップが完了した後、不要になった Spark ジョブと関連リソースを削除します。
Spark ジョブの削除:
kubectl delete sparkapplication spark-pagerankSecret の削除:
kubectl delete secret spark-oss-secret