Spark SQL in Alibaba Cloud E-MapReduce (EMR) V3.13.0 and later provides an adaptive execution framework. This framework can be used to dynamically adjust the number of reduce tasks, handle data skew, and optimize execution plans.

Limits

The adaptive execution parameters described in this topic apply only to Spark 2.X. If you use Spark 3.X, see Adaptive Query Execution for the related parameters.

Capabilities

The adaptive execution framework of Spark SQL has the following capabilities:
  • Dynamically adjusts the number of reduce tasks.

    The number of tasks in a reduce stage of a Spark SQL job depends on the value of the spark.sql.shuffle.partitions parameter. The default value of this parameter is 200. If you configure this parameter for a job, the number of tasks is the same in all reduce stages of the job.

    However, the amount of data to be processed can vary significantly across jobs, and across the reduce stages of a single job. A fixed value of spark.sql.shuffle.partitions can therefore hurt processing efficiency. For example, if only 10 MB of data needs to be processed, one reduce task is sufficient. With the default value of 200, the 10 MB of data is split into 200 pieces and processed by 200 tasks, which significantly increases scheduling overhead and reduces processing efficiency.

    The adaptive execution framework of Spark SQL allows you to configure the upper and lower limits of the number of shuffle partitions. The framework can dynamically adjust the number of tasks in the reduce stages of different jobs within the specified range.

    This way, you no longer need to tune the spark.sql.shuffle.partitions parameter, which significantly reduces optimization costs. The framework can also dynamically adjust the number of tasks across the different reduce stages of the same job.

    The following table describes the related parameters.
    Parameter | Default value | Description
    spark.sql.adaptive.enabled | false | Specifies whether to enable the adaptive execution framework of Spark SQL.
    spark.sql.adaptive.minNumPostShufflePartitions | 1 | The minimum number of reduce tasks.
    spark.sql.adaptive.maxNumPostShufflePartitions | 500 | The maximum number of reduce tasks.
    spark.sql.adaptive.shuffle.targetPostShuffleInputSize | 67108864 | The minimum size of data that each reduce task must process. Unit: bytes. If you use the default value 67108864, each reduce task processes at least 64 MB of data. The system dynamically adjusts the number of reduce tasks based on this parameter.
    spark.sql.adaptive.shuffle.targetPostShuffleRowCount | 20000000 | The minimum number of records that each reduce task must process. If you use the default value 20000000, each reduce task processes at least 20,000,000 records. The system dynamically adjusts the number of reduce tasks based on this parameter.
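
    For example, the following PySpark sketch enables the framework and bounds the number of reduce tasks. It assumes an EMR Spark 2.X environment in which the parameters above take effect; the application name, table name, and query are hypothetical. The same values can also be passed with --conf when you submit the job.

      # Minimal sketch, assuming an EMR Spark 2.X environment in which the
      # parameters above take effect. The application name, table name, and
      # query are hypothetical.
      from pyspark.sql import SparkSession

      spark = (
          SparkSession.builder
          .appName("adaptive-shuffle-demo")
          # Enable the adaptive execution framework of Spark SQL.
          .config("spark.sql.adaptive.enabled", "true")
          # Allow the framework to choose between 1 and 500 reduce tasks.
          .config("spark.sql.adaptive.minNumPostShufflePartitions", "1")
          .config("spark.sql.adaptive.maxNumPostShufflePartitions", "500")
          # Let each reduce task process at least 64 MB of data.
          .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", "67108864")
          .getOrCreate()
      )

      # Any query that involves a shuffle, such as this aggregation, now has its
      # reduce-task count chosen at runtime within the configured range.
      spark.sql("SELECT key, COUNT(*) AS cnt FROM some_table GROUP BY key").show()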
  • Automatically handles data skew.

    Data skew is a common issue in JOIN operations. When data skew occurs, some tasks must process excessively large amounts of data, which causes long-tail tasks. Open-source Apache Spark SQL cannot automatically handle data skew.

    In EMR V3.13.0 and later, when a job is running, the adaptive execution framework of Spark SQL can automatically detect skewed data and handle the issue based on the settings of the related parameters.

    The adaptive execution framework of Spark SQL splits the data that is in the skewed partition, processes the data by using multiple tasks, and then combines the results by performing SQL UNION operations.

    The following table describes how the adaptive execution framework of Spark SQL handles data skew in different JOIN operations.
    JOIN operation | Description
    Inner | Skewed data can be handled in both tables.
    Cross | Skewed data can be handled in both tables.
    LeftSemi | Skewed data can be handled only in the left table.
    LeftAnti | Skewed data can be handled only in the left table.
    LeftOuter | Skewed data can be handled only in the left table.
    RightOuter | Skewed data can be handled only in the right table.
    The following table describes the related parameters.
    Parameter | Default value | Description
    spark.sql.adaptive.enabled | false | Specifies whether to enable the adaptive execution framework of Spark SQL.
    spark.sql.adaptive.skewedJoin.enabled | false | Specifies whether to enable data skew handling.
    spark.sql.adaptive.skewedPartitionFactor | 10 | The factor by which a partition must exceed the median of all partitions to be identified as skewed. A partition is identified as skewed only if its size is greater than the median partition size multiplied by this factor and also greater than the value of spark.sql.adaptive.skewedPartitionSizeThreshold, or if its number of records is greater than the median record count multiplied by this factor and also greater than the value of spark.sql.adaptive.skewedPartitionRowCountThreshold.
    spark.sql.adaptive.skewedPartitionSizeThreshold | 67108864 | The size threshold of a partition. Unit: bytes. This parameter is used to determine whether a partition is skewed.
    spark.sql.adaptive.skewedPartitionRowCountThreshold | 10000000 | The threshold of the number of records in a partition. This parameter is used to determine whether a partition is skewed.
    spark.shuffle.statistics.verbose | false | Specifies whether MapStatus collects the number of records in each partition. Set this parameter to true so that data skew can also be detected by record count.
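
    For example, the following PySpark sketch enables automatic skew handling for an inner join. It assumes an EMR Spark 2.X environment; the table names orders and users and the join query are hypothetical.

      # Minimal sketch, assuming an EMR Spark 2.X environment. The table names
      # orders and users and the join query are hypothetical.
      from pyspark.sql import SparkSession

      spark = (
          SparkSession.builder
          .appName("skewed-join-demo")
          .config("spark.sql.adaptive.enabled", "true")
          # Enable automatic data skew handling for joins.
          .config("spark.sql.adaptive.skewedJoin.enabled", "true")
          # Collect per-partition record counts so that skew can also be
          # detected by row count.
          .config("spark.shuffle.statistics.verbose", "true")
          # Treat a partition as skewed only if it exceeds 10 times the median
          # and is larger than 64 MB or holds more than 10,000,000 records.
          .config("spark.sql.adaptive.skewedPartitionFactor", "10")
          .config("spark.sql.adaptive.skewedPartitionSizeThreshold", "67108864")
          .config("spark.sql.adaptive.skewedPartitionRowCountThreshold", "10000000")
          .getOrCreate()
      )

      # For this inner join, a skewed partition in either table can be split
      # into multiple tasks whose partial results are combined with a UNION.
      spark.sql("""
          SELECT o.user_id, u.name, COUNT(*) AS order_cnt
          FROM orders o JOIN users u ON o.user_id = u.id
          GROUP BY o.user_id, u.name
      """).show()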
  • Dynamically optimizes execution plans.

    The Catalyst optimizer of Spark SQL converts SQL statements into physical execution plans and then runs them. However, because statistics may be missing or inaccurate, the physical execution plan that Catalyst produces may not be optimal. For example, Spark SQL may choose a sort-merge join even though a broadcast join would be more efficient.

    When Spark SQL runs a physical execution plan, the adaptive execution framework of Spark SQL dynamically determines whether to use a broadcast join instead of a sort-merge join based on the data size of the shuffle write in the shuffle stage. This way, the query efficiency is improved.

    The following table describes the related parameters.
    Parameter | Default value | Description
    spark.sql.adaptive.enabled | false | Specifies whether to enable the adaptive execution framework of Spark SQL.
    spark.sql.adaptive.join.enabled | true | Specifies whether to enable the dynamic optimization of execution plans.
    spark.sql.adaptiveBroadcastJoinThreshold | Value of spark.sql.autoBroadcastJoinThreshold | The threshold that is used to determine whether to change a sort-merge join to a broadcast join at runtime, based on the size of the shuffle write data.
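
    For example, the following PySpark sketch enables dynamic execution plan optimization so that a sort-merge join can be changed to a broadcast join at runtime. It assumes an EMR Spark 2.X environment; the table names and the 32 MB threshold are hypothetical.

      # Minimal sketch, assuming an EMR Spark 2.X environment. The table names
      # and the 32 MB threshold are hypothetical.
      from pyspark.sql import SparkSession

      spark = (
          SparkSession.builder
          .appName("dynamic-join-optimization-demo")
          .config("spark.sql.adaptive.enabled", "true")
          # Allow the execution plan to be optimized at runtime.
          .config("spark.sql.adaptive.join.enabled", "true")
          # Change a sort-merge join to a broadcast join if the shuffle write
          # of one join side is smaller than 32 MB at runtime.
          .config("spark.sql.adaptiveBroadcastJoinThreshold", str(32 * 1024 * 1024))
          .getOrCreate()
      )

      # Even if Catalyst initially plans a sort-merge join for this query, it can
      # be switched to a broadcast join once the runtime shuffle statistics show
      # that the dim_products side is small enough.
      spark.sql("""
          SELECT d.category, SUM(f.amount) AS total_amount
          FROM fact_sales f JOIN dim_products d ON f.product_id = d.id
          GROUP BY d.category
      """).show()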