What are the different categories of memory?
How is contention between threads reduced when allocating memory?
How is memory of different sizes allocated?
ByteBuf
How memory and the memory allocator are abstracted
The allocation strategies for different size classes and categories of memory
How memory is reclaimed
ByteBuf structure and important APIs

The structure of ByteBuf

```
 * +-------------------+------------------+------------------+
 * | discardable bytes |  readable bytes  |  writable bytes  |
 * |     (invalid)     |    (readable)    |    (writable)    |
 * +-------------------+------------------+------------------+
 * |                   |                  |                  |
 * 0      <=      readerIndex   <=   writerIndex    <=    capacity
```
The three important pointers in a ByteBuf (a short example follows this list):

readerIndex: reads start from this pointer.
writerIndex: writes start from this pointer.
capacity: the current capacity of the buffer.
maxCapacity(): the largest capacity the buffer may grow to. When capacity is not enough the buffer is expanded; if the expansion would exceed maxCapacity(), the operation is rejected.
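To make the invariant 0 <= readerIndex <= writerIndex <= capacity (<= maxCapacity) concrete, here is a minimal sketch, assuming a plain Netty 4.x dependency and using the Unpooled helper:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class IndexInvariantDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(8, 16);      // initialCapacity = 8, maxCapacity = 16
        System.out.println(buf.readerIndex());     // 0
        System.out.println(buf.writerIndex());     // 0
        System.out.println(buf.capacity());        // 8
        System.out.println(buf.maxCapacity());     // 16

        buf.writeLong(42L);                        // writerIndex -> 8, capacity exhausted
        buf.writeByte(1);                          // triggers expansion, but never past maxCapacity
        System.out.println(buf.capacity());        // grown, still <= 16
        buf.release();
    }
}
```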
read, write and set methods

ByteBuf provides many methods that start with read, for example:

readByte(): reads 1 byte starting at readerIndex.
readInt(): reads 4 bytes starting at readerIndex, and so on.

The write APIs mirror the read APIs and write starting at writerIndex. Besides these two families there are also set APIs such as setInt(), which write a value at an absolute index:

```java
public abstract ByteBuf setByte(int index, int value);
```

A set method does not move any pointer.
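A small sketch of how the three method families move (or do not move) the indexes, again illustrated with Unpooled.buffer():

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class ReadWriteSetDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(16);

        buf.writeInt(100);       // writerIndex: 0 -> 4
        buf.writeByte(7);        // writerIndex: 4 -> 5

        int i = buf.readInt();   // readerIndex: 0 -> 4, i == 100
        byte b = buf.readByte(); // readerIndex: 4 -> 5, b == 7

        buf.setByte(0, 9);       // overwrites absolute index 0, moves neither pointer
        System.out.println(buf.readerIndex() + " " + buf.writerIndex()); // "5 5"
        buf.release();
    }
}
```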
mark and reset methods

```java
public abstract ByteBuf markReaderIndex();
public abstract ByteBuf resetReaderIndex();
```

Reading through the read APIs moves readerIndex forward. If that is not desired, call markReaderIndex() before reading to record the pointer position, and resetReaderIndex() afterwards to restore it. markWriterIndex()/resetWriterIndex() work the same way for writerIndex.
Other commonly used APIs

```java
public abstract int readableBytes();    // writerIndex - readerIndex
public abstract int writableBytes();    // capacity - writerIndex
public abstract int maxWritableBytes(); // maxCapacity - writerIndex
```
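A quick sketch tying mark/reset and these derived quantities together:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class MarkResetDemo {
    public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(8);
        buf.writeInt(42);

        System.out.println(buf.readableBytes());   // 4 (writerIndex - readerIndex)
        System.out.println(buf.writableBytes());   // 4 (capacity - writerIndex)

        buf.markReaderIndex();                     // remember readerIndex == 0
        System.out.println(buf.readInt());         // 42, readerIndex -> 4
        System.out.println(buf.readableBytes());   // 0

        buf.resetReaderIndex();                    // readerIndex back to 0
        System.out.println(buf.readInt());         // 42 again
        buf.release();
    }
}
```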
ByteBuf classification
AbstractByteBuf

This is the skeleton implementation: it holds the read/write pointers and the common logic, while the concrete data-access methods remain abstract. Those abstract methods start with an underscore (_).

It stores the read/write pointers:

```java
int readerIndex;
int writerIndex;
private int markedReaderIndex;
private int markedWriterIndex;
private int maxCapacity;
```

Its implementation of readByte():

```java
@Override
public byte readByte() {
    checkReadableBytes0(1);
    int i = readerIndex;
    byte b = _getByte(i);
    readerIndex = i + 1;
    return b;
}
```

Its implementation of writeByte():

```java
@Override
public ByteBuf writeByte(int value) {
    ensureAccessible();
    ensureWritable0(1);
    _setByte(writerIndex++, value);
    return this;
}
```
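The split between AbstractByteBuf and its subclasses is a classic template method: the skeleton owns the indexes and bounds checks, the subclass only says how a byte is physically accessed. Below is a simplified, hypothetical illustration of that split; the class names are invented and this is not Netty source:

```java
// Hypothetical sketch of the template-method split used by AbstractByteBuf.
abstract class SkeletonBuf {
    protected int readerIndex;
    protected int writerIndex;

    // Common logic: bounds check, delegate to the hook, advance the pointer.
    public byte readByte() {
        if (readerIndex >= writerIndex) {
            throw new IndexOutOfBoundsException("nothing readable");
        }
        return _getByte(readerIndex++);
    }

    // Storage-specific hook: byte[] access, Unsafe access, direct memory, ...
    protected abstract byte _getByte(int index);
}

class HeapSkeletonBuf extends SkeletonBuf {
    private final byte[] array;

    HeapSkeletonBuf(byte[] array, int length) {
        this.array = array;
        this.writerIndex = length;
    }

    @Override
    protected byte _getByte(int index) {
        return array[index];
    }

    public static void main(String[] args) {
        SkeletonBuf buf = new HeapSkeletonBuf(new byte[]{1, 2, 3}, 3);
        System.out.println(buf.readByte()); // 1
        System.out.println(buf.readByte()); // 2
    }
}
```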
How ByteBuf implementations are classified

Pooled vs. Unpooled
Pooled: wraps a piece of memory that was allocated ahead of time into a ByteBuf.
Unpooled: allocates a brand-new block every time (a new byte[] on the heap, or a direct allocation obtained from the operating system).
Unsafe vs. non-Unsafe
Unsafe: obtains the memory address of the underlying storage and reads/writes through that address, i.e. the ByteBuf is accessed via the JDK's Unsafe.
Non-Unsafe: does not depend on the JDK's Unsafe object.

For example, PooledUnsafeHeapByteBuf#_getByte:
```java
@Override
protected byte _getByte(int index) {
    return UnsafeByteBufUtil.getByte(memory, idx(index));
}

// UnsafeByteBufUtil.getByte:
static byte getByte(byte[] data, int index) {
    return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);
}
```
By contrast, PooledHeapByteBuf#_getByte:
```java
@Override
protected byte _getByte(int index) {
    return HeapByteBufUtil.getByte(memory, idx(index));
}

// HeapByteBufUtil.getByte:
static byte getByte(byte[] memory, int index) {
    return memory[index];
}
```
Heap vs. Direct
Heap: the memory is allocated on the JVM heap and managed by the GC.
Direct: the memory is allocated outside the JVM heap through JDK APIs and is not reclaimed by the GC on its own.

In UnpooledHeapByteBuf the storage is a plain byte[] array, whereas UnpooledDirectByteBuf holds a JDK ByteBuffer:

```java
private ByteBuffer buffer;
```
Its _getByte():
```java
@Override
protected byte _getByte(int index) {
    return buffer.get(index);
}
```
How is this JDK ByteBuffer allocated? In the constructor of UnpooledDirectByteBuf:

```java
protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    this.alloc = alloc;
    setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
}

// ByteBuffer.allocateDirect:
public static ByteBuffer allocateDirect(int capacity) {
    return new DirectByteBuffer(capacity);
}
```
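From the user's point of view the heap/direct difference can be observed with standard ByteBuf APIs; a minimal sketch:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class HeapVsDirectDemo {
    public static void main(String[] args) {
        ByteBuf heap = Unpooled.buffer(16);          // backed by a byte[] on the JVM heap
        ByteBuf direct = Unpooled.directBuffer(16);  // backed by off-heap memory

        System.out.println(heap.hasArray());    // true: a backing byte[] can be exposed
        System.out.println(direct.hasArray());  // false: no backing array
        System.out.println(direct.isDirect());  // true

        heap.release();
        direct.release(); // direct memory is not reclaimed by GC alone, release it explicitly
    }
}
```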
ByteBufAllocator analysis

What ByteBufAllocator does

Every kind of ByteBuf is ultimately handed out by a ByteBufAllocator; it acts as Netty's memory manager. Its interface:
```java
public interface ByteBufAllocator {

    ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;

    ByteBuf buffer();
    ByteBuf buffer(int initialCapacity);
    ByteBuf buffer(int initialCapacity, int maxCapacity);

    ByteBuf ioBuffer();
    ByteBuf ioBuffer(int initialCapacity);
    ByteBuf ioBuffer(int initialCapacity, int maxCapacity);

    ByteBuf heapBuffer();
    ByteBuf heapBuffer(int initialCapacity);
    ByteBuf heapBuffer(int initialCapacity, int maxCapacity);

    ByteBuf directBuffer();
    ByteBuf directBuffer(int initialCapacity);
    ByteBuf directBuffer(int initialCapacity, int maxCapacity);

    CompositeByteBuf compositeBuffer();
    CompositeByteBuf compositeBuffer(int maxNumComponents);

    CompositeByteBuf compositeHeapBuffer();
    CompositeByteBuf compositeHeapBuffer(int maxNumComponents);

    CompositeByteBuf compositeDirectBuffer();
    CompositeByteBuf compositeDirectBuffer(int maxNumComponents);

    boolean isDirectBufferPooled();

    int calculateNewCapacity(int minNewCapacity, int maxCapacity);
}
```
Note that ByteBufAllocator itself only distinguishes direct from heap buffers; the other classification dimensions (pooled/unpooled, Unsafe/non-Unsafe) do not appear in the interface.
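A minimal usage sketch of the interface above. ByteBufAllocator.DEFAULT is usually the pooled allocator, though the actual choice depends on the platform and the io.netty.allocator.type system property:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

public class AllocatorUsageDemo {
    public static void main(String[] args) {
        ByteBufAllocator alloc = ByteBufAllocator.DEFAULT;

        ByteBuf any = alloc.buffer(256);          // heap or direct, the allocator decides
        ByteBuf heap = alloc.heapBuffer(256);     // explicitly heap
        ByteBuf direct = alloc.directBuffer(256); // explicitly direct
        ByteBuf io = alloc.ioBuffer(256);         // "best for I/O": direct where supported

        System.out.println(alloc.isDirectBufferPooled());

        any.release();
        heap.release();
        direct.release();
        io.release();
    }
}
```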
The AbstractByteBufAllocator abstract implementation

For example, its implementation of buffer():
```java
@Override
public ByteBuf buffer() {
    if (directByDefault) {
        return directBuffer();
    }
    return heapBuffer();
}
```
It supports both direct and heap allocation. Here is directBuffer():
```java
@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);
    return newDirectBuffer(initialCapacity, maxCapacity);
}

protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity);
protected abstract ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity);
```
In other words, whether the newly created buffer is pooled or unpooled is decided by the subclasses. The classes that implement these two methods are:

PooledByteBufAllocator
UnpooledByteBufAllocator

Apart from these two methods, AbstractByteBufAllocator already implements everything else in the interface.
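A small sketch contrasting the two subclasses; both constructors used here take a preferDirect flag:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;

public class TwoAllocatorsDemo {
    public static void main(String[] args) {
        PooledByteBufAllocator pooled = new PooledByteBufAllocator(true);       // preferDirect
        UnpooledByteBufAllocator unpooled = new UnpooledByteBufAllocator(true);

        ByteBuf a = pooled.directBuffer(1024);   // carved out of a pre-allocated arena
        ByteBuf b = unpooled.directBuffer(1024); // a fresh allocation

        a.release(); // goes back to the pool / thread-local cache
        b.release(); // freed
    }
}
```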
The two main subclasses of ByteBufAllocator

To recap:

Pooled: takes a piece of memory that has already been allocated.
Unpooled: calls the system API to allocate a brand-new block.

So who decides between Unsafe and non-Unsafe? Netty decides automatically: if the Unsafe object can be obtained, an Unsafe ByteBuf is allocated; otherwise a non-Unsafe one is used.
UnpooledByteBufAllocator, for example, makes exactly this check in newHeapBuffer(), shown in the next section.
UnpooledByteBufAllocator analysis

Heap memory allocation

UnpooledByteBufAllocator#newHeapBuffer looks like this:
```java
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
    return PlatformDependent.hasUnsafe()
            ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity)
            : new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
```
As for the creation of UnpooledUnsafeHeapByteBuf: it is a subclass of UnpooledHeapByteBuf, and its constructor is simply:

```java
UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(alloc, initialCapacity, maxCapacity);
}
```
So its construction is exactly the same as UnpooledHeapByteBuf's: both create a byte array as the storage.

```java
protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    // initialCapacity determines the size of the backing array
    this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);
}

private UnpooledHeapByteBuf(
        ByteBufAllocator alloc, byte[] initialArray, int readerIndex, int writerIndex, int maxCapacity) {
    super(maxCapacity);
    this.alloc = alloc;
    setArray(initialArray);
    setIndex(readerIndex, writerIndex);
}
```
So what is the difference between the UnpooledUnsafeHeapByteBuf and UnpooledHeapByteBuf implementations?

UnpooledHeapByteBuf reads elements straight out of the byte array (non-Unsafe: no dependency on the JDK's Unsafe object):

```java
@Override
protected byte _getByte(int index) {
    return HeapByteBufUtil.getByte(array, index);
}

// HeapByteBufUtil.getByte:
static byte getByte(byte[] memory, int index) {
    return memory[index];
}
```
UnpooledUnsafeHeapByteBuf, on the other hand, ends up calling the Unsafe object to fetch the element (Unsafe: take the memory address and read/write through it):

```java
@Override
protected byte _getByte(int index) {
    return UnsafeByteBufUtil.getByte(array, index);
}

// UnsafeByteBufUtil.getByte:
static byte getByte(byte[] data, int index) {
    return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);
}
```
Direct memory allocation

That is, off-heap allocation. Back to UnpooledByteBufAllocator#newDirectBuffer():

```java
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    ByteBuf buf = PlatformDependent.hasUnsafe()
            ? UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity)
            : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);

    return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
```
First, look at how UnpooledDirectByteBuf is created:
```java
protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);
    this.alloc = alloc;
    setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
}
```
And what does the newUnsafeDirectByteBuf call above do when creating the Unsafe direct buffer?

```java
static UnpooledUnsafeDirectByteBuf newUnsafeDirectByteBuf(
        ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    return new UnpooledUnsafeDirectByteBuf(alloc, initialCapacity, maxCapacity);
}

// UnpooledUnsafeDirectByteBuf constructor:
protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);
    this.alloc = alloc;
    setByteBuffer(allocateDirect(initialCapacity), false);
}
```
The allocation steps are the same as UnpooledDirectByteBuf's; the difference lies in its setByteBuffer():

```java
final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
    this.buffer = buffer;
    memoryAddress = PlatformDependent.directBufferAddress(buffer);
    tmpNioBuf = null;
    capacity = buffer.remaining();
}
```
PlatformDependent.directBufferAddress obtains the memory address via UNSAFE.getLong(object, fieldOffset). Later, when a value is read with getByte, it is read through the memoryAddress saved here:

```java
@Override
protected byte _getByte(int index) {
    return UnsafeByteBufUtil.getByte(addr(index));
}

// UnsafeByteBufUtil.getByte:
static byte getByte(long address) {
    return UNSAFE.getByte(address);
}
```
By comparison, UnpooledDirectByteBuf does:

```java
@Override
protected byte _getByte(int index) {
    return buffer.get(index);
}
```
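From user code the saved address is visible through hasMemoryAddress()/memoryAddress(), which are standard ByteBuf methods; a minimal sketch:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

public class MemoryAddressDemo {
    public static void main(String[] args) {
        ByteBuf direct = ByteBufAllocator.DEFAULT.directBuffer(64);

        // Only Unsafe-based buffers expose the raw address, so always check first.
        if (direct.hasMemoryAddress()) {
            System.out.println("off-heap base address: " + direct.memoryAddress());
        } else {
            System.out.println("this ByteBuf does not expose a memory address");
        }
        direct.release();
    }
}
```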
PooledByteBufAllocator overview

This allocator is more involved than the unpooled one. Its newDirectBuffer() (abridged):

```java
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;

    ByteBuf buf;
    if (directArena != null) {
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    }
    // (else branch omitted: falls back to an unpooled direct buffer)

    return toLeakAwareBuffer(buf);
}
```
Its allocation process:

1. Get the thread-local cache, PoolThreadCache

newDirectBuffer() may be called from many threads, and threadCache.get() returns the cache of the current thread, so different threads work with different memory. threadCache is a PoolThreadLocalCache, an inner class of PooledByteBufAllocator built on the JDK's ThreadLocal. The first call to threadCache.get() on a thread invokes its initialValue() method, which creates a PoolThreadCache holding that thread's private space:

```java
@Override
protected synchronized PoolThreadCache initialValue() {
    final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
    final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

    return new PoolThreadCache(
            heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
            DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
```
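To see why threadCache.get() removes contention, here is a plain-JDK ThreadLocal sketch (not Netty code): the first get() on each thread runs initialValue() once, so every thread ends up holding its own private object:

```java
public class ThreadLocalSketch {
    static final ThreadLocal<StringBuilder> CACHE = new ThreadLocal<StringBuilder>() {
        @Override
        protected StringBuilder initialValue() {
            // called once per thread, on that thread's first get()
            return new StringBuilder("cache of " + Thread.currentThread().getName());
        }
    };

    public static void main(String[] args) throws InterruptedException {
        Runnable task = () -> System.out.println(CACHE.get()); // each thread sees its own instance
        Thread t1 = new Thread(task, "t1");
        Thread t2 = new Thread(task, "t2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}
```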
2. Allocate on the arena held by the thread-local cache

```java
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;
    // ...
    buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    // ...
}
```

In the initialValue() shown above, the arena is obtained through leastUsedArena(). As the name suggests, it picks from directArenas the arena currently shared by the fewest threads.
But when are heapArenas and directArenas created? In the constructor of PooledByteBufAllocator. The heapArena part:

```java
if (nHeapArena > 0) {
    heapArenas = newArenaArray(nHeapArena);
    List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
    for (int i = 0; i < heapArenas.length; i++) {
        PoolArena.HeapArena arena = new PoolArena.HeapArena(this, pageSize, maxOrder, pageShifts, chunkSize);
        heapArenas[i] = arena;
        metrics.add(arena);
    }
    heapArenaMetrics = Collections.unmodifiableList(metrics);
}

// newArenaArray:
private static <T> PoolArena<T>[] newArenaArray(int size) {
    return new PoolArena[size];
}
```
So heapArenas is an array of PoolArena, and directArenas is built the same way:

```java
if (nDirectArena > 0) {
    directArenas = newArenaArray(nDirectArena);
    List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
    for (int i = 0; i < directArenas.length; i++) {
        PoolArena.DirectArena arena = new PoolArena.DirectArena(
                this, pageSize, maxOrder, pageShifts, chunkSize);
        directArenas[i] = arena;
        metrics.add(arena);
    }
    directArenaMetrics = Collections.unmodifiableList(metrics);
}
```
So constructing a PooledByteBufAllocator creates both PoolArena arrays. How large are they, i.e. what are the values of the nHeapArena and nDirectArena parameters? The overloaded constructor passes them in like this:

```java
public PooledByteBufAllocator(boolean preferDirect) {
    this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
}
```
The two constants are defined as follows:

```java
final int defaultMinNumArena = runtime.availableProcessors() * 2;
final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;

DEFAULT_NUM_HEAP_ARENA = Math.max(0,
        SystemPropertyUtil.getInt(
                "io.netty.allocator.numHeapArenas",
                (int) Math.min(
                        defaultMinNumArena,
                        runtime.maxMemory() / defaultChunkSize / 2 / 3)));

DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
        SystemPropertyUtil.getInt(
                "io.netty.allocator.numDirectArenas",
                (int) Math.min(
                        defaultMinNumArena,
                        PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));
```
So, by default, DEFAULT_NUM_HEAP_ARENA and DEFAULT_NUM_DIRECT_ARENA come out as twice the number of CPU cores (capped by the amount of available memory). Why 2 * cores? The NioEventLoop threads created earlier also default to 2 * cores, so every NioEventLoop can have an arena of its own and use it without taking a lock.
The structure of PooledByteBufAllocator

Picture 4 threads, i.e. 4 NioEventLoops, sharing one PooledByteBufAllocator. Each thread takes one arena from the allocator and stores it as a field of its private PoolThreadCache, so threadCache.get().directArena returns the arena that belongs to the current thread.
Besides allocating directly on an arena, PoolThreadCache can also serve allocations from the cache lists of ByteBuf memory it maintains underneath. For example, if a 1024-byte ByteBuf was allocated through it and later released, the next time 1024 bytes are needed there is no need to go back to the arena: the space can be handed out straight from the cache list kept inside PoolThreadCache.
Three parameters of PooledByteBufAllocator describe the capacity of these caches, one per size class (a configuration sketch follows this list):

tinyCacheSize: how many tiny-sized buffers can be cached
smallCacheSize: how many small-sized buffers can be cached
normalCacheSize: how many normal-sized buffers can be cached
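A sketch of wiring these sizes up explicitly, using the long constructor as it appears in the 4.0/early-4.1 sources this walkthrough follows (later releases deprecate the tiny size class); the concrete numbers are simply the commonly cited defaults, so treat them as assumptions:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class CacheSizeDemo {
    public static void main(String[] args) {
        PooledByteBufAllocator alloc = new PooledByteBufAllocator(
                true,   // preferDirect
                2, 2,   // nHeapArena, nDirectArena
                8192,   // pageSize
                11,     // maxOrder -> chunkSize = 8192 << 11 = 16 MiB
                512,    // tinyCacheSize   (commonly cited default: 512)
                256,    // smallCacheSize  (default: 256)
                64);    // normalCacheSize (default: 64)

        ByteBuf buf = alloc.directBuffer(1024);
        buf.release(); // buffers of these size classes can land back in the thread-local cache
    }
}
```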
These three fields are handed over when the PoolThreadCache is created: they are passed straight into the PoolThreadCache constructor inside the initialValue() method shown earlier.
Inside the PoolThreadCache constructor:
```java
if (directArena != null) {
    tinySubPageDirectCaches = createSubPageCaches(
            tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
    smallSubPageDirectCaches = createSubPageCaches(
            smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);

    numShiftsNormalDirect = log2(directArena.pageSize);
    normalDirectCaches = createNormalCaches(
            normalCacheSize, maxCachedBufferCapacity, directArena);

    directArena.numThreadCaches.getAndIncrement();
}
```
And here is what createSubPageCaches does:
```java
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
        int cacheSize, int numCaches, SizeClass sizeClass) {
    if (cacheSize > 0) {
        MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
        for (int i = 0; i < cache.length; i++) {
            cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
        }
        return cache;
    } else {
        return null;
    }
}
```
How directArena allocates direct memory

1. Take a PooledByteBuf from the object pool and reuse it

Continue from the newDirectBuffer() call seen earlier:

```java
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;

    ByteBuf buf;
    if (directArena != null) {
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    }
    // ...
    return toLeakAwareBuffer(buf);
}
```

```java
// PoolArena#allocate
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
    PooledByteBuf<T> buf = newByteBuf(maxCapacity);
    allocate(cache, buf, reqCapacity);
    return buf;
}
```
How does newByteBuf() obtain a PooledByteBuf object?
```java
@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
    if (HAS_UNSAFE) {
        return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
    } else {
        return PooledDirectByteBuf.newInstance(maxCapacity);
    }
}
```
PooledUnsafeDirectByteBuf.newInstance():

```java
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
    PooledUnsafeDirectByteBuf buf = RECYCLER.get();
    buf.reuse(maxCapacity);
    return buf;
}

// reuse(): re-initialise the instance when it is taken back out of the object pool
final void reuse(int maxCapacity) {
    maxCapacity(maxCapacity);
    setRefCnt(1);
    setIndex0(0, 0);
    discardMarks();
}

// discardMarks(): simply resets the mark indexes to 0
final void discardMarks() {
    markedReaderIndex = markedWriterIndex = 0;
}
```
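RECYCLER here is Netty's lightweight thread-local object pool, io.netty.util.Recycler. A minimal sketch of the same pattern, written against the 4.1-style Handle API (the class PooledThing is an invented example):

```java
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;

public class RecyclerSketch {
    // The pooled object keeps the handle it was created with so it can put itself back.
    static final class PooledThing {
        private final Handle<PooledThing> handle;
        int payload;

        PooledThing(Handle<PooledThing> handle) {
            this.handle = handle;
        }

        void recycle() {
            payload = 0;          // reset, much like PooledByteBuf.reuse()
            handle.recycle(this); // hand the instance back to the pool
        }
    }

    static final Recycler<PooledThing> RECYCLER = new Recycler<PooledThing>() {
        @Override
        protected PooledThing newObject(Handle<PooledThing> handle) {
            return new PooledThing(handle); // only called when the pool has nothing to reuse
        }
    };

    public static void main(String[] args) {
        PooledThing a = RECYCLER.get(); // first call: newObject()
        a.recycle();
        PooledThing b = RECYCLER.get(); // typically the same instance, no new allocation
        System.out.println(a == b);
    }
}
```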
2. Allocate from the cache

```java
// PoolArena#allocate, as above
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
    PooledByteBuf<T> buf = newByteBuf(maxCapacity);
    allocate(cache, buf, reqCapacity);
    return buf;
}

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    final int normCapacity = normalizeCapacity(reqCapacity);
    if (normCapacity <= chunkSize) {
        // ... (the tiny and small branches follow the same pattern)
        if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
            // hit in the thread-local cache
            return;
        }
        allocateNormal(buf, reqCapacity, normCapacity);
    } else {
        allocateHuge(buf, reqCapacity);
    }
}
```
The snippet above only shows the normal cache, but tiny and small follow the same logic: first try to allocate from the thread-local cache, and only fall back to a real allocation when that fails. Requests larger than chunkSize are a special case: they skip the cache entirely and go straight to a real allocation.
3. On a cache miss, allocate from the arena's memory