spark的分布式存储

spark虽然是一个分布式计算引擎,但是也对数据的临时存储,缓存等操作必不可能,因此分布式存储框架十分重要。本文接口spark2.4.3版本的代码分析分布式存储原理。

前言

分布式存储的架构一般都是采用master-slaver,常见的kafka,hbase。通过对slaver的故障切换就保证了集群的ha,再结合副本机制保证了分区容错。spark的分布式存储也是这个思路。
在spark的世界里,需要被缓存或者存储的数据被称为block,block的管理者被称为BlockManager

block

block是spark的最小存储单位,但是这个大小是不固定的,比如一个rdd的数据需要cache到磁盘,那么这个rdd的每个分区的数据就会被当做是一个block存储。
block之间通过唯一id来索引,也就是BlockId,不同的block场景,BlockId定义规则也不同。rdd的block id是由rddId+splitIndex组成,我们知道rddId是全局唯一的自增id,因此得到的block id也是全局唯一的id。

block manager

在从逻辑上讲,block的存储结果也是一个key-value存储,key是block id,在文件名中体现,并且充当一级索引,value是数据,记录在文件里。
那么spark是如果通过block manager来控制数据的生命周期的呢。
首先block manager也采用了master-slave架构,这在分布式场景中是很常见的,master和slaver依附在driver和executor进程运行,每当有新的block被写入或者移除,blockmanager通过rpc接口通知master,master(BlockManagerMasterEndpoint)存在于driver,用于统筹全局。有些对master的情况是需要通知到其他slave的,比如移除block,block由于有副本机制,一个block id的数据可能存在于多个executor,那么就需要依靠master来通知其他block slaver,从而移除该block id的所有副本。
下面对过读和写的两个流程分析block manager的实现流程。在此之前,先让我们明确一下block manager的接口调用流程,既包括进程内的接口,也包括rpc接口。
BlockManager -> BlockManagerMaster -> DriverEndPoint -> BlockManagerMasterEndpoint -> BlockManagerSlaveEndpoint

  1. 根据block id从blockmanager中获取数据,如果数据在本地,那么直接返回,如果在其他executor节点,那么进入2
  2. 向blockManagerMaster发送请求,获取该block id所在的executor
  3. 优先读取存在于同一台集群的但在不同executor的数据,如果没有,那么只能跨机器读取block数据
  1. 如果需要些的副本数据大于1,那么需要将多出的副本写到其他机器(先获从master获取其他blockmanager的信息)
  2. 根据存储等级判断是写内存还是写磁盘,假设StorageLevel为MEMORY_AND_DISK,spark优先将数据存储在内存,如果内存空间不足则将剩余数据写到磁盘
  3. 如果需要则将此次block信息报告给master(目前看来都是需要的,只是预留接口)

对于数据在内存中的数据结构,这里再多说一点,毕竟这是非常重要的。
首先,为了能更好的跟踪数据在内存中占用的空间变化,管理内存的生命周期,spark抽象出了SizeTracker接口,我们拿其中的一个实现SizeTrackingVector来说
spark将block数据存储在一个数组中,数量超过数组长度则将数组翻倍扩容。每当有数据写进内存数组中,spark就会判断是否需要重新评估占用内存大小,spark采用抽样的方式,每隔1,2,4,8 …… spark就会重新评估一次数组在内存中的大小,评估方法参考SizeEstimator.estimate(),这里就不展开了。
有了这个底层的数据大小判断后。spark默认每过spark.storage.unrollMemoryCheckPeriod条数据检查一次内存大小,并且以评估后内存大小的spark.storage.unrollMemoryGrowthFactor系数申请内存资源。如果当前内存中可用大小不满足申请内存大小,那么spark会把剩下的block数据写到磁盘。也就是说,spark会把数据的一部分存储在内存,一部分存储在磁盘。

block锁

由于是多个线程场景,因此在过程中必然会涉及到对block的并发读写或者删除。spark采用了读写锁+自旋锁的思路来做控制,底层锁采用了对象的waite和notify机制实现。实现比较简单,参考类BlockInfoManager

总结

spark通过自身的rpc框架实现了一套简单的分布式存储。目前只支持一级索引,也就是通过BlockId的key-value查询,这也是基于spark的场景选择的,不需要对单条数据点查,最小粒度支持到rdd的partition级别已经足够。

ulysses wechat
订阅+