すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:Celeborn

最終更新日:Jan 11, 2025

Celeborn は中間データを処理するサービスです。 Celeborn は、ビッグデータ コンピューティング エンジンの安定性、柔軟性、およびパフォーマンスを向上させることができます。このトピックでは、Celeborn サービスの使用方法について説明します。

背景情報

現在のシャッフル ソリューションには、次の欠点があります。

  • シャッフル書き込みタスクに大量のデータが存在する場合、データ オーバーフローが発生します。これにより、書き込みの増幅が発生します。

  • シャッフル読み取りタスクには、多数の小さなネットワーク パケットが存在します。これにより、接続がリセットされます。

  • シャッフル読み取りタスクには、多数の小さな I/O 要求とランダム読み取りが存在します。これにより、ディスクと CPU の負荷が高くなります。

  • 何千ものマッパー(M)とレデューサー(N)が使用されている場合、多数の接続が生成され、ジョブの実行が困難になります。接続数は、次の式を使用して計算されます。M × N。

  • Spark シャッフル サービスは NodeManager で実行されます。シャッフリングに関係するデータ量が非常に大きい場合、NodeManager が再起動されます。これは、YARN ベースのタスク スケジューリングの安定性に影響します。

Celeborn サービスは、シャッフル ソリューションを最適化できます。 Celeborn サービスには、次の利点があります。

  • プル型シャッフルではなくプッシュ型シャッフルを使用することにより、マッパーによって引き起こされるメモリ負荷を軽減します。

  • I/O 集約をサポートし、シャッフル読み取りタスクの接続数を M × N から N に減らし、ランダム読み取りをシーケンシャル読み取りに変換します。

  • 2 レプリカ メカニズムを使用して、フェッチ失敗の確率を減らします。

  • コンピューティングとストレージの分離をサポートします。シャッフル サービスは、分離されたハードウェア環境にデプロイできます。

  • Kubernetes 上の Spark を使用する場合、ローカル ディスクへの依存を排除します。

次の図は、Celeborn のアーキテクチャを示しています。Celeborn

前提条件

E-MapReduce(EMR)Data Lake クラスタまたはカスタム クラスタが作成され、クラスタに対して Celeborn サービスが選択されています。クラスタの作成方法の詳細については、クラスタの作成をご参照ください。

制限事項

このトピックは、次の表に示すクラスタにのみ適用されます。

クラスタ

バージョン

Data Lake クラスタ

EMR V3.45.0 以後のマイナー バージョン、および EMR V5.11.0 以後のマイナー バージョン

カスタム クラスタ

EMR V3.45.0 以後のマイナー バージョン、および EMR V5.11.0 以後のマイナー バージョン

手順

Spark パラメータ

パラメータ

説明

spark.shuffle.manager

  • Celeborn 0.4.x 以降の場合は、値を org.apache.spark.shuffle.celeborn.SparkShuffleManager に設定します。

  • Celeborn 0.3.x 以前の場合は、値を org.apache.spark.shuffle.celeborn.RssShuffleManager に設定します。

spark.serializer

値を org.apache.spark.serializer.KryoSerializer に設定します。

spark.celeborn.push.replicate.enabled

2 レプリカ機能を有効にするかどうかを指定します。有効な値:

  • true:2 レプリカ機能を有効にします。これはデフォルト値です。

  • false:2 レプリカ機能を無効にします。

spark.shuffle.service.enabled

このパラメータの値を false に変更します。

Celeborn を使用するには、外部シャッフル サービスを無効にする必要があります。 Celeborn は、Spark の動的割り当て機能の使用には影響しません。

説明
  • spark.shuffle.service.enabled パラメータを true に設定すると、Celeborn は使用されません。

  • Alibaba Cloud Spark とオープンソース Spark 3.5 は、Celeborn に対応しています。

spark.celeborn.shuffle.writer

Celeborn の書き込みモード。

  • hash:過剰に多数のパーティションを並列処理すると、大量のメモリが消費されます。これはデフォルト値です。

  • sort:過剰に多数のパーティションを並列処理すると、一定量のメモリが消費されます。パーティション処理は安定しています。

spark.celeborn.master.endpoints

このパラメータの値を <celeborn-master-ip>:<celeborn-master-port> 形式で指定します。

パラメータ:

  • <celeborn-master-ip> は、マスター ノードのパブリック IP アドレスを指定します。

  • <celeborn-master-port> は、マスター ノードのポートを指定します。値を 9097 に設定します。

高可用性クラスタを作成する場合は、すべてのマスター ノードの IP アドレスを設定することをお勧めします。

spark.sql.adaptive.enabled

Celeborn サービスの適応実行を有効にできます。ローカル シャッフル リーダーを無効にして、高いシャッフル パフォーマンスを確保できます。

spark.sql.adaptive.enabled パラメータを true に、spark.sql.adaptive.localShuffleReader.enabled パラメータを false に、spark.sql.adaptive.skewJoin.enabled パラメータを true に設定する必要があります。

spark.sql.adaptive.localShuffleReader.enabled

spark.sql.adaptive.skewJoin.enabled

ワンクリックで Spark の Celeborn サービスを有効にできます。

  • EMR V5.11.1 以降のマイナー バージョンのクラスタ、および EMR V3.45.1 以降のマイナー バージョンのクラスタ

    Spark サービス ページの [ステータス] タブの [サービス概要] セクションで、[enableceleborn] をオンにします。

  • EMR V5.11.0 および EMR V3.45.0 のクラスタ

    Spark サービス ページの [ステータス] タブの [コンポーネント] セクションで、SparkThriftServer コンポーネントを見つけ、[アクション] 列の more アイコンにポインタを移動し、[enableCeleborn] を選択します。 [enableceleborn] をクリックすると、前の表で説明されている Spark パラメータが自動的に変更され、SparkThriftServer コンポーネントが再起動されます。 spark-defaults.conf および spark-thriftserver.conf 構成ファイルも自動的に変更されます。

    • [enableceleborn] をクリックすると、クラスタ内のすべての Spark ジョブで Celeborn サービスが使用されます。

    • [disableceleborn] をクリックすると、クラスタ内のすべての Spark ジョブで Celeborn サービスが使用されません。

Celeborn パラメータ

Celeborn サービス ページの [構成] タブで、すべての Celeborn パラメータの構成を表示または変更できます。次の表にパラメータを示します。

重要

パラメータの値は、ノード グループによって異なります。

パラメータ

説明

デフォルト値

celeborn.worker.flusher.threads

ハード ディスク(HDD)またはソリッドステート ディスク(SSD)ディスクにデータが書き込まれるときの、スレッドの数。

  • HDD のデフォルト値は 1 です。

  • SSD のデフォルト値は 8 です。

CELEBORN_WORKER_OFFHEAP_MEMORY

ワーカー ノードのオフヒープ メモリのサイズ。

デフォルト値は、クラスタ設定に基づいて計算されます。

celeborn.application.heartbeat.timeout

アプリケーションのハートビート タイムアウト期間。単位:秒。ハートビート タイムアウト期間が経過すると、アプリケーション関連のリソースがクリアされます。

120s

celeborn.worker.flusher.buffer.size

フラッシュ バッファのサイズ。フラッシュ バッファのサイズが上限を超えると、フラッシュがトリガーされます。

256K

celeborn.metrics.enabled

モニタリングを有効にするかどうかを指定します。有効な値:

  • true:モニタリングを有効にします。

  • false:モニタリングを無効にします。

true

CELEBORN_WORKER_MEMORY

コア ノードのヒープ メモリのサイズ。

1g

CELEBORN_MASTER_MEMORY

マスター ノードのヒープ メモリのサイズ。

2g

CelebornMaster コンポーネントの再起動

  1. Celeborn サービス ページの [ステータス] タブで、CelebornMaster コンポーネントを見つけ、[アクション] 列の more アイコンにポインタを移動し、[restart_clean_meta] を選択します。

    説明

    クラスタが高可用性でないクラスタの場合、CelebornMaster コンポーネントの [アクション] 列の [再起動] をクリックできます。

  2. 表示されるダイアログ ボックスで、[ローリング実行] をオフにし、[実行理由] パラメータを構成して、[OK] をクリックします。

  3. 確認メッセージで、[OK] をクリックします。