`
youaremoon
  • 浏览: 32486 次
  • 性别: Icon_minigender_1
  • 来自: 重庆
社区版块
存档分类
最新评论

netty5学习笔记-内存池1-PoolChunk

阅读更多

从netty 4开始,netty加入了内存池管理,采用内存池管理比普通的new ByteBuf性能提高了数(2)十倍。相信有些朋友会和我一样,对他的实现方式很感兴趣。这里把学习netty内存池的过程记录下来,与大家一起分享。

首先给大家介绍的是PoolChunk, 该类主要负责内存块的分配与回收,首先来看看两个重要的术语:

page: 可以分配的最小的内存块单位。

chunk: 一堆page的集合。

下面我们用一张图直观的看下PoolChunk是如何管理内存的:

上图中是一个默认大小的chunk, 由2048个page组成了一个chunk,一个page的大小为8192, chunk之上有11层节点,最后一层节点数与page数量相等。每次内存分配需要保证内存的连续性,这样才能简单的操作分配到的内存,因此这里构造了一颗完整的平衡二叉树,所有子节点的管理的内存也属于其父节点。如果我们想获取一个8K的内存,则只需在第11层找一个可用节点即可,而如果我们需要16K的数据,则需要在第10层找一个可用节点。如果一个节点存在一个已经被分配的子节点,则该节点不能被分配,例如我们需要16K内存,这个时候id为2048的节点已经被分配,id为2049的节点未分配,就不能直接分配1024这个节点,因为这个节点下的内存只有8K了。

通过上面这个树结构,我们可以看到每次内存分配都是8K*(2^n), 比如需要24K内存时,实际上会申请到一块32K的内存。为了分配一个大小为chunkSize/(2^k)的内存段,需要在深度为k的层从左开始查找可用节点。如想分配16K的内存,chunkSize = 16M, 则k=10, 需要从第10层找一个空闲的节点分配内存。

如何高效的从这么多page中分配到指定的内存呢。来看看下面这个图:

这个图与上图结构一致,不同的是上方的二叉树的值为当前的层数,两张图和起来用一个数组memoryMap表示,上面的图中的数字表示数组的index,下面的图中的数字表示当前节点及其子节点可以分配的层的高度。如对于id=512的节点,其深度为9,则:

1)memoryMap[512] = 9,则表示其本身到下面所有的子节点都可以被分配;

2)memoryMap[512] = val (从10到11), 则表示512节点下有子节点已经分配过,则该节点不能直接被分配,而其子节点中的第val和val以下层还存在未分配的节点;

3)memoryMap[512] = 12 (即总层数 + 1), 可分配的深度已经大于总层数, 则该节点下的所有子节点都已经被分配。

下面我们在从源码分析下PoolChunk是如何实现的,首先看看它的构造方法:

 

PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize) {
    unpooled = false;
    this.arena = arena;
    // memory是一个容量为chunkSize的byte[](heap方式)或ByteBuffer(direct方式)
    this.memory = memory;
    // 每个page的大小,默认为8192
    this.pageSize = pageSize;
        // 13,   2 ^ 13 = 8192
   this.pageShifts = pageShifts;
    // 默认11
    this.maxOrder = maxOrder;
    // 默认 8192 << 11 = 16MB
    this.chunkSize = chunkSize;
    // 12, 当memoryMap[id] = unusable时,则表示id节点已被分配
    unusable = (byte) (maxOrder + 1);
    // 24, 2 ^ 24 = 16M
    log2ChunkSize = log2(chunkSize);
    // -8192
    subpageOverflowMask = ~(pageSize - 1);
    freeBytes = chunkSize;

    assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;
    // 2048, 最多能被分配的Subpage个数
    maxSubpageAllocs = 1 << maxOrder;

    // Generate the memory map.
    memoryMap = new byte[maxSubpageAllocs << 1];
    depthMap = new byte[memoryMap.length];
    int memoryMapIndex = 1;
    // 分配完成后,memoryMap->[0, 0, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3…]
    // depthMap->[0, 0, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3…]
    for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
      int depth = 1 << d;
      for (int p = 0; p < depth; ++ p) {
        // in each level traverse left to right and set value to the depth of subtree
        memoryMap[memoryMapIndex] = (byte) d;
        depthMap[memoryMapIndex] = (byte) d;
        memoryMapIndex ++;
      }
    }

        // subpages包含了maxSubpageAllocs(2048)个PoolSubpage, 每个subpage会从chunk中分配到自己的内存段,两个subpage不会操作相同的段,此处只是初始化一个数组,还没有实际的实例化各个元素
    subpages = newSubpageArray(maxSubpageAllocs);
  }

需要注意的是由于memoryMap表示的树中一个包含2 ^ maxOrde - 1个节点,因此memoryMap中的其中一个节点是无用的,为了方便后续的计算,这里将第一个节点作为无用的节点,这样从父节点计算左子结点只需要简单的*2即可,即left node id = parent id << 1。随着内存不断的分配和回收,memoryMap中的值也不停的更新,而depthMap中保存的值表示各个id对应的深度,是个固定值,初始化后不再变更。

下面看看如何向PoolChunk申请一段内存:

 

// normCapacity已经处理过
long allocate(int normCapacity) {
    if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
        // 大于等于pageSize时返回的是可分配normCapacity的节点的id
      return allocateRun(normCapacity);
    } else {
        // 小于pageSize时返回的是一个特殊的long型handle
        // handle = 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
        // 与上面直接返回节点id的逻辑对比可以知道,当handle<Integer.MAX_VALUE时,它表示chunk的节点id;
        // 当handle>Integer.MAX_VALUE,他分配的是一个Subpage,节点id=memoryMapIdx, 且能得到Subpage的bitmapIdx,bitmapIdx后面会讲其用处
      return allocateSubpage(normCapacity);
    }
  }

allocateRun与allocateSubpage都需要查询到一个可用的id, allocateSubpage相对allocateRun多出分配PoolSubpage的步骤,下面先看看allocateRun的实现:

 

 

private long allocateRun(int normCapacity) {
	// log2(val) -> Integer.SIZE - 1 - Integer.numberOfLeadingZeros(val)
	// 如normCapacity=8192,log2(8192)=13,d=11
        int d = maxOrder - (log2(normCapacity) - pageShifts);
        // 通过上面一行算出对应大小需要从那一层分配,得到d后,调用allocateNode来获取对应id
        int id = allocateNode(d);
        if (id < 0) {
            return id;
        }
        // runLenth=id所在深度占用的字节数
        freeBytes -= runLength(id);
        return id;
    }


allocateNode(int d)传入的参数为depth, 通过depth来搜索对应层级第一个可用的node id。下面看allocateNode的实现:

 

 

private int allocateNode(int d) {
        int id = 1;
        // 如d=11,则initial=-2048
        int initial = - (1 << d); // has last d bits = 0 and rest all = 1
        // value(id)=memoryMap[id]
        byte val = value(id);
        // 第一层的节点的值大于d,表示d层及以上都不可分配,此次分配失败
        if (val > d) { // unusable
            return -1;
        }
        // 这里从第二层开始从上往下查找, 一直找到指定层
        while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
            // 往下一层
            id <<= 1;
            val = value(id);
            // 上面对一层节点的值判断已经表示有可用内存可分配,因此发现当前节点不可分配时,
            // 直接切换到父节点的另一子节点,即id ^= 1
            if (val > d) {
                id ^= 1;
                val = value(id);
            }
        }
        byte value = value(id);
        assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
                value, id & initial, d);
        // 该节点本次分配成功,将其标为不可用
        setValue(id, unusable); // mark as unusable
        // 更新父节点的值
        updateParentsAlloc(id);
        return id;
    }

上面有几个需要注意的点:1、分配的节点在d层;2、每一层都可能遇到节点被分配的情况,此时需要切换到父节点的另一个子节点继续往下查找;3、分配的节点需要标记为不可用,防止后面再被分配;4、分配一个节点后,其父节点的值会发生变更,并可能引起更上层的父节点的变更。下面我们来举个例子看看第4点(depth=10节点的子节点从2个可用变为1个可用的情况):

 

1)parent的depth=10,表示该层及所有子层可直接分配;

2)left节点被分配,其depth=12, 不能再被分配;

3)由于left节点已经被分配,此时parent节点已经不能直接分配,但还存在一个可分配节点right,此时parent的depth被设置为right的值11;

4)parent不可分配,则其以上的所有父节点的状态可能也会发生变化,此时需要递归的修改更上层parent的值,及指针上移后重复上面的操作。

 

private void updateParentsAlloc(int id) {
        while (id > 1) {
            int parentId = id >>> 1;
            byte val1 = value(id);
            byte val2 = value(id ^ 1);
            // 得到左节点和右节点较小的值赋给父节点,即两个节点只要有一个可分配,则父节点的值设为可分配的这个节点的值
            byte val = val1 < val2 ? val1 : val2;
            setValue(parentId, val);
            id = parentId;
        }
    }

 

 

到这里一个节点的分配就完成了,但是还有另一种情况,即分配的ByteBuf大小小于pageSize, 这种情况会调用allocateSubpage方法:

 

private long allocateSubpage(int normCapacity) {
        int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
        int id = allocateNode(d);
        if (id < 0) {
            return id;
        }

        final PoolSubpage<T>[] subpages = this.subpages;
        final int pageSize = this.pageSize;

        freeBytes -= pageSize;
	// 包含数据的节点在最后一层,而最后一层的左边第一个节点index=2048,因此若id=2048则subpageIdx=0,id=2049,subpageIdx=1
	// 根据这个规律找到对应位置的PoolSubpage
        int subpageIdx = subpageIdx(id);
        PoolSubpage<T> subpage = subpages[subpageIdx];
        if (subpage == null) {
            // 如果PoolSubpage未创建则创建一个,创建时传入当前id对应的offset,pageSize,本次分配的大小normCapacity
            subpage = new PoolSubpage<T>(this, id, runOffset(id), pageSize, normCapacity);
            subpages[subpageIdx] = subpage;
        } else {
            // 已经创建则初始化数据
            subpage.<span style="color:#009900;">init</span>(normCapacity);
        }
        // 调用此方法得到一个可以操作的handle
        return subpage.<span style="color:#009900;">allocate</span>();
    }

这里可以看到,在小size的对象分配时会先分配一个PoolSubpage,最终返回一个包含该PoolSubpage信息的handle,后面的操作也是通过此handle进行操作。在看如何操作内存之前,我们先来看看分配一个节点后如何释放(比较简单,所以先讲这里),其对应的方法:

 

void free(long handle) {
	// 传入的handle即allocate得到的handle
        int memoryMapIdx = (int) handle;
        int bitmapIdx = (int) (handle >>> Integer.SIZE);

        if (bitmapIdx != 0) { // free a subpage
            // !=0表示分配的是一个subpage
            PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
            assert subpage != null && subpage.doNotDestroy;
            if (subpage.free(bitmapIdx & 0x3FFFFFFF)) {//subpage释放方法后面再讲
                return;
            }
        }
        freeBytes += runLength(memoryMapIdx);
        // 将节点的值改为可用,及其depth
        setValue(memoryMapIdx, depth(memoryMapIdx));
        // 修改其父节点的值
        updateParentsFree(memoryMapIdx);
    }

这里会将节点的值改为可用,完成后该节点就可以再次被分配了,此时父节点的值还未更新,可能会导致分配节点时无法访问到此节点,因此还需要同时改变其父节点的值,

 

 

 private void updateParentsFree(int id) {
        int logChild = depth(id) + 1;
        while (id > 1) {
            int parentId = id >>> 1;
            byte val1 = value(id);
            byte val2 = value(id ^ 1);
            logChild -= 1; // in first iteration equals log, subsequently reduce 1 from logChild as we traverse up

            if (val1 == logChild && val2 == logChild) {
            	//当两个子节点都可分配时,该节点变回自己所在层的depth,表示该节点也可被分配
                setValue(parentId, (byte) (logChild - 1));
            } else {
            	// 否则与上面的updateParentsAlloc逻辑相同
                byte val = val1 < val2 ? val1 : val2;
                setValue(parentId, val);
            }

            id = parentId;
        }
    }

PoolChunk本身主要是负责节点的分配与释放,因此节点的分配与释放都了解了对这个类的了解就已经差不多了。但这里还有几个方法没讲到,这几个方法是干什么用的呢,下面我们来看看:

    void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
        int memoryMapIdx = (int) handle;
        int bitmapIdx = (int) (handle >>> Integer.SIZE);
        if (bitmapIdx == 0) {
            // 到这里表示分配的是>=pageSize的数据
           byte val = value(memoryMapIdx);
            assert val == unusable : String.valueOf(val);
           buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx));
        } else {
            // 到这里表示分配的是<pageSize的数据
           initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
        }
    }

    void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int reqCapacity) {
 initBufWithSubpage(buf, handle, (int) (handle >>> Integer.SIZE), reqCapacity);
 }

 private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) {
 assert bitmapIdx != 0;

 int memoryMapIdx = (int) handle;

 PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
 assert subpage.doNotDestroy;
 assert reqCapacity <= subpage.elemSize;

 buf.init(
 this, handle,
 runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize);
 }

通过allocate拿到了handle,相当于得到了一把钥匙,现在就可以拿这把钥匙开启后面的操作了。首先我们拿handle+buf+reqCapacity几个参数来进行实际的分配,前面讲过handle有两种含义,1、handle<Integer.MAX_VALUE, 表示一个node id; 2、handle>Integer.MAX_VALUE, 则里面包含node id + 对应的subpage的bitmapIdx。最终传递给PooledByteBuf的信息包括,PoolChunk中的memory: 完整的byte数组或ByteBuf;handle:表示node id的数值,释放内存段的时候使用这个参数; offset:该PooledByteBuf可操作memory的第一个位置; length:该PooledByteBuf本次申请的长度;maxLength:该PooledByteBuf最大可用的长度。PooledByteBuf通过这些参数完成自身的初始化后,就可以开始实际的读写了,它的可读写区域就是memory数组的offset 到 offset + length, 如果写入的数据超过length, 可以扩容至maxLength, 即可读写的区域变为offset 到 offset + maxLength, 如果超过maxLength, 则需要重新申请数据块了,这个后面再说。

到这里PoolChunk的功能基本上已经讲完,下面来回顾下PoolChunk的简单使用过程(真实的使用比较复杂,后面再讲):

1) 初始化PoolChunk,初始时传入分配好的内存块memory(heap或direct),通过maxOrder构造完整的平衡二叉树,并且将对应值存入memoryMap;
2) 应用获取一个大小为size0的内存段,通过调用PoolChunk.allocate方法获取到对应的二叉树节点id并转换为handle,
   如果size0 >= 8192, handle=id,如果size0 < 8192, handle = 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
   获取完成后更新memoryMap, 保证该节点不会重复分配,同时其父节点的可分配状况也级联的改变。
3) 通过上一步获取到的handle来初始化应用的ByteBuf,调用的方法为PoolChunk.initBuf,ByteBuf会接收memory, handle, offset, length, maxLengh;
4) 应用对ByteBuf进行写入读取等操作(可能会触发扩容等操作,后面的文章会深入讲;
5) 应用使用完ByteBuf后释放它,ByteBuf释放时调用PoolChunk.free,传入的参数为handle,PoolChunk将handle对应的节点置为可分配,同时改变其父节点的状态。
   该节点可以再次分配了(其父节点也有可能被分配了)。
6)下次应用获取内存段时再次进入第二步。

需要注意的是PoolChunk提供了内存池的功能,然后它的所有方法都是线程不安全的,因此需要应用保证其可以被安全的访问,否则会导致节点被重复分配等问题;ByteBuf的设计使内存不使用后不需要手动的将数据置0,直接重置节点状态即可;申请小内存后会通过PoolSubpage进行管理,为什么这么做,下一篇再慢慢讲,回家先。

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics