spark-sql动态分区剪枝

spark3.0.0版本支持了sql的动态分区,本文基于代码分析实现思路和原理

前言

全称 Dynamic Prune Partition,简称DPP
核心思路就是先将join一侧作为子查询计算出来,再将其得到所有分区的值作为join另一侧表的过滤条件,从而实现对分区的动态剪枝。
举个例子来说

1
2
3
4
5
6
7
8
9
select t1.c1 from t1 join t2 on t1.c1 = t2.c1 where t2.c2 = 1;

||
\/

-- 可以理解为优化成以下查询,当然实际实现略有不同
select t1.c1 from t1 join t2 on t1.c1 = t2.c1 where t2.c2 = 1 and t1.c1 in (
select c1 from t2 where t2.c2 = 1 group by c1
)

触发条件

触发动态分区需要满足一些条件

  1. 待剪枝的表再join条件中存在分区字段,没有分区字段肯定是剪不了枝的
  2. 如果是join左边表剪枝,那么join类型必须是inner join,left semi join 或者 right join。右边剪枝则类似。这很好理解,剪枝左边表是用右边表产出的分区字段作为过滤条件。那么如果是left outer join,无论右边有没有这个分区,左边的值总是存在。因此不需要被剪枝。
  3. 另一张表需要存在至少一个过滤条件,比如 a join b on a.id = b.id where b.day=1

实现计划

我们不能简单的为了拿到分区值而引入一个额外的子查询,有可能是得不偿失的,需要通过进一步优化
看规则PlanDynamicPruningFilters,会将PartitionPruning产出的逻辑计划翻译成物理执行计划,核心代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
override def apply(plan: SparkPlan): SparkPlan = {
if (!SQLConf.get.dynamicPartitionPruningEnabled) {
return plan
}

plan transformAllExpressions {
case DynamicPruningSubquery(
value, buildPlan, buildKeys, broadcastKeyIndex, onlyInBroadcast, exprId) =>
// 这里我们需要明确,buildPlan是剪枝的另一侧表的逻辑计划,举个例子来说
// `select a.c1 from a join b on a.c1 = b.c1 where b.c2=1`
// 这条sql里,buildPlan就是`select b.c1 from b where b.c2=1`.
// 将buildPlan翻译成物理计划执行,从而取出所有的分区值
val sparkPlan = QueryExecution.createSparkPlan(
sparkSession, sparkSession.sessionState.planner, buildPlan)
// Using `sparkPlan` is a little hacky as it is based on the assumption that this rule is
// the first to be applied (apart from `InsertAdaptiveSparkPlan`).
val canReuseExchange = SQLConf.get.exchangeReuseEnabled && buildKeys.nonEmpty &&
plan.find {
case BroadcastHashJoinExec(_, _, _, BuildLeft, _, left, _) =>
left.sameResult(sparkPlan)
case BroadcastHashJoinExec(_, _, _, BuildRight, _, _, right) =>
right.sameResult(sparkPlan)
case _ => false
}.isDefined

// 3种可能性
// 1. 可以通过广播复用来获得分区值
// 2. 没有可以复用的广播,并且不值得引入额外的子查询,那么忽略动态分区优化
// 3. 值得引入额外的子查询,那么构建一个子查询来获取分区值
if (canReuseExchange) {
// 复用已经存在的广播数据
val mode = broadcastMode(buildKeys, buildPlan)
val executedPlan = QueryExecution.prepareExecutedPlan(sparkSession, sparkPlan)
// plan a broadcast exchange of the build side of the join
val exchange = BroadcastExchangeExec(mode, executedPlan)
val name = s"dynamicpruning#${exprId.id}"
// place the broadcast adaptor for reusing the broadcast results on the probe side
val broadcastValues =
SubqueryBroadcastExec(name, broadcastKeyIndex, buildKeys, exchange)
DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
} else if (onlyInBroadcast) {
// onlyInBroadcast 值是由 !hasBenefit || SQLConf.get.dynamicPartitionPruningReuseBroadcastOnly 计算得到
// hasBenefit是基于CBO计算得到的,表示为了实现动态分区额外增加这个计划执行是否值得,tru表示值得,false表示不值得
// hasBenefit 计算来源参考`PartitionPruning.pruningHasBenefit()`
// it is not worthwhile to execute the query, so we fall-back to a true literal
DynamicPruningExpression(Literal.TrueLiteral)
} else {
// 没有已有的广播数据可用,但是为了动态分区额外增加一个物理计划是值得的
// 构建一个新的执行计划,可以看做是`select c1 from b where c2=1 group by c1`
// 最后将这个计划转换成子查询
// we need to apply an aggregate on the buildPlan in order to be column pruned
val alias = Alias(buildKeys(broadcastKeyIndex), buildKeys(broadcastKeyIndex).toString)()
val aggregate = Aggregate(Seq(alias), Seq(alias), buildPlan)
DynamicPruningExpression(expressions.InSubquery(
Seq(value), ListQuery(aggregate, childOutputs = aggregate.output)))
}
}
}

参考

spark 3.0.0代码

ulysses wechat
订阅+