spark内存管理

spark基于在jvm之上自己封装管理了内存资源,用于rdd的cache或内部执行,本文基于spark2.4.3代码分析spark的内存

前言

初始化最大可用内存

参考如下代码,控制spark的jvm堆内存大小的变量有3个

  • reservedMemory
    这个值是默认是,当前spark给出的是300MB*1.5向上取整也就是500MB。通过预留这部分内存,可以避免OOM,保证spark可以主动检测到内存使用瓶颈,否则等待jvm抛出OOM的异常就会很被动,这样保证了程序可以正常退出,以及做出一些响应的操作。

  • systemMemory
    这个值是jvm给的,也就是可以用于分配对象的内存上限,需要注意的是,这个值Runtime.getRuntime.maxMemoryXmx不一定相等,这个值的大小依赖于jvm的gc实现。我们知道当前的gc都是分代gc,也就是有eden,s1,s2以及old。而s2拥有的这块内存必须保持空闲,因此Runtime.getRuntime.maxMemory = Xmx - s2。但是有一个gc算法是例外,就是G1,G1的Runtime.getRuntime.maxMemoryXmx是一样大的,这是由于G1的s2是动态的,拥有的内存空间是不固定的,如果需要申请s2内存后jvm内存恰好超过了Xmx,那么G1会为此触发一次gc,因此G1的s2可以认为不占用额外空间。

  • spark内存系数
    spark.memory.fraction,这个系数目的是限制spark可用内存大小,剩下的部分留给用户。默认是0.6,如果除了调用spark接口外使用的对象很少,那么可以适当调大此系数,比如0.7。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private def getMaxMemory(conf: SparkConf): Long = {
val systemMemory = conf.get(TEST_MEMORY)
val reservedMemory = conf.getLong(TEST_RESERVED_MEMORY.key,
if (conf.contains(IS_TESTING)) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
if (systemMemory < minSystemMemory) {
throw new IllegalArgumentException(s"System memory $systemMemory must " +
s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
s"option or ${config.DRIVER_MEMORY.key} in Spark configuration.")
}
// SPARK-12759 Check executor memory to fail fast if memory is insufficient
if (conf.contains(config.EXECUTOR_MEMORY)) {
val executorMemory = conf.getSizeAsBytes(config.EXECUTOR_MEMORY.key)
if (executorMemory < minSystemMemory) {
throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
s"$minSystemMemory. Please increase executor memory using the " +
s"--executor-memory option or ${config.EXECUTOR_MEMORY.key} in Spark configuration.")
}
}
val usableMemory = systemMemory - reservedMemory
val memoryFraction = conf.get(config.MEMORY_FRACTION)
(usableMemory * memoryFraction).toLong
}

task级别的内存分配

spark在每个exuector上都存在两个memory pool,一个是execution,另一个是storage,
通常来说,申请内存根据先来先得的策略,但是巍峨保证某些task不占用过多的内存,spark设置了task的上下限,分为了1/activeTaskNum1/(2*activeTaskNum),当task需要申请execution内存,但是剩余内存不满足task最小分配内存,那么task会等待,直到资源被释放后唤醒。参考方法ExecutionMemoryPool.acquireMemory()。因此当spark的executor日志里出现这种信息TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free,那么说明该executor的execution内存池不够用了,可以调节比例spark.memory.fraction,或者增大executor的总内存大小

spark内存模型

可以参考另一篇博客

参考

UnifiedMemoryManager
很友好的国外友人,分析很到位

ulysses wechat
订阅+