すべてのプロダクト
Search
ドキュメントセンター

E-MapReduce:ESS (既存ユーザーのみ利用可能)

最終更新日:Apr 07, 2025

EMR Remote Shuffle Service (ESS) は、E-MapReduce (EMR) によって提供される拡張機能であり、計算エンジンのシャッフル操作を最適化します。

背景情報

現在のシャッフルソリューションには、次の欠点があります。

  • シャッフル書き込みタスクに大量のデータが存在する場合、データオーバーフローが発生します。これにより、書き込み増幅が発生します。

  • シャッフル読み取りタスクには、多数の小さなネットワークパケットが存在します。これにより、接続リセットが発生します。

  • シャッフル読み取りタスクには、多数の小さな I/O 要求とランダム読み取りが存在します。これにより、ディスクと CPU の負荷が高くなります。

  • 何千ものマッパー (M) とレデューサー (N) が使用されている場合、多数の接続が生成され、ジョブの実行が困難になります。接続数は、次の式を使用して計算されます。M × N。

  • Spark シャッフルサービスは NodeManager で実行されます。シャッフルに関係するデータ量が非常に大きい場合、NodeManager が再起動されます。これは、YARN ベースのタスクスケジューリングの安定性に影響します。

EMR によって提供されるシャッフルベースの ESS サービスは、シャッフルソリューションを最適化できます。ESS には次の利点があります。

  • プルスタイルシャッフルではなくプッシュスタイルシャッフルを使用することにより、マッパーによって引き起こされるメモリプレッシャーを軽減します。

  • I/O 集約をサポートし、シャッフル読み取りタスクの接続数を M × N から N に減らし、ランダム読み取りをシーケンシャル読み取りに変換します。

  • 2 レプリカメカニズムを使用して、フェッチ失敗の確率を減らします。

  • コンピューティングストレージ分離をサポートします。シャッフルサービスは、分離されたハードウェア環境にデプロイできます。

  • Spark on Kubernetes を使用する場合、ローカルディスクへの依存関係を排除します。

次の図は、ESS のアーキテクチャを示しています。ESS

制限事項

このトピックは、EMR V4.X.X、EMR V3.39.1 より前のマイナーバージョン、または EMR V5.5.0 より前のマイナーバージョンにのみ適用されます。 EMR V3.39.1 以降のマイナーバージョン、または EMR V5.5.0 以降のマイナーバージョンで ESS を使用する場合、詳細については、「Celeborn」をご参照ください。

クラスターの作成

EMR V4.5.0 を例として使用します。次のいずれかの方法を使用して、ESS がデプロイされたクラスターを作成できます。

  • EMR Shuffle Service クラスターを作成します。SHUFFLE

  • EMR Hadoop クラスターを作成します。ESS

クラスターの作成方法の詳細については、「クラスターの作成」をご参照ください。

ESS の使用

Spark の使用時に ESS が必要な場合は、Spark ジョブの送信時に、次の表に示すパラメーターを追加する必要があります。パラメーター構成の詳細については、「ジョブの編集」をご参照ください。

Spark 関連のパラメーターの詳細については、「Spark Configuration」をご参照ください。

パラメーター

説明

spark.shuffle.manager

値を org.apache.spark.shuffle.ess.EssShuffleManager に設定します。

spark.ess.master.address

このパラメーターを <ess-master-ip>:<ess-master-port> の形式で指定します。

ここで:

  • <ess-master-ip> は、マスターノードのパブリック IP アドレスを指定します。

  • <ess-master-port> は、マスターノードのポートを指定します。固定値 9097 に設定します。

spark.shuffle.service.enabled

値を false に設定します。

EMR によって提供される ESS サービスを使用するには、元の外部シャッフルサービスを無効にする必要があります。

spark.shuffle.useOldFetchProtocol

値を true に設定します。

ESS は元のシャッフルプロトコルと互換性があります。

spark.sql.adaptive.enabled

値を false に設定します。

ESS はアダプティブ実行をサポートしていません。

spark.sql.adaptive.skewJoin.enabled

パラメーター

ESS サービス構成ページで、ESS のすべてのパラメーターの設定を表示できます。

パラメーター

説明

デフォルト値

ess.push.data.replicate

2 レプリカ機能を有効にするかどうかを指定します。有効な値:

  • true: 2 レプリカ機能が有効になります。

  • false: 2 レプリカ機能が無効になります。

説明

本番環境では、2 レプリカ機能を有効にすることをお勧めします。

true

ess.worker.flush.queue.capacity

各ディレクトリのフラッシュバッファーの数。

説明

パフォーマンスを向上させるために、複数のディスクを構成できます。全体的な読み取りおよび書き込みスループットを向上させるには、各ディスクに最大 2 つのディレクトリを構成することをお勧めします。

各ディレクトリのフラッシュバッファーによって使用されるヒープメモリは、次の式を使用して計算されます。ess.worker.flush.buffer.size × ess.worker.flush.queue.capacity。例:256 KB × 512 = 128 MB。各ディレクトリで提供されるスロットの数は、このパラメーターの値の半分です。たとえば、合計 28 個のディレクトリが構成されています。使用されるメモリの総量は 3.5 GB(128 MB × 28)です。スロットの総数は 7,168(512 × 28/2)です。

512

ess.flush.timeout

ストレージ層へのデータフラッシュのタイムアウト期間。単位:秒。

240s

ess.application.timeout

アプリケーションのハートビートタイムアウト期間。単位:秒。ハートビートタイムアウト期間が経過すると、アプリケーション関連のリソースがクリアされます。

240s

ess.worker.flush.buffer.size

フラッシュバッファーのサイズ。単位:KB。フラッシュバッファーのサイズが上限を超えると、フラッシュがトリガーされます。

256k

ess.metrics.system.enable

監視を有効にするかどうかを指定します。有効な値:

  • true: 監視を有効にします。

  • false: 監視を無効にします。

false

ess_worker_offheap_memory

コアノードのオフヒープメモリのサイズ。単位:GB。

4g

ess_worker_memory

コアノードのヒープメモリのサイズ。単位:GB。

4g

ess_master_memory

マスターノードのヒープメモリのサイズ。単位:GB。

4g