RSS(EMR Remote Shuffle Service)是E-MapReduce(简称EMR)为了提升Shuffle稳定性和性能推出的扩展组件。

背景信息

目前Shuffle方案缺点如下:
  • Shuffle Write在大数据量场景下会溢出,导致写放大。
  • Shuffle Read过程中存在大量的网络小包导致的Connection reset问题。
  • Shuffle Read过程中存在大量小数据量的IO请求和随机读,对磁盘和CPU造成高负载。
  • 对于M*N次的连接数,在M和N数千的规模下,作业基本无法完成。
  • NodeManager和Spark Shuffle Service是同一进程,当Shuffle的数据量特别大时,通常会导致NodeManager重启,从而影响YARN调度的稳定性。
EMR推出的基于Shuffle的RSS服务,可以优化目前Shuffle方案的问题。RSS优势如下:
  • 使用Push-Style Shuffle代替Pull-Style,减少Mapper的内存压力。
  • 支持IO聚合,Shuffle Read的连接数从M*N降到N,同时更改随机读为顺序读。
  • 支持两副本机制,降低Fetch Fail概率。
  • 支持计算与存储分离架构,可以部署Shuffle Service至特殊硬件环境中,与计算集群分离。
  • 解决Spark on Kubernetes时对本地磁盘的依赖。
RSS设计架构图如下。RSS

使用限制

此文档仅适用于EMR-3.39.1及后续版本,EMR-5.5.0及后续版本。

创建集群

以EMR-5.5.0版本为例,您可以通过以下两种方式创建RSS集群:
  • 创建独立的Shuffle Service集群。RSS-01
  • 创建包含RSS组件的Hadoop集群。RSS_02

集群创建详情请参见创建集群

使用RSS

Spark使用RSS时,需在提交Spark作业时添加以下配置项,配置详情请参见作业编辑

Spark相关的参数,请参见Spark Configuration

参数 描述
spark.shuffle.manager 固定值org.apache.spark.shuffle.RSS.RSSShuffleManager
spark.serializer 固定值org.apache.spark.serializer.KryoSerializer
spark.rss.master.address 填写格式<rss-master-ip>:<rss-master-port>
涉及参数如下:
  • <rss-master-ip>:Master节点的公网IP地址。
  • <rss-master-port>:固定值9097。
spark.shuffle.service.enabled 默认值为false。

使用EMR的Remote Shuffle Service时需要关闭原有的External Shuffle Service。

spark.rss.shuffle.writer.mode RSS的wirter支持的模式:
  • hash(默认值):在Partition并发度过大的情况下会使用较多的内存。
  • sort:使用固定大小内存,在Partition并发度很大的情况下,能够稳定工作。
spark.rss.push.data.replicate 是否开启两副本。取值如下:
  • true(默认值):开启两副本。
  • false:不开启两副本。
spark.sql.adaptive.enabled 默认值为false。
EMR的Remote Shuffle Service暂不支持Adaptive Execution。
说明 这些参数仅适用于EMR-3.42.0之前的版本和EMR-5.8.0之前的版本。
spark.sql.adaptive.localShuffleReader.enabled
spark.sql.adaptive.skewJoin.enabled

配置项说明

您可以在RSS服务配置页面,查看RSS所有的配置项。
参数 描述 默认值
rss.worker.flush.queue.capacity 每个目录的Flush buffer数量。
说明 为了提升性能,您可以配置多块磁盘。为了提升整体的读写吞吐量,建议一块磁盘不多于2个目录。

每个目录的Flush buffer所消耗堆内的内存为RSS.worker.flush.buffer.size * RSS.worker.flush.queue.capacity,即256 KB * 512 = 128 MB。每个目录提供的槽位(slots)数量是该参数的一半。例如,总共28个目录,则整体内存消耗是128 MB * 28 = 3.5 GB,整体的slots数量是512 * 28 / 2 = 7168

512
rss.flush.timeout Flush到存储层的超时时间。 240s
rss.application.timeout Application心跳超时时间,超时会清理Application相关资源。 240s
rss.worker.flush.buffer.size Flush buffer大小,超过最大值会触发刷盘。 256k
rss.metrics.system.enable 是否打开监控。取值如下:
  • true:打开监控。
  • false:不打开监控。
false
rss_worker_offheap_memory Worker堆外内存大小。 4g
rss_worker_memory Worker堆内内存大小。 4g
rss_master_memory Master堆内内存大小。 4g