Apache Celebornは、ビッグデータ計算エンジンのシャッフルデータやスピルデータなどの中間データを処理するために使用されます。 Celebornは、ビッグデータ計算エンジンのパフォーマンス、安定性、柔軟性を効率的に向上させることができます。 RSS (Remote Shuffle Service) は、多数のデータセットをシャッフルする効率的な方法を提供します。 このトピックでは、Container Service for Kubernetes (ACK) クラスターにCelebornをデプロイする方法と、Celebornを使用してSparkジョブのRSSを有効にする方法について説明します。
メリット
Celebornを使用してRSSを有効にすると、MapReduce、Spark、Flinkなどのフレームワークに基づくビッグデータ処理に次の利点があります。
プッシュベースのシャッフル書き込み: マッパーはローカルディスクにデータを保存する必要はありません。 この機能は、クラウドで計算ストレージ分離アーキテクチャを使用するサービスに適しています。
マージベースのシャッフル書き込み: データはリデューサーではなくワーカーでマージされます。 これにより、小さなファイルや小さなデータ伝送量でのランダムな読み書きによって発生するネットワークのオーバーヘッドがなくなり、データ処理効率が向上します。
高可用性: マスターズは、Raftプロトコルに基づいてシステムの高可用性と安定性を実装します。
高いフォールトトレランス: Celebornは2つのレプリカをサポートし、フェッチ失敗の可能性を大幅に減らします。
前提条件
ack-spark-operatorコンポーネントがインストールされています。 詳細については、「手順1: ack-spark-operatorコンポーネントのインストール」をご参照ください。
kubectlクライアントがACKクラスターに接続されています。 詳細については、「クラスターのkubeconfigファイルを取得し、kubectlを使用してクラスターに接続する」をご参照ください。
OSSバケットが作成されます。 詳細は、「バケットの作成」をご参照ください。
ossutilがインストールされ、設定されます。 詳細については、「ossutilのインストール」および「ossutilの設定」をご参照ください。
ossutilコマンドの詳細については、「ossutilコマンドリファレンス」をご参照ください。
ノードプールは、次のセクションで説明する設定に基づいて作成されます。 ノードプールの作成方法の詳細については、「ノードプールの作成」をご参照ください。
クラスター環境
次の設定に基づいてクラスター環境を設定します。
マスタープロセスをceleborn-masterという名前のノードプールにデプロイします。 ノードプールは次の設定を使用します。
ノードプール名: celeborn-master.
ノード数量: 3。
ECS (Elastic Compute Service) インスタンスタイプ: g8i.2xlarge
ラベル: celeborn.apache.org/role=master
テインツ: celeborn.apache.org/role=master:NoSchedule
各マスターのデータストレージ: /mnt/celeborn_ratis (1024GB) 。
celeborn-workerという名前のノードプールにワーカープロセスをデプロイします。 ノードプールは次の設定を使用します。
ノードプール名: celeborn-worker.
ノード数量: 5。
ECSインスタンスタイプ: g8i.4xlarge
ラベル: celeborn.apache.org/role=worker.
テインツ: celeborn.apache.org/role=worker:NoSchedule.
各ワーカーのデータストレージ:
/mnt/disk1 (1024GB)
/mnt/disk2 (1024GB)
/mnt/disk3 (1024GB)
/mnt/disk4 (1024GB)
手順の概要
このセクションでは、ACKクラスターにCelebornをデプロイするための主な手順について説明します。
Celebornコンテナイメージを作成する
ビジネス要件に基づいてCelebornリリースバージョンのパッケージをダウンロードします。 次に、パッケージに基づいてコンテナイメージをビルドし、そのイメージをcontainer Registryのイメージリポジトリにプッシュします。 このイメージは、ack-celebornコンポーネントをデプロイするために使用されます。
ack-celebornコンポーネントのデプロイ
ackコンソールのMarketplaceページからACK-celebornという名前のHelmチャートをインストールし、構築したイメージを使用してCelebornを展開します。 Celebornをデプロイすると、Celebornクラスターがデプロイされます。
Sparkコンテナーイメージのビルド
Object Storage Service (OSS) にアクセスするために必要なCelebornとJARパッケージを含むSparkコンテナイメージを構築し、そのイメージをcontainer Registryのイメージリポジトリにプッシュします。
テストデータの準備とOSSへのアップロード
PageRankジョブのテストデータセットを生成し、データセットをOSSにアップロードします。
サンプルSparkジョブの実行
サンプルPageRankジョブを実行し、Celebornを使用してRSSを有効にします。
(オプション) 環境をクリアする
このトピックのすべての手順を実行した後、Sparkジョブを削除し、関連するリソースが不要になった場合はリリースします。
ステップ1: Celebornコンテナイメージを作成する
ビジネス要件に基づいて、0.5.2などのCelebornリリースバージョンのパッケージをCeleborn公式Webサイトからダウンロードします。 イメージをビルドするときは、<image-REGISTRY> と <IMAGE-REPOSITORY> を、使用するイメージリポジトリの名前とイメージ名に置き換えます。 さらに、PLATFORMS変数を変更して、イメージのアーキテクチャを指定することもできます。 詳細については、「KubernetesへのCelebornのデプロイ」をご参照ください。 docker buildxコマンドは、Docker 19.03以降でサポートされています。 Dockerバージョンの更新方法の詳細については、「Dockerのインストール」をご参照ください。
CELEBORN_VERSION=0.5.2 # The Celeborn release version.
IMAGE_REGISTRY=<IMAGE-REGISTRY> # The image repository name, such as docker.io.
IMAGE_REPOSITORY=<IMAGE-REPOSITORY> # The image name, such as apache/celeborn.
IMAGE_TAG=${CELEBORN_VERSION} # The image tag. In this example, a label that indicates the Celeborn release version is added.
# Download the package.
wget https://downloads.apache.org/celeborn/celeborn-${CELEBORN_VERSION}/apache-celeborn-${CELEBORN_VERSION}-bin.tgz
# Decompress the package.
tar -zxvf apache-celeborn-${CELEBORN_VERSION}-bin.tgz
# Switch the working directory.
cd apache-celeborn-${CELEBORN_VERSION}-bin
# Use Docker Buildkit to build the image and push the image to your image repository.
docker buildx build \
--output=type=registry \
--push \
--platform=${PLATFORMS} \
--tag=${IMAGE_REGISTRY}/${IMAGE_REPOSITORY}:${IMAGE_TAG} \
-f docker/Dockerfile \
.ステップ2: ack-celebornコンポーネントのデプロイ
ACKコンソールにログインします。 左側のナビゲーションウィンドウで、 を選択します。
[マーケットプレイス] ページで、[アプリカタログ] タブをクリックします。 ack-celebornを見つけてクリックします。 ack-celebornページで、[デプロイ] をクリックします。
[デプロイ] パネルで、クラスターと名前空間を選択し、[次へ] をクリックします。
[パラメーター] ステップでパラメーターを設定し、[OK] をクリックします。
image: # Replace the value with the address of the Celeborn image you built in Step 1. registry: docker.io # The image repository name. repository: apache/celeborn # The image name. tag: 0.5.2 # The image tag. celeborn: celeborn.client.push.stageEnd.timeout: 120s celeborn.master.ha.enabled: true celeborn.master.ha.ratis.raft.server.storage.dir: /mnt/celeborn_ratis celeborn.master.heartbeat.application.timeout: 300s celeborn.master.heartbeat.worker.timeout: 120s celeborn.master.http.port: 9098 celeborn.metrics.enabled: true celeborn.metrics.prometheus.path: /metrics/prometheus celeborn.rpc.dispatcher.numThreads: 4 celeborn.rpc.io.clientThreads: 64 celeborn.rpc.io.numConnectionsPerPeer: 2 celeborn.rpc.io.serverThreads: 64 celeborn.shuffle.chunk.size: 8m celeborn.worker.fetch.io.threads: 32 celeborn.worker.flusher.buffer.size: 256K celeborn.worker.http.port: 9096 celeborn.worker.monitor.disk.enabled: false celeborn.worker.push.io.threads: 32 celeborn.worker.storage.dirs: /mnt/disk1:disktype=SSD:capacity=1024Gi,/mnt/disk2:disktype=SSD:capacity=1024Gi,/mnt/disk3:disktype=SSD:capacity=1024Gi,/mnt/disk4:disktype=SSD:capacity=1024Gi master: replicas: 3 env: - name: CELEBORN_MASTER_MEMORY value: 28g - name: CELEBORN_MASTER_JAVA_OPTS value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-master.out -Dio.netty.leakDetectionLevel=advanced - name: CELEBORN_NO_DAEMONIZE value: "1" - name: TZ value: Asia/Shanghai volumeMounts: - name: celeborn-ratis mountPath: /mnt/celeborn_ratis resources: requests: cpu: 7 memory: 28Gi limits: cpu: 7 memory: 28Gi volumes: - name: celeborn-ratis hostPath: path: /mnt/celeborn_ratis type: DirectoryOrCreate nodeSelector: celeborn.apache.org/role: master tolerations: - key: celeborn.apache.org/role operator: Equal value: master effect: NoSchedule worker: replicas: 5 env: - name: CELEBORN_WORKER_MEMORY value: 28g - name: CELEBORN_WORKER_OFFHEAP_MEMORY value: 28g - name: CELEBORN_WORKER_JAVA_OPTS value: -XX:-PrintGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:+PrintGCDateStamps -Xloggc:gc-worker.out -Dio.netty.leakDetectionLevel=advanced - name: CELEBORN_NO_DAEMONIZE value: "1" - name: TZ value: Asia/Shanghai volumeMounts: - name: disk1 mountPath: /mnt/disk1 - name: disk2 mountPath: /mnt/disk2 - name: disk3 mountPath: /mnt/disk3 - name: disk4 mountPath: /mnt/disk4 resources: requests: cpu: 14 memory: 56Gi limits: cpu: 14 memory: 56Gi volumes: - name: disk1 hostPath: path: /mnt/disk1 type: DirectoryOrCreate - name: disk2 hostPath: path: /mnt/disk2 type: DirectoryOrCreate - name: disk3 hostPath: path: /mnt/disk3 type: DirectoryOrCreate - name: disk4 hostPath: path: /mnt/disk4 type: DirectoryOrCreate nodeSelector: celeborn.apache.org/role: worker tolerations: - key: celeborn.apache.org/role operator: Equal value: worker effect: NoSchedule下表に一部のパラメーターを示します。 パラメーター設定は、ack-celebornページの [パラメーター] セクションにあります。
以下を実行してCelebornをデプロイします。 展開プロセス中に問題が発生した場合は、ポッドのトラブルシューティングを参照して問題を解決します。
kubectl get -n celeborn statefulset期待される出力:
NAME READY AGE celeborn-master 3/3 68s celeborn-worker 5/5 68s
ステップ3: Sparkコンテナイメージを構築する
この例では、Spark 3.5.3が使用されます。 次のコンテンツに基づいてDockerfileを作成します。 次に、Dockerfileを使用してイメージを作成し、そのイメージをイメージリポジトリにプッシュします。
ARG SPARK_IMAGE=<SPARK_IMAGE> # Replace <SPARK_IMAGE> with your Spark base image.
FROM ${SPARK_IMAGE}
# Add dependency for Hadoop Aliyun OSS support
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aliyun/3.3.4/hadoop-aliyun-3.3.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/com/aliyun/oss/aliyun-sdk-oss/3.17.4/aliyun-sdk-oss-3.17.4.jar ${SPARK_HOME}/jars
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/jdom/jdom2/2.0.6.1/jdom2-2.0.6.1.jar ${SPARK_HOME}/jars
# Add dependency for Celeborn
ADD --chown=spark:spark --chmod=644 https://repo1.maven.org/maven2/org/apache/celeborn/celeborn-client-spark-3-shaded_2.12/0.5.1/celeborn-client-spark-3-shaded_2.12-0.5.1.jar ${SPARK_HOME}/jars手順4: テストデータの準備とOSSへのアップロード
テストデータを準備してOSSにアップロードする方法の詳細については、「手順1: テストデータを準備してOSSバケットにアップロードする」をご参照ください。
ステップ5: OSSへのアクセスに使用される資格情報を保存するシークレットを作成する
OSSへのアクセスに使用される資格情報を保存するためのシークレットの作成方法の詳細については、「手順3: OSSアクセス資格情報を保存するシークレットの作成」をご参照ください。
ステップ6: サンプルSparkジョブを送信する
spark-pagerank.yamlという名前のファイルを作成し、次の内容をファイルにコピーします。 ファイルはSparkApplicationを作成するために使用されます。 <SPARK_IMAGE> を、手順3: Sparkコンテナーイメージの構築で指定したイメージリポジトリのアドレスに置き換えます。 <OSS_BUCKET> および <OSS_ENDPOINT> を、使用するOSSバケットの名前およびバケットのエンドポイントに置き換えます。 SparkジョブでCelebornを設定する方法の詳細については、「Celebornドキュメント」をご参照ください。
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pagerank
namespace: default
spec:
type: Scala
mode: cluster
image: <SPARK_IMAGE> # The Spark image. Replace <SPARK_IMAGE> with the name of the Spark image.
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.3.jar
mainClass: org.apache.spark.examples.SparkPageRank
arguments:
- oss://<OSS_BUCKET>/data/pagerank_dataset.txt # The test dataset. Replace <OSS_BUCKET> with the name of the OSS bucket you use.
- "10" # The number of iterations.
sparkVersion: 3.5.3
hadoopConf:
fs.AbstractFileSystem.oss.impl: org.apache.hadoop.fs.aliyun.oss.OSS
fs.oss.impl: org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem
fs.oss.endpoint: <OSS_ENDPOINT> # The endpoint of the OSS bucket. The internal endpoint for the China (Beijing) region is oss-cn-beijing-internal.aliyuncs.com.
fs.oss.credentials.provider: com.aliyun.oss.common.auth.EnvironmentVariableCredentialsProvider
sparkConf:
spark.shuffle.manager: org.apache.spark.shuffle.celeborn.SparkShuffleManager
spark.serializer: org.apache.spark.serializer.KryoSerializer
spark.celeborn.master.endpoints: celeborn-master-0.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-1.celeborn-master-svc.celeborn.svc.cluster.local,celeborn-master-2.celeborn-master-svc.celeborn.svc.cluster.local
spark.celeborn.client.spark.shuffle.writer: hash
spark.celeborn.client.push.replicate.enabled: "false"
spark.sql.adaptive.localShuffleReader.enabled: "false"
spark.sql.adaptive.enabled: "true"
spark.sql.adaptive.skewJoin.enabled: "true"
spark.shuffle.sort.io.plugin.class: org.apache.spark.shuffle.celeborn.CelebornShuffleDataIO
spark.dynamicAllocation.shuffleTracking.enabled: "false"
spark.executor.userClassPathFirst: "false"
driver:
cores: 1
coreLimit: 1200m
memory: 512m
serviceAccount: spark-operator-spark
envFrom:
- secretRef:
name: spark-oss-secret
executor:
instances: 2
cores: 1
coreLimit: "2"
memory: 8g
envFrom:
- secretRef:
name: spark-oss-secret
restartPolicy:
type: Never(オプション) ステップ7: 環境をクリアする
このトピックのすべての手順を実行した後、次のコマンドを実行してSparkジョブを削除し、関連するリソースが不要になった場合は関連するリソースを解放します。
次のコマンドを実行して、Sparkジョブを削除します。
kubectl delete sparkapplication spark-pagerank次のコマンドを実行して、シークレットを削除します。
kubectl delete secret spark-oss-secret関連ドキュメント
Sparkオペレーターを使用してSparkジョブを送信する方法の詳細については、「Sparkオペレーターを使用してSparkジョブを実行する」をご参照ください。
Spark History Serverを使用してSparkジョブに関する情報を表示する方法の詳細については、「Spark History Serverを使用してSparkジョブに関する情報を表示する」をご参照ください。
Celebornの使用方法の詳細については、「Apache Celebornドキュメント」をご参照ください。