spark推测执行

本文分析spark的speculation相关配置,并基于源码进行解释

推测机制

spark支持task级别的推测执行,目的是解决某些task由于某种意外执行过慢,比如正好分配在压力比较大的机器上的task会执行的相对较慢。整个流程核心代码在TaskSetManager。下面先说说整个推测执行的流程。

  1. 触发;task scheduler 会定时轮询是否存在需要推测执行的task,如果有那么进入等待执行队列,可以参考代码TaskSchedulerImpl.start()。可以通过配置spark.speculation.interval来修改轮询周期,默认是100毫秒。经验上来说这个配置应该远小于task的平均执行时间
  2. 目标选取;有了触发机制,那么还需要的是推测目标,我们不可能推测所有的task,这样总体运行时间就会2,对性能毫无意义。首先我们限定只有当这一组task运行成功比例>=0.75才开始推测执行,这个比例可以配置spark.speculation.quantile,具体公
    式为`math.max((speculationQuantile
    numTasks).floor.toInt, 1),可以看到最小是1,也就是说当一个stage只有一个task的时候,也可能会开启推测执行。然后,记录在这一组中已经完成的task的执行时间中位数*1.5,可以配置spark.speculation.multiplier,并且以100毫秒作为下限,选取那些运行时间超过执行完成中位数的task开启推测执行。可以看到有两个限制条件,1是执行成功的task占总体75%,2是task已经运行了超过中位数*1.5的时间却还没完成。通过这两个条件基本可以筛选出执行效率低下的task。相关代码参考TaskSetManager.checkSpeculatableTasks()。推测执行可以减少由于意外导致整个stage执行时间变大,但是会增加一些负担,可以把spark.speculation.quantile`加大,默认0.75的数据可能还是偏多
  3. 去重;有了推测执行,那么需要快速释放一方成功后的另一方。这里TaskSetManager统一采用了attempt的概念,推测执行和失败重试其实都一样,相同task id只要有一个attempt成功了,那么就可能kill其他attempt。

补充推测机制

在spark3.0.0,speculation又增加了一个配置spark.speculation.task.duration.threshold,详见PR-26614,这个配置是为了补充当某个stage的task数量过少导致原有的推测机制失效的问题。假设原有4个task,当这4个task同时阻塞,那么原有的推测就不会执行,因为没有完成的task。这种情况通常发生在这个4个task都被分配到了通一个executor或者node,而这个executor或者node是有问题的。
增加了这个配置了,我们可以手动配置,如果task的执行时间大于这个配置时间,那么我们可以直接开启推测执行。当然除了这个配置,这个补充机制还有另一个限制。为了防止推测执行的task浪费过多的资源,可能那些执行缓慢的task是真的慢,而不是因为意>外而慢。因此,只有当numTasks <= (conf.get(EXECUTOR_CORES) / sched.CPUS_PER_TASK),这个补充机制还有开始执行,这可以说是一种妥协。

小结

当task数很多,比如200,那么原有的启发式推测机制基本可以覆盖所有场景。而task数量少的场景,就可以通过配置自定义的时间来控制推测执行,可以说是一个手动的开关。
配置再整理一下:

  • spark.speculation
    总开关,默认false;true的时候会开启推测执行。

  • spark.speculation.interval
    轮询时间间隔,单位毫秒,默认100毫秒。每到过这个事件,spark会检查一次是否需要推测执行。

  • spark.speculation.quantile
    执行完成的系数,默认0.75。只有满足了执行成功的task占该stage的task数的比例,spark会考虑开启推测执行。

  • spark.speculation.multiplier
    执行时间系数,默认是1.5;当没正在执行的task执行时间大于已经完成的task执行时间中位数*此系数,spark会对正在执行的task开启推测执行。

  • spark.speculation.task.duration.threshold
    时间阈值,单位毫秒,默认无;当配置了此阈值后,若某个stage的task数小于单个executor的槽点,并且task执行时间长度超过此配置,那么spark会开启推测执行。

ulysses wechat
订阅+