EMR Remote Shuffle Service (ESS) は、E-MapReduce (EMR) によって提供される拡張機能であり、計算エンジンのシャッフル操作を最適化します。
背景情報
現在のシャッフルソリューションには、次の欠点があります。
シャッフル書き込みタスクに大量のデータが存在する場合、データオーバーフローが発生します。これにより、書き込み増幅が発生します。
シャッフル読み取りタスクには、多数の小さなネットワークパケットが存在します。これにより、接続リセットが発生します。
シャッフル読み取りタスクには、多数の小さな I/O 要求とランダム読み取りが存在します。これにより、ディスクと CPU の負荷が高くなります。
何千ものマッパー (M) とレデューサー (N) が使用されている場合、多数の接続が生成され、ジョブの実行が困難になります。接続数は、次の式を使用して計算されます。M × N。
Spark シャッフルサービスは NodeManager で実行されます。シャッフルに関係するデータ量が非常に大きい場合、NodeManager が再起動されます。これは、YARN ベースのタスクスケジューリングの安定性に影響します。
EMR によって提供されるシャッフルベースの ESS サービスは、シャッフルソリューションを最適化できます。ESS には次の利点があります。
プルスタイルシャッフルではなくプッシュスタイルシャッフルを使用することにより、マッパーによって引き起こされるメモリプレッシャーを軽減します。
I/O 集約をサポートし、シャッフル読み取りタスクの接続数を M × N から N に減らし、ランダム読み取りをシーケンシャル読み取りに変換します。
2 レプリカメカニズムを使用して、フェッチ失敗の確率を減らします。
コンピューティングストレージ分離をサポートします。シャッフルサービスは、分離されたハードウェア環境にデプロイできます。
Spark on Kubernetes を使用する場合、ローカルディスクへの依存関係を排除します。
次の図は、ESS のアーキテクチャを示しています。
制限事項
このトピックは、EMR V4.X.X、EMR V3.39.1 より前のマイナーバージョン、または EMR V5.5.0 より前のマイナーバージョンにのみ適用されます。 EMR V3.39.1 以降のマイナーバージョン、または EMR V5.5.0 以降のマイナーバージョンで ESS を使用する場合、詳細については、「Celeborn」をご参照ください。
クラスターの作成
EMR V4.5.0 を例として使用します。次のいずれかの方法を使用して、ESS がデプロイされたクラスターを作成できます。
EMR Shuffle Service クラスターを作成します。

EMR Hadoop クラスターを作成します。

クラスターの作成方法の詳細については、「クラスターの作成」をご参照ください。
ESS の使用
Spark の使用時に ESS が必要な場合は、Spark ジョブの送信時に、次の表に示すパラメーターを追加する必要があります。パラメーター構成の詳細については、「ジョブの編集」をご参照ください。
Spark 関連のパラメーターの詳細については、「Spark Configuration」をご参照ください。
パラメーター | 説明 |
spark.shuffle.manager | 値を org.apache.spark.shuffle.ess.EssShuffleManager に設定します。 |
spark.ess.master.address | このパラメーターを <ess-master-ip>:<ess-master-port> の形式で指定します。 ここで:
|
spark.shuffle.service.enabled | 値を false に設定します。 EMR によって提供される ESS サービスを使用するには、元の外部シャッフルサービスを無効にする必要があります。 |
spark.shuffle.useOldFetchProtocol | 値を true に設定します。 ESS は元のシャッフルプロトコルと互換性があります。 |
spark.sql.adaptive.enabled | 値を false に設定します。 ESS はアダプティブ実行をサポートしていません。 |
spark.sql.adaptive.skewJoin.enabled |
パラメーター
ESS サービス構成ページで、ESS のすべてのパラメーターの設定を表示できます。
パラメーター | 説明 | デフォルト値 |
ess.push.data.replicate | 2 レプリカ機能を有効にするかどうかを指定します。有効な値:
説明 本番環境では、2 レプリカ機能を有効にすることをお勧めします。 | true |
ess.worker.flush.queue.capacity | 各ディレクトリのフラッシュバッファーの数。 説明 パフォーマンスを向上させるために、複数のディスクを構成できます。全体的な読み取りおよび書き込みスループットを向上させるには、各ディスクに最大 2 つのディレクトリを構成することをお勧めします。 各ディレクトリのフラッシュバッファーによって使用されるヒープメモリは、次の式を使用して計算されます。ess.worker.flush.buffer.size × ess.worker.flush.queue.capacity。例: | 512 |
ess.flush.timeout | ストレージ層へのデータフラッシュのタイムアウト期間。単位:秒。 | 240s |
ess.application.timeout | アプリケーションのハートビートタイムアウト期間。単位:秒。ハートビートタイムアウト期間が経過すると、アプリケーション関連のリソースがクリアされます。 | 240s |
ess.worker.flush.buffer.size | フラッシュバッファーのサイズ。単位:KB。フラッシュバッファーのサイズが上限を超えると、フラッシュがトリガーされます。 | 256k |
ess.metrics.system.enable | 監視を有効にするかどうかを指定します。有効な値:
| false |
ess_worker_offheap_memory | コアノードのオフヒープメモリのサイズ。単位:GB。 | 4g |
ess_worker_memory | コアノードのヒープメモリのサイズ。単位:GB。 | 4g |
ess_master_memory | マスターノードのヒープメモリのサイズ。単位:GB。 | 4g |