spark-shuffle实现原理

shuffle是分布式计算引擎的一个核心流程,遇到join,reduce等操作就可能会触发shuffle,文本通过代码分析spark的shuffle机制

前言

shuffle承接着map和reduce的数据桥梁,对map而言,shuffle可以对数据合并,排序,缓存,这样效率会比在reduce端更高;对reduce而言,shuffle可以将数据重洗牌,数据倾斜就是由这个原因引起的。

设计

那怎么设计shuffle这个流程呢。从spark执行机制的角度上来看,分为executor和task

spark的shuffle框架

spark的shuffle的插件式的,我们可以通过配置spark.shuffle.manager自定义shuffle manager,默认是sort,也就是SortShuffleManager,目前spark也就这一个ShuffleManager实现,我们可以通过继承ShuffleManager来实现自己的shuffle manager。
ShuffleManager里有四个接口,register,reader,writer和stop。核心接口则是reader和writer,当前版本reader接口只有1个实现,writer接口有3个实现,每种实现分别对应不同的场景。

SortShuffleManager

介绍

可以把这个类当做是shuffle工厂类,需要解释一下的是这里的sort不是指key-value数据的key的sort,而是对数据输出分区id的sort。输出的数据会根据输出的分区进行排序,并且通过记录索引文件的方式记录每个分区在文件中的位置,毕竟下游还需要通过分区来定位数据位置,如果同一个分区不是连续的话,那拉取数据的效率将会大大下降,这也是输出数据必须按分区排序的原因。

BypassMergeSortShuffleWriter

介绍

这个类是shuffle writer的一个实现,基于每个分区(这里的分区指的是输出的分区,而不是写本身的分区)各自维护一个文件句柄的思路,直接将数据分文件写到磁盘,最后再将所有分区文件合并成一个文件并记录一个index文件。这种思路是最简单的,类似分桶操作,不算最后的合并,整个过程只用了O(n)的时间复杂度,秒杀所有的排序算法。当前这个策略的使用前提是输出分区不大于200个分区。

使用场景

  • shuffle中没有map端的聚合操作
  • 输出的分区数小于spark.shuffle.sort.bypassMergeThreshold,默认是200

shuffle 流程

  1. 将数据按key分partition,根据partitioner,比如hash
  2. 每个partition维护一个文件句柄,最后生成一个FileSegment
  3. 将所有FileSegment合并到一个文件
  4. 最后记录一个索引文件,根据输出分区的顺序记录每个FileSegment的长度

核心类

  • BypassMergeSortShuffleWriter负责整体流程控制
  • DiskBlockObjectWriter负责将数据写到文件,通过partitioner确定数据输出的分区,每个分区和都有一个独立的对象,也就是一个独立的文件句柄
  • LocalDiskShuffleMapOutputWriter将输出的所有分区文件合并成一个文件
  • IndexShuffleBlockResolver负责维护Block和文件的映射关系,记录index文件,校验index文件等

UnsafeShuffleWriter

介绍

Spark有很多Unsafe前缀的类,这些类大多数都和数据结构相关,而其底层则是用到了Java的Unsafe类,这个类可以在对象中开辟内存空间,但是索引信息需要上层维护,使用起来比较麻烦,好处是可以节省java对容器的一系列封装造成的内存浪费。为此,spark也单独封装了一个模块,unsafe模块。

使用场景

  • shuffle中没有map端的聚合操作
  • 序列化框架支持对已经序列化数据的重定位,这个意思是被序列化后的几个对象可以任意交换位置而不影响对象的数据
  • 输出的分区数小于spark.shuffle.sort.bypassMergeThreshold,默认是200

shuffle 流程

  1. 将数据按key分partition,根据partitioner,比如hash
  2. 将数据写入Unsafe数据结构,并记录索引
  3. 当数据条数超过spark.shuffle.spill.numElementsForceSpillThreshold,spill数据到磁盘,默认是Integer.MAX_VALUE,也就是尽可能不spill
  4. 判断是否需要将内存容器扩容,spark.shuffle.sort.initialBufferSize初始化的大小只有4096字节,扩容失败也会触发spill,扩容涉及到spark的内存管理机制,这里不展开了
  5. 遍历内存的数据,排序后写入磁盘,注意排序后的分区id是连续的,因此当一条数据的partition和上一条不一样时,意味着上一个分区的数据已经写完了,此时生成一个Segment
  6. 如果有spill文件,那么还需要做最后的合并,将所有的spill数据和最后flush的数据合并成一个文件,这个合并就比较简单了,根据每个文件记录的分区索引合并即可
  7. 最后记录一个索引文件
  • 在步骤5的内存排序中spark给出了两种选择RadixSort,TimSort,通过配置spark.shuffle.sort.useRadixSort选择,默认是用RadixSort

核心类

  • ShuffleInMemorySorter负责在内存中排序
  • ShuffleExternalSorter负责排序
  • IndexShuffleBlockResolver负责维护Block和文件的映射关系,记录index文件,校验index文件等

SortShuffleWriter

介绍

这个shuffle write实现类是用于兜底的,支持所有的shuffle场景,包括map端的聚合,排序等操作。但换个角度说,通用意味着效率不高。由于涉及到的操作比较多,相对前两种shuffle write比较复杂

shuffle 流程

总体流程来看,优先执行合并操作,最后在生成文件的时候再做排序,过程中穿插着spill操作。

  1. 根据是否需要进行聚合操作选择不同的容器,聚合用map数据结构,非聚合用数组数据结构,这也比较好理解
  2. 将数据按key分partition,根据partitioner,比如hash
  3. 数据写入相应容器,需要聚合的key通过聚合处理后再更新到容器
  4. 判断是否需要spill,执行spill
  5. 将内存中和spill的数据合并写入最后产生的文件,并且这个过程支持排序
  6. 最后记录一个索引文件

核心类

  • SortShuffleWriter入口类
  • ExternalSorter负责操作数据,串联整个流程
  • PartitionedAppendOnlyMap聚合时需要的容器
  • PartitionedPairBuffer非聚合需要的容器
  • IndexShuffleBlockResolver负责维护Block和文件的映射关系,记录index文件,校验index文件等

shuffle read

相比写,读更简单一点,目前也只有一种实现,BlockStoreShuffleReader,所有的寻址以及拉数据都基于其分布式存储BlockManager

MapOutputTracker

说到shuffle就不得不提到MapOutputTracker,用于跟踪shuffle的block信息。MapOutputTrackerBlockManager的架构类似,也采用了主从,在driver端是master,在executor是worker。当有一个ShuffleMapTask执行成功,driver就会把这个task的shuffleId,partitionId,BlockManagerId注册起来。跟踪shuffle信息是为了串联起下游需要读取shuffle数据的task,其中MapOutputTracker.getPreferredLocationsForShuffle()这个方法返回一组更适合的executor地址,也就是尽可能让shuffle读的task和shuffle数据在同一个executor,减少网络io。这个功能默认是开启的spark.shuffle.reduceLocality.enabled,但是必须满足下面3个条件。

  1. map shuffle分区数必须小于1000
  2. 读取shuffle数据的reduce分区数也小于1000
  3. 该BlockManager占有的数据量超过该reduce task所需要拉取的总shuffle数据量的0.2,如果这个系数太小会导致返回过多的BlockManager,太大会返回过少或者没有适合的BlockManager
    注意这三个参数不能通过配置修改,可以认为是spark的经验参数。

总结

虽然spark提供了3种shuffle write的实现方式,但是这3种实现方式最后输出的数据结构是一致的。首先他们都是以输出分区作为粒度,也就是说,每个分区各自拥有一个write实例对象,互不影响,保证了高并发。每个分区最终shuffle写完的文件数也是一致的,一个数据文件和一个索引文件,索引文件里记录了输出分区在数据文件中的顺序和长度。这里举个例子,假设在shuffle前我们有分区20个,shuffle后我们有分区200个,那么在shuffle写的时候,20个分区中,每个分区都要输出200个分区中的某些数据,极端情况下某些分区数据量为0。输出包含200个分区的数据文件数据结构是根据分区排序的,从0开始,一直到199,并且索引文件依次记录每个分区的数据长度,这样就能通过索引文件快速定位到某个分区在数据文件中的位置,方便下游task来读取shuffle数据。

题外

RadixSort

基数排序,又称为桶排序,是一种特殊的排序方法,其思路在于将待排序的数字对每一位排序,先排个位数,再十位,再百位以此类推。排序基于10个有序的桶,对应0-9,当前位的数字放入对应桶里,用桶的顺序表示数字的顺序。从其原理可以看出,影响其排序性能在于数字大小和数量,假设有m个n位数进行排序,那么时间复杂度为:O(m*n),而n不会很大,因此这种排序算法有很好的排序性能。spark shuffle排序的场景就是基于分区id排序,而分区id都是小数字,非常适合用桶排序。要是缺点,基数排序唯一的缺点就是需要另外维护数字和桶之间的映射关系,增加对内存使用。

TimSort

TimSort是由Tim创建的排序方法,其思想是找出已经有序的子序列,然后直接利用这部分序列,避免冗余排序。如果某一点序列是有序的,那么可以用二分插入来对下一个数据排序。现在TimSort是一种优化排序算法,在JDK7的时候引入到了Arrays.sort(),并且作为默认的排序算法。

参考

spark v2.4.3

ulysses wechat
订阅+