This topic describes the parameters used in Spark code as well as Smart Shuffle configuration parameters.
The following table describes the parameters used in Spark code.
Parameter | Default value | Description |
---|---|---|
spark.hadoop.fs.jfs.cache.oss-accessKeyId | No default value | The AccessKey ID that is used to access OSS. This parameter is optional. |
spark.hadoop.fs.jfs.cache.oss-accessKeySecret | No default value | The AccessKey secret that is used to access OSS. This parameter is optional. |
spark.hadoop.fs.jfs.cache.oss-securityToken | No default value | The STS-issued token that is used to access OSS. This parameter is optional. |
spark.hadoop.fs.jfs.cache.oss-endpoint | No default value | The endpoint that is used to access OSS. This parameter is optional. |
spark.hadoop.fs.oss.copy.simple.max.byte | 134217728 | The maximum size of a file that can be copied between OSS buckets by using the simple copy API. Unit: bytes. The default value is equivalent to 128 MB. |
spark.hadoop.fs.oss.multipart.split.max.byte | 67108864 | The maximum size of each file part that can be copied between OSS buckets by using the multipart copy API. Unit: bytes. The default value is equivalent to 64 MB. |
spark.hadoop.fs.oss.impl | com.aliyun.emr.fs.oss.JindoOssFileSystem | The implementation class for the native OSS file system. |
spark.hadoop.job.runlocal | false | Specifies whether to run and debug Spark code locally if the data source is OSS. If you want to run and debug the code, set this parameter to true. Otherwise, set it to false. |
spark.logservice.fetch.interval.millis | 200 | The time interval at which a receiver retrieves data from LogHub. Unit: milliseconds. |
spark.logservice.fetch.inOrder | true | Specifies whether to consume sharded data in order. |
spark.logservice.heartbeat.interval.millis | 30000 | The heartbeat interval for the data consumption process. Unit: milliseconds. |
spark.mns.batchMsg.size | 16 | The number of MNS messages to pull at a time. The maximum value is 16. |
spark.mns.pollingWait.seconds | 30 | The waiting time for pulling messages when the MNS queue is empty. Unit: seconds. |
spark.hadoop.io.compression.codec.snappy.native | false | Specifies whether Snappy files are in the standard Snappy format. By default, Hadoop recognizes only Snappy files written by Hadoop. Set this parameter to true to process files in the standard Snappy format. |
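For reference, the OSS access parameters above can also be set programmatically when a Spark session is built. The following Scala sketch is illustrative only: the AccessKey pair, endpoint, and bucket path are placeholders, not values from this topic.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: reading OSS data with the JindoOssFileSystem implementation.
// All credential and path values below are placeholders.
val spark = SparkSession.builder()
  .appName("OssReadExample")
  .config("spark.hadoop.fs.oss.impl", "com.aliyun.emr.fs.oss.JindoOssFileSystem")
  .config("spark.hadoop.fs.jfs.cache.oss-accessKeyId", "<yourAccessKeyId>")
  .config("spark.hadoop.fs.jfs.cache.oss-accessKeySecret", "<yourAccessKeySecret>")
  .config("spark.hadoop.fs.jfs.cache.oss-endpoint", "<yourOssEndpoint>")
  .getOrCreate()

// Read a text file from an OSS bucket and count its lines.
val lines = spark.read.textFile("oss://<yourBucket>/path/to/input")
println(lines.count())
```

The same settings can instead be passed to the spark-submit script as --conf key=value flags.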
Smart Shuffle configuration
Smart Shuffle is a shuffle implementation provided by Spark SQL in EMR V3.16.0. It is suitable for queries that involve large amounts of shuffle data and improves the execution efficiency of Spark SQL.
After Smart Shuffle is enabled, shuffle output data is not stored on local disks. Instead, the data is transmitted to remote nodes over the network. Data in the same partition is transmitted to the same node and stored in the same file. Take note of the following limits:
- Smart Shuffle cannot guarantee the atomicity of task execution. When a task fails, you need to restart the entire Spark job.
- Smart Shuffle does not support the external shuffle service. Therefore, dynamic resource allocation is not supported.
- Speculative tasks are not supported.
- Smart Shuffle is incompatible with Adaptive Execution.
To enable Smart Shuffle, you can configure the required parameters in the Spark configuration file or pass them to the spark-submit script. The following table describes these parameters.
Parameter | Default value | Description |
---|---|---|
spark.shuffle.manager | sort | The shuffle implementation. Set this parameter to org.apache.spark.shuffle.sort.SmartShuffleManager or smart to enable Smart Shuffle for a Spark session. |
spark.shuffle.smart.spill.memorySizeForceSpillThreshold | 128m | The threshold for the amount of memory that shuffle output data can occupy after Smart Shuffle is enabled. When this threshold is exceeded, shuffle output data is transmitted to remote nodes. Each partition corresponds to one remote node. |
spark.shuffle.smart.transfer.blockSize | 1m | The size of each data block that is transmitted to remote nodes after Smart Shuffle is enabled. |
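As a minimal sketch, the parameters above can be set when the Spark session is created. The threshold and block size values mirror the defaults in the table, and speculation and dynamic resource allocation are explicitly disabled to reflect the limits listed earlier; the query at the end is a placeholder.

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: enabling Smart Shuffle for a Spark SQL session.
val spark = SparkSession.builder()
  .appName("SmartShuffleExample")
  .config("spark.shuffle.manager", "smart")
  .config("spark.shuffle.smart.spill.memorySizeForceSpillThreshold", "128m")
  .config("spark.shuffle.smart.transfer.blockSize", "1m")
  // Smart Shuffle supports neither speculative tasks nor dynamic resource
  // allocation, so both features are disabled here.
  .config("spark.speculation", "false")
  .config("spark.dynamicAllocation.enabled", "false")
  .getOrCreate()

// Placeholder query: jobs with large shuffles benefit most from Smart Shuffle.
spark.sql("SELECT key, count(*) FROM src GROUP BY key").show()
```

Equivalently, each setting can be passed to the spark-submit script as a --conf key=value flag.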