目录

Background

Deca项目是研究生期间参加的重要科研项目,项目主要是采用去对象化的思想,减少大数据平台在运行过程中,数据的占有空间与对象的数量,从而减小内存的压力,也减小GC的压力。

实现的功能

deca主要实现的功能就是减小了大数据平台在运行任务过程中的数据在内存中的占用量以及在运行过程中对象的数量。

当前的主流分布式内存计算系统均采用高级托管语言开发,这样开发进度快,方便部署和维护。

GC是托管语言(JAVA,SCALA等)的运行时系统自主管理对象的基础,GC操作会检索当前堆中存活的对象,并释放已经死亡对象的空间。

大量数据均以对象形式存放在内存中,这对导致两个问题

  1. 内存膨胀问题

    对象形式的内存布局会存储大量引用结构和元数据(对象头),而不是直接存储数据,空间利用率较低。

  2. Full GC 问题

    内存膨胀会导致JVM更加频繁的触发full gc(检索整个JVM堆内存),而GC开销与存活对象数量成正比,导致GC时间过长。

这里引申一下gc的分类,gc分为minor gc, major gc 和 full gc。其中minor是清理年轻代,major是清理老年代,而full是清理整个堆空间。

因此在面临使用内存空间有限的情况下,必须在软件层面对内存管理进行优化。

技术和架构

问题分析

spark是开源系统中主导的数据并行计算框架,提供函数式编程模型RDD,并增加了基于shuffle的GroupBy系列运算符扩展,支持中间数据的内存缓存和基于哈希的shuffle聚合操作。

spark将数据封装在RDD中,然后通过action划分job,再通过shuffle操作划分stage,然后在jvm中运行数据。

因此spark中的内存主要分为三部分。

  1. Cache RDD到内存

    这部分内存需要一直维护,只要用户进行unpersist操作,所以这部分内存生命周期较长。

  2. shuffle操作(生命周期是一个stage)

    shuffle操作需要落磁盘,进行磁盘I/O,因此需要维护所有磁盘I/O的数据,生命周期也长。

  3. stage内部的操作

    产生大量的临时对象,属于内存中的临时对象,很快会被gc回收。

长时间存活对象会一直活在内存中,每次Full GC 要扫描的对象数量很多,计算开销很大。而且对象一直存活,会大量占用内存,频繁导致full gc。

方法设计

核心思想是减少数据对象的数量,而非数据的大小。

使用对象拆解,暴露出数据对象中的裸数据:原生字段类型;去除对象头和引用结构。

基于生命周期的内存管理:将相同/相近生命周期的一组数据对象中的裸数据存放在连续的内存块(数组)中。

数据无需访问时即可一次回收整个内存块空间。

这样GC的索引由大量的对象变为少量的容器,gc开销大大减小。

将UDT(用户定义类型)分为三类:

  1. 静态定长:原生类型及其组合,如int, long, (int, long)
  2. 动态定长:原生类型的数组及组合,如int[], (int[], long)
  3. 变长对象和递归类型对象:实例化对象长度不确定,如TreeNode(TreeNode里面有一个TreeNode引用的left,right)

前两种可以安全拆解,第三种不行。

针对以上的三种内存,其中cache RDD,当cache的数据对象可以拆解时候,可以拆解为Bytes数组依次存放在page中,同时根据page对象中对象offset可以获得对象成员变量。

而针对shuffle内存,也是放在page里面,针对shuffle阶段的排序,使用指针,避免大量数据的移动。

在shuffle 阶段存在很多变长的成员,在shuffle阶段,reduceByKey尚能拆解,因为reduce之后的value依然是定长的。但是针对groupByKey这个算子,他的操作对象是(K,combinerBuffer),combiner是变长的,group之后也是变长,是不确定的。

Spark-1.4里groupByKey在shuffle write端可以利用到堆外的内存,也就是tungsten-sort,所有的数据都会写在堆外并在堆外排序,但是shuffle-read端Spark默认还是用的HashShuffleReader,所有的聚合操作都在堆内完成,这个我们已经实现了read端的堆外版本,聚合操作运行在堆外。大致介绍下原理,这里就用到了VST拆解的原理,我们知道shuffle read端读出来的(K,C)对的基本类型,于是先实现了一个简易的map(UnsafeUnfixedWidthAggregationFlintMap),嗯名字略长。。这是一个针对系统的定制的map,也是用到了Hash原理,不过所有操作都是在堆外进行。这个map用于存储key和valueAddress,这个valueAddress是一个long型值,我们会将K对应的一组Value在堆外开辟一片疆土用于存储他们,当然每次新来value时我们会检查是否扩容,若扩容会改变这块疆土(堆外空间)的起始地址,因为涉及到内存的拷贝,所以map中的valueAddress就是这块存储区域的初始偏移地址。valueAddress指向的存储区域结构为:

如果每个partition相同的key不多,而且每个key存在大量value时,采用mapsideCombine的groupBykey是一个不错的选择。如果不存在hot key,那收益就很小。

担当的责任

主要是担任shuffle groupByKey read 阶段的内存优化,我实现了read端的堆外版本,聚合操作运行在堆外。大致介绍下原理,这里就用到了VST拆解的原理,我们知道shuffle read端读出来的(K,C)对的基本类型,于是先实现了一个简易的map(UnsafeUnfixedWidthAggregationFlintMap),嗯名字略长。。这是一个针对系统的定制的map,也是用到了Hash原理,不过所有操作都是在堆外进行。这个map用于存储key和valueAddress,这个valueAddress是一个long型值,我们会将K对应的一组Value在堆外开辟一片疆土用于存储他们,当然每次新来value时我们会检查是否扩容,若扩容会改变这块疆土(堆外空间)的起始地址,因为涉及到内存的拷贝,所以map中的valueAddress就是这块存储区域的初始偏移地址。

这样就处理了变长类型的处理,之前是只实现了reduceBykey的。

后面我们也尝试了列式存储,把之前page中的数组形式,转换为列式存储。

同时负责实验的设计,实验过程中遇到bug的解决以及gc统计分析的工作。

难点

spark是惰性执行,代码中充满着各种各样的迭代器,追踪代码时都不知道哪个迭代器被调用了。修改代码需要连环的修改多个文件。

然后就是有时候单机测试可以通过,但是分布式时候就。

收获

mark