spark-sql的join实现原理

在sql世界里,join的实现是最复杂的,本文基于spark2.4.3代码分析spark sql是如何实现join的

前言

spark sql有自己独立的语法解析,优化规则,执行引擎这些不在本文中展开,可以参考另一篇博客,下面主要针对spark sql里join的实现分析,包括什么时候选择什么join实现策略。

join家庭

在物理计划层面,join目前的实现有5种实现

  1. BroadcastHashJoinExec
  2. BroadcastNestedLoopJoinExec
  3. SortMergeJoinExec
  4. ShuffledHashJoinExec
  5. CartesianProductExec
    其中1、2是基于broadcast实现的join,通常来说性能好一点,而3、4是基于shuffle实现的join,5是基于笛卡尔积的思路来实现相关join,下面我们一个个分析具体的实现思路

BroadcastHashJoinExec

这种实现方式是性能最好的,我们在开发中也应该尽可能向这个靠近。
首先BroadcastHashJoinExec是基于broadcast的实现,也就是把一张表broadcast到所有executor,另一张表就可以直接在内存遍历匹配出相同的key。其次这个实现是基于hash的,正常来说我们想找出两个数组中相同的数据需要两层循环,时间复杂度就是O(t1)*O(t2),这样的效率是不高的,而BroadcastHashJoinExec的做法就是将broadcast的数据逐行hash,存储在数组里,其实就是一个hash map,不过是自己实现的BytesToBytesMap,然后遍历另一张表,逐行hash匹配,找出符合要求的数据,这样时间复杂度就可以>近似看成是O(t1)+O(t2),性能有很大提升。
不过有利也有弊,基于hash的算法只支持基于等于条件的join,其他jon的场景不支持。
不同join场景的选择策略可以参考SparkStrategies.apply()

BroadcastNestedLoopJoinExec

这个实现也是基于broadcast的,和BroadcastHashJoinExec的区别在于这个策略是基于嵌套遍历的,看这个类名也可以看出来。虽然性能一般,但是这个策略的适用场景很广,被spark用作fallback。
总体思路就是广播一张表,然后另一张表就可以对这张表做全量遍历匹配,时间复杂度是O(t1)*O(t2)。

SortMergeJoinExec

这种实现方式是后期优化的,早期spark join的实现只有ShuffledHashJoinExec,思路上就是在shuffle上做文章,然后降低在内存中循环的时间复杂度。
首先这两种join策略都是基于shuffle,SortMergeJoinExec要求在shuffle的写的时候对数据进行排序,将两张表具有相同范围的key shuffle到同一个分区。接下来在处理过程中,就可以用顺序查找,当找匹配到一个key后,只要接着往下循环就行,不需要每次都从头循环,这样时间复杂度是O(t1)+O(t2),性能很好。但是这有可能导致一个很严重的数据倾斜问题,由于基于范围的shuffle策略也会把相同key shuffle到同一分区,而如果某个范围key的数量很大,那么该分区要处理的数据量就会很大,因此我们最好对key进行重新设计。甚至我们可以把该key的数据单独拉出来做broadcast,从而避免数据倾斜。

ShuffledHashJoinExec

这种时间方式是最古老的,spark早期就已经支持了。
区别于SortMergeJoinExec,基于hash的策略不需要再shuffle阶段对数据进行排序,具有相同key的数据自然会被分配到同一个分区。接下来在处理过程中就可以把一张表的数据存到hash map里,遍历另一张表的key去匹配。这个部分代码和BroadcastHashJoinExec是共用的。
如果想用这种测试,可以关闭配置spark.sql.join.preferSortMergeJoin,当然前提必须是等于条件的join。

总结

想要提升join的性能,可以参考以下几个思路

  1. 选择优秀的jon策略;不同的join策略对应不同的场景,我们尽可能的使用BroadcastHashJoinExec,直接对小表进行broadcast,如果两张都是大表,那么优先使用SortMergeJoinExec,当然我们应当避免两张大表的join。
  2. Join的key尽量均匀;如果key倾斜严重优先考虑broadcast,或者对key添加随机数使其均匀,再还原。
  3. 合适的分区数量;涉及到shuffle那就少不了对分区数量的优化,spark默认的shuffle分区数是200,数据量很大的场景可以提高这个值,并且可以在一定程度上减少数据倾斜的问题。

案例

1
2
3
4
5
// 假设t2是小表,那么可以这么查询
select /*+ BROADCAST(t2) */ c1, c2 from t1 join t2 on t1.c1=t2.c2

// 假设t1是从hdfs读取的表,分区数不多,但是数据量很大,那么可以这么查询
select /*+ REPARTITION(600) */ c1, c2 from t1 join t2 on t1.c1=t2.c2

题外

broadcast

broadcast是spark的一大特性,可以在很多场景提升效率,比如上面提到的join,或者共享一些配置,比如再HadoopRDD里直接广播了configuration。总之几乎任何数据都能被广播,下面分析其原理以及相关限制。
目前spark只有一个实现,TorrentBroadcast,实现思路是这样的。
首先我们需要被broadcast的数据一定在driver端的,如果是rdd数据,那么会先对其进行collect,所以数据量大的rdd慎用,会撑爆driver的内存。
数据在driver被切割成几块,默认是4MB一块,spark.broadcast.blockSize,每块都可以选择是否被压缩spark.broadcast.compress,默认是允许压缩的,压缩算法是spark.io.compression.codec,默认是lz4。
数据切割后,driver把每块数据存储在blockmanager,接下来就是通过blockmanager这个分布式存储架构来实现数据交换,可以参考另一篇博客spark分布式存储架构。但是和其他数据交换有点区别,比如shuffle数据,一份数据只存储在一个exeuctor,其下游需要通过master找到该executor的位置,再拉取数据。而broadcast的数据会在每个executor上都存储,这意味着每个executor在拉取到某个块后就可以对其他executor提供该块数据,这样数据传输效率会快很多。

另外spark还提供了一些配置来控制广播

  • spark.sql.autoBroadcastJoinThreshold
    如果数据量小于这个值,那么join的表就会被自动广播,默认是10MB。当然这个表大小的评估可能不准,目前只支持针对hive metastore提供的analyze。所以这个阈值看看就行,真需要还得靠自己显示声明。

  • spark.sql.broadcastTimeout
    这个配置比较重要,是控制时间的,默认是5分钟,也就是说只要broadcast超过5分钟就会取消,如果这个值配置成-1,就是禁止broadcast。一般来说5分钟够了,作为比较,广播几十MB数据的时间大概是秒级。

BytesToBytesMap

顺带分析一下BytesToBytesMap的实现,spark在很多实用hash的场景采用自己写的hash容器,而不是jdk自带的hashmap,BytesToBytesMap就是其中一个很经典的类。
BytesToBytesMap是一个只能append不能remove的hash map,很明显spark几乎所有的容器都是这个思路。在解决hash冲突的思路上,BytesToBytesMap采用了二次探测法。

我们知道hash map的底层是数组,通过对key的hash来确认在数组中的位置。BytesToBytesMap采用了java的jni接口,通过自己设计的协议来存储key和value。参考注释

1
2
3
4
5
6
* The key and values under the hood are stored together, in the following format:
* Bytes 0 to 4: len(k) (key length in bytes) + len(v) (value length in bytes) + 4
* Bytes 4 to 8: len(k)
* Bytes 8 to 8 + len(k): key data
* Bytes 8 + len(k) to 8 + len(k) + len(v): value data
* Bytes 8 + len(k) + len(v) to 8 + len(k) + len(v) + 8: pointer to next pair

这样设计的目的是将key-value形式一个pair,存储在内存中。并且,每次内存申请都和spark的内存模型有联系,起到全局控制内存的作用。

参考

spark2.4.3代码

ulysses wechat
订阅+