spark-adaptive-plan

Adaptive execution is an important Spark SQL feature that is closely tied to SQL execution performance. This article analyzes how adaptive execution works, based on the Spark 3.0.0 source code.

Preface

Spark can enable automatic SQL optimization through the configuration spark.sql.adaptive.enabled. The optimizations are all centered on shuffle: coalescing small shuffle partitions, handling data skew in joins, and reading shuffle data locally.
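As a minimal sketch of how to turn the feature on (the app name and master here are arbitrary; the rule-specific options are covered later in this article):

import org.apache.spark.sql.SparkSession

// Enable adaptive query execution; the rules discussed below (partition coalescing,
// skew-join handling, local shuffle reader) are gated behind this switch plus
// their own configs.
val spark = SparkSession.builder()
  .appName("aqe-demo")
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()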

How it works

Let's first review Spark's shuffle flow at the code level: a groupBy operation is triggered -> a ShuffledRDD is built -> the DAGScheduler turns it into ShuffleMapTasks for execution -> the parent RDD of the ShuffledRDD writes its data out through a shuffle writer -> the ShuffledRDD reads the shuffle data back.
Next, let's look at the shuffle-related code on the SQL side. EnsureRequirements checks whether each node's required child distribution is satisfied (for example an operator that needs sorted or clustered input); if not, it inserts an Exchange node. Exchange is the SQL wrapper around shuffle, and you can see that ShuffleExchangeExec internally builds a shuffled RDD (ShuffledRowRDD).
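As a quick, minimal illustration (the data and column names are made up), a simple aggregation is enough for EnsureRequirements to insert an exchange, which shows up in the physical plan as an Exchange hashpartitioning node:

// Assumes the `spark` session from the earlier snippet; data and column names are arbitrary.
import spark.implicits._

val df = Seq(("a", 1), ("b", 2), ("a", 3)).toDF("key", "value")

// The final aggregation requires the rows to be clustered by `key`, so
// EnsureRequirements inserts an Exchange node between the partial and final aggregates.
df.groupBy("key").sum("value").explain()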
For a detailed walkthrough of the shuffle implementation, see the separate article on shuffle internals.
Now that we know Spark's shuffle flow, let's look at a problem shuffle frequently runs into:

  • Data skew
    Skewed keys in the map stage make the data volumes of different partitions differ widely, which in turn makes the reduce-stage tasks take very different amounts of time.

With this problem in mind, let's see how Spark solves it elegantly with adaptive execution.
The core class is AdaptiveSparkPlanExec. Skipping the wrapping and defensive code, let's focus on getFinalPhysicalPlan, the core method that produces the final physical plan:

private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
  if (!isFinalPlan) {
    // Subqueries do not have their own execution IDs and therefore rely on the main query to
    // update UI.
    val executionId = Option(context.session.sparkContext.getLocalProperty(
      SQLExecution.EXECUTION_ID_KEY)).map(_.toLong)
    var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
    // Wrap the original Exchange nodes via `CreateStageResult` so that they expose uniform
    // metrics -- all of the automatic optimizations rely on known runtime metrics; without
    // them no optimization is possible.
    var result = createQueryStages(currentPhysicalPlan)
    val events = new LinkedBlockingQueue[StageMaterializationEvent]()
    val errors = new mutable.ArrayBuffer[SparkException]()
    var stagesToReplace = Seq.empty[QueryStageExec]
    while (!result.allChildStagesMaterialized) {
      currentPhysicalPlan = result.newPlan
      if (result.newStages.nonEmpty) {
        stagesToReplace = result.newStages ++ stagesToReplace
        executionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan)))

        // Start materialization of all new stages and fail fast if any stages failed eagerly
        // Normally a Spark job is the unit that triggers computation, but here the upstream
        // stages must finish first so that their statistics can drive the optimization of the
        // reduce side of the shuffle.
        // The events.take() below is a blocking call whose sole purpose is to wait for the
        // upstream stages to complete.
        // This is not entirely elegant: it bypasses the normal DAGScheduler flow and calls
        // sparkContext.submitMapStage(shuffleDependency) directly from ShuffleExchangeExec.
        // It breaks the original scheduling flow, but that is the price of capturing the
        // statistics of the previous stage to tune the next one.
        result.newStages.foreach { stage =>
          try {
            stage.materialize().onComplete { res =>
              if (res.isSuccess) {
                events.offer(StageSuccess(stage, res.get))
              } else {
                events.offer(StageFailure(stage, res.failed.get))
              }
            }(AdaptiveSparkPlanExec.executionContext)
          } catch {
            case e: Throwable =>
              val ex = new SparkException(
                s"Early failed query stage found: ${stage.treeString}", e)
              cleanUpAndThrowException(Seq(ex), Some(stage.id))
          }
        }
      }

      // Wait on the next completed stage, which indicates new stats are available and probably
      // new stages can be created. There might be other stages that finish at around the same
      // time, so we process those stages too in order to reduce re-planning.
      val nextMsg = events.take()
      val rem = new util.ArrayList[StageMaterializationEvent]()
      events.drainTo(rem)
      (Seq(nextMsg) ++ rem.asScala).foreach {
        case StageSuccess(stage, res) =>
          stage.resultOption = Some(res)
        case StageFailure(stage, ex) =>
          errors.append(
            new SparkException(s"Failed to materialize query stage: ${stage.treeString}.", ex))
      }

      // In case of errors, we cancel all running stages and throw exception.
      if (errors.nonEmpty) {
        cleanUpAndThrowException(errors, None)
      }

      // Try re-optimizing and re-planning. Adopt the new plan if its cost is equal to or less
      // than that of the current plan; otherwise keep the current physical plan together with
      // the current logical plan since the physical plan's logical links point to the logical
      // plan it has originated from.
      // Meanwhile, we keep a list of the query stages that have been created since last plan
      // update, which stands for the "semantic gap" between the current logical and physical
      // plans. And each time before re-planning, we replace the corresponding nodes in the
      // current logical plan with logical query stages to make it semantically in sync with
      // the current physical plan. Once a new plan is adopted and both logical and physical
      // plans are updated, we can clear the query stage list because at this point the two plans
      // are semantically and physically in sync again.
      // The comments above already say a lot; to re-emphasize: adaptive execution re-optimizes
      // the plan using the optimization rules (analyzed below), and if the re-optimized plan
      // would add extra shuffles (i.e. has a higher cost), the old plan is kept.
      val logicalPlan = replaceWithQueryStagesInLogicalPlan(currentLogicalPlan, stagesToReplace)
      val (newPhysicalPlan, newLogicalPlan) = reOptimize(logicalPlan)
      val origCost = costEvaluator.evaluateCost(currentPhysicalPlan)
      val newCost = costEvaluator.evaluateCost(newPhysicalPlan)
      if (newCost < origCost ||
          (newCost == origCost && currentPhysicalPlan != newPhysicalPlan)) {
        logOnLevel(s"Plan changed from $currentPhysicalPlan to $newPhysicalPlan")
        cleanUpTempTags(newPhysicalPlan)
        currentPhysicalPlan = newPhysicalPlan
        currentLogicalPlan = newLogicalPlan
        stagesToReplace = Seq.empty[QueryStageExec]
      }
      // Now that some stages have finished, we can try creating new stages.
      result = createQueryStages(currentPhysicalPlan)
    }

    // Run the final plan when there's no more unfinished stages.
    currentPhysicalPlan = applyPhysicalRules(result.newPlan, queryStageOptimizerRules)
    isFinalPlan = true
    executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
    logOnLevel(s"Final plan: $currentPhysicalPlan")
  }
  currentPhysicalPlan
}

The core idea is to improve performance by dynamically optimizing everything downstream of a shuffle stage, based on the statistics of the stages that have already finished.

Adaptive optimization rules

The optimization rules are the cornerstone of adaptive execution; each rule targets a specific scenario that can be optimized.

ReuseAdaptiveSubquery

Not much to say here: it reuses subqueries by caching subquery expressions in a global cache.
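For intuition, a hypothetical query (the orders table and amount column are made up) in which the same scalar subquery appears twice is the kind of case this rule targets; the repeated subquery can be planned once and reused instead of being executed again:

// Hypothetical example: an `orders` table with an `amount` column is assumed to exist.
// The same scalar subquery appears twice; with reuse enabled it is executed only once.
spark.sql("""
  SELECT o.*,
         amount - (SELECT avg(amount) FROM orders) AS diff_from_avg
  FROM orders o
  WHERE amount > (SELECT avg(amount) FROM orders)
""").explain()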

CoalesceShufflePartitions

This is the core rule. It computes the number of reduce-side task partitions with an algorithm, instead of using Spark's default spark.sql.shuffle.partitions.
The algorithm is ShufflePartitionsUtil.coalescePartitions. The idea is to merge consecutive shuffle partition ids so that a single reduce task reads them together, as illustrated by the example from the code comments below (a simplified sketch of the algorithm follows the example).

* For example, we have two shuffles with the following partition size statistics:
* - shuffle 1 (5 partitions): [100 MiB, 20 MiB, 100 MiB, 10MiB, 30 MiB]
* - shuffle 2 (5 partitions): [10 MiB, 10 MiB, 70 MiB, 5 MiB, 5 MiB]
* Assuming the target size is 128 MiB, we will have 4 coalesced partitions, which are:
* - coalesced partition 0: shuffle partition 0 (size 110 MiB)
* - coalesced partition 1: shuffle partition 1 (size 30 MiB)
* - coalesced partition 2: shuffle partition 2 (size 170 MiB)
* - coalesced partition 3: shuffle partition 3 and 4 (size 50 MiB)
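
A minimal sketch of the greedy idea behind this (not the actual ShufflePartitionsUtil code; the names are mine): walk the shuffle partition ids in order, sum the size of each id across all shuffles, and start a new coalesced partition whenever adding the next id would overflow the target size:

// Sketch only: sizes(i)(p) is the size of partition p in shuffle i; targetSize plays the
// role of spark.sql.adaptive.advisoryPartitionSizeInBytes. Returns the start index of
// each coalesced partition.
def coalesce(sizes: Seq[Array[Long]], targetSize: Long): Seq[Int] = {
  val numPartitions = sizes.head.length
  val starts = scala.collection.mutable.ArrayBuffer(0)
  var current = 0L
  for (p <- 0 until numPartitions) {
    val total = sizes.map(_(p)).sum  // size of partition id p summed over all shuffles
    if (current + total > targetSize && current > 0) {
      starts += p       // close the current coalesced partition and start a new one at p
      current = 0L
    }
    current += total
  }
  starts.toSeq
}

With the sizes from the comment above (totals per partition id of 110, 30, 170, 15 and 35 MiB against a 128 MiB target), this returns Seq(0, 1, 2, 3), i.e. the four coalesced partitions listed in the example.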

  • spark.sql.execution.reuseSubquery
    The switch for subquery reuse; on by default.
  • spark.sql.adaptive.advisoryPartitionSizeInBytes
    Defaults to 64 MB; the advisory amount of data a single reduce task should read after the shuffle. Once the accumulated size exceeds this value, a new reduce task (coalesced partition) is started.
  • spark.sql.adaptive.coalescePartitions.minPartitionNum
    The minimum number of reduce tasks after coalescing; defaults to Spark's default parallelism.

OptimizeSkewedJoin

CoalesceShufflePartitions solves the problem of many small partitions, but data skew is still unresolved. The usual approach to skew is to split the skewed partition into several sub-partitions and then operate on those. Spark's optimization here wraps the left and right children of a SortMergeJoinExec in CustomShuffleReaderExec so that skewed partitions are read by several tasks (see the detection sketch after the configs below).
Note that only two-table joins are optimized at the moment; multi-way joins are not supported yet.

  • spark.sql.adaptive.skewJoin.enabled
    The switch for skew-join optimization; on by default.
  • spark.sql.adaptive.skewJoin.skewedPartitionFactor
    The skew factor: if a partition's output size is greater than the median partition size times this factor, the partition is treated as skewed.
  • spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
    The skew threshold, 256 MB by default. A partition is only considered skewed when its output size also exceeds this threshold; even if it satisfies the skew factor, it is not treated as skewed otherwise.
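
A minimal sketch of the detection condition (the helper signature is mine; the real logic lives in OptimizeSkewedJoin): a partition counts as skewed only when both of the configs above are exceeded:

// Sketch: a partition is skewed when it is more than `factor` times larger than the
// median partition size AND larger than the absolute byte threshold.
def isSkewed(size: Long, medianSize: Long, factor: Int, thresholdInBytes: Long): Boolean =
  size > medianSize * factor && size > thresholdInBytes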

OptimizeLocalShuffleReader

As the name suggests, this rule schedules the reduce tasks onto the executors that already hold the shuffle data, so reading the shuffle data avoids network IO.
The idea is to map reduce partitions onto map partitions: each map output corresponds to at least one reduce partition, so every reader can make full use of the shuffle data that is local to it.
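A minimal sketch of the idea (not the actual rule code; the case class and function names are made up): instead of one reader per reduce partition that pulls blocks from every mapper, build one or more readers per map output, each covering a contiguous range of reduce partition ids, so each reader only touches data on its own executor:

// Sketch: describe each local reader as (mapIndex, startReducer, endReducer).
// numMappers / numReducers come from the shuffle dependency; the case class name is
// invented for illustration (Spark's real spec is PartialMapperPartitionSpec).
case class LocalReaderSpec(mapIndex: Int, startReducer: Int, endReducer: Int)

def localReaderSpecs(numMappers: Int, numReducers: Int, readersPerMapper: Int): Seq[LocalReaderSpec] = {
  val step = math.max(1, numReducers / readersPerMapper)
  for {
    mapIndex <- 0 until numMappers
    start    <- 0 until numReducers by step
  } yield LocalReaderSpec(mapIndex, start, math.min(start + step, numReducers))
}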

  • spark.sql.adaptive.localShuffleReader.enabled
    The switch for the local shuffle reader; on by default.

Summary

Although adaptive execution spans only a dozen or so source files, the amount of information it touches is large. A few core concepts are involved:

  • How SQL physical plans are parsed, executed and optimized
  • The shuffle execution flow
  • The stage scheduling flow

Together these cover almost all of Spark's two most important modules, core and sql.
In terms of optimization scenarios it has almost no weak spots and can handle pretty much any SQL. In real business workloads I have hit only one issue: when running LIKE over non-static data, reducing the number of partitions slowed the whole stage down, because each individual task became very slow.

References

Spark 3.0.0 source code
SPARK-2295
