spark杂谈

dynamic allocation

spark如果希望向mr一样可以动态分配资源,那么可以配置spark.dynamicAllocation.enabled,需要注意的是,这个动态不是说你起了一个spark context,然后可以动态伸缩executor数量,而是你执行一条sql,当这个sql有很多个stage的时候,不同stage使用的executor可以被动态伸缩。这个策略和hive是一致的。
但是这有个前提,spark必须开启额外的shuffle服务,也就是配置spark.shuffle.service.enabled,如果是基于yarn,那么需要在yarn-site.xml里加入配置,并在yarn的classpath里加入spark的shuffle jar。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<property>
<name>yarn.nodemanager.aux-services</name>
<value>spark_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
<value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
<name>spark.shuffle.service.port</name>
<value>7337</value>
</property>
这意味着每台nodemanager都会启动一个shuffle service,这个服务用于提供shuffle数据给spark的reduce task。
这点和MapReduce是有区别的,区别在于,spark对于executor的移除是基于空闲判断而非MR的并发控制,当某个executor在一段时间内都内有task执行,那么这个executor被判断为空闲状态,将被移除。这会导致一个问题,可能这个executor之前刚做完shuffle write,那么当这executor被移除后,这部分数据就丢了。所以,spark需要开启shuffle service,通过shuffle service来帮助提供shuffle的数据。

另外还有相似配置`spark.shuffle.service.fetch.rdd.enabled`,可以让shuffle service提供读取被persisted的rdd数据。


# DataSource v2
spark目前还在推v2的接口,为了更好把计算下推到各类存储介质,spark丰富了其v2的接口


做计算引擎,很重要的一点就是拥抱上游或者下游的存储介质,充分利用其它组件的特性,才能使得计算更快效率更高。
就拿parquet文件来说,当spark查询parquet文件里的数据,那么会把过滤条件下推到parquet接口,利用parquet自带的filter进行数据过滤,这样效率就大大高于把数据取出来,在内存遍历后过滤。


# vector
spark有个钨丝计划,其中有一项内容就是内存向量化,也就是把数据按列缓存在内存里。
参考类型`OnHeapColumnVector`,spark给不同的字段类型定制了不同的数组来存储

// This is faster than a boolean array and we optimize this over memory footprint.
private byte[] nulls;

// Array for each type. Only 1 is populated for any type.
private byte[] byteData;
private short[] shortData;
private int[] intData;
private long[] longData;
private float[] floatData;
private double[] doubleData;

// Only set if type is Array or Map.
private int[] arrayLengths;
private int[] arrayOffsets;

1
2
3
4
5
6
7
8
9
10
11
12
13
14


# unsafe row
除了列式存储,spark在最开始的时候就支持了unsafe的存储方式,也就是通过java的jni接口,直接将数据写到原生的内存中,而不是基于java封装的容器存储。
参考`UnsafeRow`


# codegen
java 通过`Janino`框架来实现对代码的实时编译,将java代码编译成字节码


整个stage通过自动生成代码来执行

查看一个sql的codegen代码

EXPLAIN CODEGEN select sum(siz) from table
`

ulysses wechat
订阅+