Alibaba Cloud Elastic MapReduce (E-MapReduce) 3.13.0 の Spark SQL はアダプティブ実行をサポートしています。 自動的に削減タスクの数を設定し、データスキューを解決し、実行プランを動的に最適化する場合に使用します。
解決済みの問題
- シャッフルパーティション数
現在、Spark SQL の削減ステージのタスク数は spark.sql.shuffle.partition パラメーター (デフォルト値は 200) の値によって異なります。 ジョブにこのパラメーターが指定されると、すべてのステージにおける削減タスクの数は、ジョブが実行中のときと同じ値になります。
ジョブごとに、また 1 つのジョブの削減ステージごとに、実際のデータサイズはかなり異なる場合があります。 たとえば、削減ステージで処理されるデータのサイズは 10 MB の場合もあれば 100 GB の場合もあります。 同じ値を使用してパラメーターを指定すると、実際の処理効率に大きな影響を与えます。 たとえば、10 MB のデータは 1 つのタスクだけで処理可能です。 spark.sql.shuffle.partition の値をデフォルト値の 200 に設定すると、10 MB のデータは 200 個のタスクによる処理用に分割されます。 これにより、スケジューリングのオーバーヘッドが増加し、処理効率が低下します。
Spark SQL のアダプティブ実行フレームワークは、シャッフルパーティション番号の範囲を設定することで、異なるジョブの異なるステージに合わせて範囲内の削減タスクの数を動的に調整できます。
これにより、最適化にかかるコストが大幅に削減されます (固定値を決める必要がありません)。 さらに、1 つのジョブのさまざまなステージにおける削減タスクの数を動的に調整できます。
パラメーター :属性 デフォルト値 説明 spark.sql.adaptive.enabled false アダプティブ実行を有効または無効にします。 spark.sql.adaptive.minNumPostShufflePartitions 1 削減タスクの最小数 spark.sql.adaptive.maxNumPostShufflePartitions 500 削減タスクの最大数 spark.sql.adaptive.shuffle.targetPostShuffleInputSize 67108864 パーティションサイズをベースに削減タスクの数を動的に調整します。 たとえば、この値が 64 MB に設定されている場合、削減ステージの各タスクは 64 MB を超えるデータを処理します。 spark.sql.adaptive.shuffle.targetPostShuffleRowCount 20000000 パーティション内の行番号に基づいて、削減タスクの数を動的に調整します。 たとえば、値が 20000000 に設定されている場合、削減ステージの各タスクは 20,000,000 行を超えるデータを処理します。 - データスキュー
データスキューは、SQL 結合操作でよく見られる問題です。 特定のタスクが処理に巻き込むデータが多すぎるというシナリオで、ロングテールにつながります。 現在、Spark SQL は、データの歪みを最適化しません。
Spark SQL の Adaptive Execution フレームワークは、歪んだデータを自動的に検出し、実行時に最適化を実行します。
SparkSQL は、パーティション内の歪んだデータを分割し、複数のタスクを介してデータを処理してから、SQL 結合操作を介して結果を結合することにより、データの歪みを最適化します。
サポートされている結合の種類データ型 説明 Inner 歪んだデータは両方のテーブルで処理できます。 Cross 歪んだデータは両方のテーブルで処理できます。 LeftSemi 歪んだデータは左のテーブルでのみ処理できます。 LeftAnti 歪んだデータは左のテーブルでのみ処理できます。 LeftOuter 歪んだデータは左のテーブルでのみ処理できます。 RightOuter 歪んだデータは右のテーブルでのみ処理できます。 パラメーター :属性 デフォルト値 説明 spark.sql.adaptive.enabled false アダプティブ実行フレームワークを有効または無効にします。 spark.sql.adaptive.skewedJoin.enabled false 歪んだデータの処理を有効または無効にします。 spark.sql.adaptive.skewedPartitionFactor 10 パーティションが歪んだパーティションとして識別されるのは、以下のシナリオが発生する場合に限られます。 まず、パーティションのサイズがこの値 (すべてのパーティションのサイズの中央値) とspark.sql.adaptive.skewedPartitionSizeThreshold パラメーターの値より大きい場合です。 次に、パーティション内の行数がこの値 (すべてのパーティション内の行数の中央値) およびspark.sql.adaptive.skewedPartitionSizeThreshold パラメーターの値より大きい場合も歪みが検出されます。 spark.sql.adaptive.skewedPartitionSizeThreshold 67108864 歪んだパーティションのサイズしきい値 spark.sql.adaptive.skewedPartitionRowCountThreshold 10000000 歪んだパーティションの行数しきい値 spark.shuffle.statistics.verbose false このパラメーターの値が true の場合、MapStatus は歪んだデータを処理するため各パーティション内の行数に関する情報を収集します。 - 実行時の実行プランの最適化
Spark SQL の Catalyst オプティマイザは、SQL 文から物理実行プランに変換される論理プランを変換し、それらの物理実行プランを実行します。 ただし、Catalyst が作成する物理実行プランは、統計の欠如または不正確さが原因で最適ではない場合があります。 たとえば、Spark SQL は BroadcastJoin ではなく SortMergeJoinExec を選択することはできますが、BroadcastJoin はシナリオで最適なオプションです。
Spark SQL の Adaptive Execution フレームワークは、シャッフルステージでのシャッフル書き込みのサイズをベースにクエリパフォーマンスを改善するため、SortMergeJoin ではなく BroadcastJoin を使用するかどうか決定します。
パラメーター :Attribute デフォルト値 説明 spark.sql.adaptive.enabled false アダプティブ実行フレームワークを有効または無効にします。 spark.sql.adaptive.join.enabled true より良い結合戦略を実行時に決定するかどうか spark.sql.adaptiveBroadcastJoinThreshold spark.sql.autoBroadcastJoinThreshold と同等です。 ブロードキャスト結合を使用して結合クエリを最適化するかどうか決定します。
テスト
テストサンプルとして TPC-DS クエリをいくつか取ります。
- シャッフルパーティション番号
- クエリ 30
ネイティブ Spark:
- 削減タスクの数をアダプティブに調整します。
- クエリ 30
- 実行時の実行プランの最適化 (SortMergeJoin からBroadcastJoin へ)アダプティブに BroadcastJoin を使用します。