subpage级别的内存分配:allocateTiny() 步骤
定位一个Subpage对象,基于一个page.可能是用已有的subpage,也可能是新创建的
如果是新创建的subpage,初始化Subpage. 初始化过程,去chunk里找一个page.把这个page按照期望的subpage大小进行划分 如:期望1K的内存 ,把8K的Page分成8份
初始化pooledByteBuf. 其实就是拿到内存信息,放入到一个bytebuf中
下面使用16B进行测试
代码片段:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private void allocate (PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { final int normCapacity = normalizeCapacity(reqCapacity); if (isTinyOrSmall(normCapacity)) { int tableIdx; PoolSubpage<T>[] table; boolean tiny = isTiny(normCapacity); if (tiny) { if (cache.allocateTiny(this , buf, reqCapacity, normCapacity)) { return ; } tableIdx = tinyIdx(normCapacity); table = tinySubpagePools; } final PoolSubpage<T> head = table[tableIdx]; synchronized (head) { final PoolSubpage<T> s = head.next; if (s != head) { ...省略 } } allocateNormal(buf, reqCapacity, normCapacity); return ; } }
tinySubPagePools
它和MemoryRegionCache 的结构很像,也是分级.比如第一个tinySubPagePools[1]是16B专属的SubPage池
这里的tiny最终会调用allocateNormal 的方式去分配,就是之前分配page时的那段逻辑.而这里和之前有不同:
创建新的chunk后会在这个新的PoolChunk
对上上调用PoolChunk#allocate
,就是下面这段.但上次我们分配page级别的时候它会进入allocateRun()
,而这次我们想分配的是subpage
,他会进入下面的allocateSubpage()
部分
1 2 3 4 5 6 7 8 long allocate (int normCapacity) { if ((normCapacity & subpageOverflowMask) != 0 ) { return allocateRun(normCapacity); } else { return allocateSubpage(normCapacity); } }
下面就是PoolChunk#allocateSubpage()
,用于定位一个subpage
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 private long allocateSubpage (int normCapacity) { PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity); synchronized (head) { int d = maxOrder; int id = allocateNode(d); if (id < 0 ) { return id; } final PoolSubpage<T>[] subpages = this .subpages; final int pageSize = this .pageSize; freeBytes -= pageSize; int subpageIdx = subpageIdx(id); PoolSubpage<T> subpage = subpages[subpageIdx]; if (subpage == null ) { subpage = new PoolSubpage<T>(head, this , id, runOffset(id), pageSize, normCapacity); subpages[subpageIdx] = subpage; } else { subpage.init(head, normCapacity); } return subpage.allocate(); } }
创建的PoolSubpage
时调用它的构造方法,其构造方法中又会调用这么一段
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 void init (PoolSubpage<T> head, int elemSize) { doNotDestroy = true ; this .elemSize = elemSize; if (elemSize != 0 ) { maxNumElems = numAvail = pageSize / elemSize; nextAvail = 0 ; bitmapLength = maxNumElems >>> 6 ; if ((maxNumElems & 63 ) != 0 ) { bitmapLength ++; } for (int i = 0 ; i < bitmapLength; i ++) { bitmap[i] = 0 ; } } addToPool(head); }
下次再分配16B大小的时候就可已找到这个划分好的page了
接上面的PoolChunk#allocateSubpage()
片段,在调用完PoolSubpage
的构造方法,完成创建初始化为subpage后会return subpage.allocate();
.通过这个方法,在刚才初始化完成的PoolSubpage
中取出一个subpage
,进行返回.
subpage.allocate()
的代码片段如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 long allocate () { final int bitmapIdx = getNextAvail(); int q = bitmapIdx >>> 6 ; int r = bitmapIdx & 63 ; assert (bitmap[q] >>> r & 1 ) == 0 ; bitmap[q] |= 1L << r; if (-- numAvail == 0 ) { removeFromPool(); } return toHandle(bitmapIdx); }
然后就会返回到allocateNormal()
.此时的handler可以算出表示chunk中第几个节点,第几个subpage.也就是这块chunk中的位置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 private synchronized void allocateNormal (PooledByteBuf<T> buf, int reqCapacity, int normCapacity) { if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || q075.allocate(buf, reqCapacity, normCapacity)) { ++allocationsNormal; return ; } PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); long handle = c.allocate(normCapacity); ++allocationsNormal; assert handle > 0 ; c.initBuf(buf, handle, reqCapacity); qInit.add(c); }
看看它最后一步进行初始化的过程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 void initBuf (PooledByteBuf<T> buf, long handle, int reqCapacity) { int memoryMapIdx = memoryMapIdx(handle); int bitmapIdx = bitmapIdx(handle); if (bitmapIdx == 0 ) { byte val = value(memoryMapIdx); assert val == unusable : String.valueOf(val); buf.init(this , handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx), arena.parent.threadCache()); } else { initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity); } } --- private void initBufWithSubpage (PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) { buf.init( this , handle, runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF ) * subpage.elemSize, reqCapacity, subpage.elemSize, arena.parent.threadCache()); }
ByteBuf的回收 用户代码如下,释放一个ByteBuf
1 2 3 PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT; ByteBuf bytebuf = allocator.directBuffer(16 ); byteBuf.release();
无论是那种类型的ByteBuf,它都会调用AbstractReferenceCountedByteBuf#release0
进行释放
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private boolean release0 (int decrement) { for (;;) { int refCnt = this .refCnt; if (refCnt < decrement) { throw new IllegalReferenceCountException(refCnt, -decrement); } if (refCntUpdater.compareAndSet(this , refCnt, refCnt - decrement)) { if (refCnt == decrement) { deallocate(); return true ; } return false ; } } }
它又通过PooledByteBuf#deallocate
进行释放
1 2 3 4 5 6 7 8 9 10 11 protected final void deallocate () { if (handle >= 0 ) { final long handle = this .handle; this .handle = -1 ; memory = null ; chunk.arena.free(chunk, handle, maxLength, cache); recycle(); } }
释放步骤 :
连续的内存区段加到缓存. 无论是通过新分配空间创建的,还是在缓存中获取的.此时缓存中都缺少这段空间,所以把他加到缓存
如果加入缓存失败,标记连续的内存区段为未使用 根据时page级别还是SubPage级别,标记方式不同 page:按之前的层的逻辑图去标记 subpage:通过bitmap的方式去标记,标记为0就是未使用
ByteBuf加到对象池 之前创建时也提到过,如果以后创建时去得到就会在这上面进行初始化
1.连续的内存区段加到缓存 深入到chunk.arena.free();
中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void free (PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) { if (chunk.unpooled) { int size = chunk.chunkSize(); destroyChunk(chunk); activeBytesHuge.add(-size); deallocationsHuge.increment(); } else { SizeClass sizeClass = sizeClass(normCapacity); if (cache != null && cache.add(this , chunk, handle, normCapacity, sizeClass) ) { return ; } freeChunk(chunk, handle, sizeClass); } }
看一下尝试加入缓存的部分,就是从缓存中找到专属的MemoryRegionCache并加入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 boolean add (PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) { MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass); if (cache == null ) { return false ; } return cache.add(chunk, handle); } --- public final boolean add (PoolChunk<T> chunk, long handle) { Entry<T> entry = newEntry(chunk, handle); boolean queued = queue.offer(entry); if (!queued) { entry.recycle(); } return queued; }
之前从一个MemoryRegionCache获取合适的大小的chunk部分并创建时,会把废弃的Entry
清空后放入一个对象池中,而这个newEntry()
就会尝试从对象池中获取一个复用的Entry
对象.并把这个加到缓存的queue里面
2.如果加入缓存失败,标记连续的内存区段为未使用 可能是缓存队列已经满了
就会进到free()
中的下面的部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 void free (PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) { if (chunk.unpooled) { ..省略 } else { SizeClass sizeClass = sizeClass(normCapacity); if (cache != null && cache.add(this , chunk, handle, normCapacity, sizeClass)) { return ; } freeChunk(chunk, handle, sizeClass); } }
这个freeChunk()
会调用PoolChunkList#free
1 2 3 4 5 6 7 8 9 10 boolean free (PoolChunk<T> chunk, long handle) { chunk.free(handle); if (chunk.usage() < minUsage) { remove(chunk); return move0(chunk); } return true ; }
PoolChunk#free
.也就是说通过chunk释放掉一段连续内存.这里会先判断释放的是page还是subpage.
page:按之前的层的逻辑图去标记
subpage:通过bitmap的方式去标记,标记为0就是未使用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 void free (long handle) { int memoryMapIdx = memoryMapIdx(handle); int bitmapIdx = bitmapIdx(handle); if (bitmapIdx != 0 ) { PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)]; assert subpage != null && subpage.doNotDestroy; PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize); synchronized (head) { if (subpage.free(head, bitmapIdx & 0x3FFFFFFF )) { return ; } } } freeBytes += runLength(memoryMapIdx); setValue(memoryMapIdx, depth(memoryMapIdx)); updateParentsFree(memoryMapIdx); }
3.ByteBuf加到对象池 复用:申请时尽可能从对象池中申请,释放时也尽可能释放到对象池
1 2 3 4 5 6 7 8 9 protected final void deallocate () { if (handle >= 0 ) { ..省略 recycle(); } }
PooledByteBuf#deallocate
的recycle();
会进行这个过程
ByteBuf总结
bytebuf的api和分类 api:read和writeapi 分类:
堆内(数组)还是堆外(JDK的ByteBuf)
Unsafe(通过JDK的Unsafe对象对内存地址进行读写 )还是非Unsafe(直接读写)
Pooled(每次分配内存都直接申请内存)还是UnPooled(预先分配好一整块内存,从这块内存中获取一部分)
分配Pooled内存的总步骤 线程私有的PooledThreadCache获取缓存空间,去找之前使用过然后被释放了的内存 如果没有就通过chunk重新进行分配
不同规格的Pooled内存分配与释放
page级别:从chunk的层次结构中分配
subpage级别:把一个page按照subpage大小进行划分,然后通过bitmap管理
释放后加入缓存队列,方别下次分配时使用
三个问题
内存的类别有哪些 三个唯独
如何减少多线程内存分配之间的竞争 各个NioEventLoop有自己的分配空间,和ThreadLocal同理,下图: