轻量级对象池Recycler 如果recycler中有对象就能复用,不用每次都去new.
基于FastThreadLocal
好处:
Recycler使用 示例程序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public class RecycleTest { private static final Recycler<User> RECYCLER = new Recycler<User>() { @Override protected User newObject (Handle<User> handle) { return new User(handle); } }; private static class User { private final Recycler.Handle<User> handle; public User (Recycler.Handle<User> handle) { this .handle = handle; } public void recycle () { handle.recycle(this ); } } public static void main (String[] args) { User user = RECYCLER.get(); user.recycle(); RECYCLER.get().recycle(); User user1 = RECYCLER.get(); System.out.println(user1 == user); } }
我们之前在学习ByteBuf的时候,就接触过这个Recycler
类:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 final class PooledDirectByteBuf extends PooledByteBuf <ByteBuffer > { private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() { @Override protected PooledDirectByteBuf newObject (Handle<PooledDirectByteBuf> handle) { return new PooledDirectByteBuf(handle, 0 ); } }; static PooledDirectByteBuf newInstance (int maxCapacity) { PooledDirectByteBuf buf = RECYCLER.get(); buf.reuse(maxCapacity); return buf; }
Recycler创建 看io.netty.util.Recycler
的构造函数:
1 2 3 protected Recycler () { this (DEFAULT_MAX_CAPACITY_PER_THREAD); }
这个DEFAULT_MAX_CAPACITY_PER_THREAD
是什么,稍后会讨论. 注意到Recycler
中有一个FastThreadLocal
的变量:
1 2 3 4 5 6 7 8 9 10 11 12 private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() { @Override protected Stack<T> initialValue () { return new Stack<T>(Recycler.this , Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor, ratioMask, maxDelayedQueuesPerThread); } };
也就是说这个FastThreadLocal
,对于每个线程都维护一个io.netty.util.Recycler.Stack
对象.
也就是说一个Recycler
会通过一个FastThreadLocal
保存该线程私有的对象池
我们看一下这个线程私有的Stack
里面的变量默认值:假设当前的线程是Thread1
thread
: 当前线程
ratioMask
:对象回收的频率
maxCapacity
:Stack能存的最大大小,存多少个handle
maxDelayedQueues
:当前线程创建的对象能释放的线程数有多少 如:值 = 1时,Thread1只能在Thread2中释放,释放到Thread3就直接扔掉了
head,pre,cursor
.指向Thread2的那个数组的指针
availableSharedCapacity
:Thread1创建的对象能在其他线程里缓存的最大个数
那么些Stack
的变量的默认值是什么,根据分析,传入的初始值如下注释所示:
1 2 3 4 5 6 7 8 9 10 11 12 13 private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() { @Override protected Stack<T> initialValue () { return new Stack<T>(Recycler.this , Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor, ratioMask, maxDelayedQueuesPerThread); } };
也就是说当前线程能存32K个,能在别的线程存16K个handler
从Recycler获取对象
获取当前线程的Stack
从Stack里面弹出对象,如果弹出的对象不为空则直接返回
如果弹出的对象为空,创建对象并绑定到Stack
从示例程序的RECYCLER.get()
的步骤开始分析:
1 2 3 4 5 6 7 8 9 10 11 12 13 public final T get () { if (maxCapacityPerThread == 0 ) { return newObject((Handle<T>) NOOP_HANDLE); } Stack<T> stack = threadLocal.get(); DefaultHandle<T> handle = stack.pop(); if (handle == null ) { handle = stack.newHandle(); handle.value = newObject(handle); } return (T) handle.value; }
分析创建一个handler的handle = stack.newHandle();步骤
:
1 2 3 4 DefaultHandle<T> newHandle () { return new DefaultHandle<T>(this ); }
因此客户端可以调用handle.recycle()
让管理它的Stack回收
再看一下它从stack中pop()
的过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 DefaultHandle<T> pop () { int size = this .size; if (size == 0 ) { if (!scavenge()) { return null ; } size = this .size; } size --; DefaultHandle ret = elements[size]; elements[size] = null ; if (ret.lastRecycledId != ret.recycleId) { throw new IllegalStateException("recycled multiple times" ); } ret.recycleId = 0 ; ret.lastRecycledId = 0 ; this .size = size; return ret; }
回收对象到Recycler 需要考虑两方面:
看示例程序的user.recycle();
也就是handle.recycle(this)
方法:
1 2 3 4 5 6 7 public void recycle (Object object) { if (object != value) { throw new IllegalArgumentException("object does not belong to handle" ); } stack.push(this ); }
stack.push(this);
的细节:
1 2 3 4 5 6 7 8 9 10 11 void push (DefaultHandle<?> item) { Thread currentThread = Thread.currentThread(); if (thread == currentThread) { pushNow(item); } else { pushLater(item, currentThread); } }
同线程回收对象 先分析创建stack时的线程进行回收时调用的pushNow(item);
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private void pushNow (DefaultHandle<?> item) { if ((item.recycleId | item.lastRecycledId) != 0 ) { throw new IllegalStateException("recycled already" ); } item.recycleId = item.lastRecycledId = OWN_THREAD_ID; int size = this .size; if (size >= maxCapacity || dropHandle(item)) { return ; } if (size == elements.length) { elements = Arrays.copyOf(elements, min(size << 1 , maxCapacity)); } elements[size] = item; this .size = size + 1 ; }
异线程回收对象 分析线程试图回收非自己创建的对象时调用的pushLater(item, currentThread);
步骤:
获取WeakOrderQueue
,用于回收在异线程创建的对象
首次获取不到WeakOrderQueue
,创建WeakOrderQueue
把Thread1
的Stack和Tread2
的WeakOrderQueue
进行绑定
将对象追加到WeakOrderQueue
下面开始跟踪源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 private void pushLater (DefaultHandle<?> item, Thread thread) { Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get(); WeakOrderQueue queue = delayedRecycled.get(this ); if (queue == null ) { if (delayedRecycled.size() >= maxDelayedQueues) { delayedRecycled.put(this , WeakOrderQueue.DUMMY); return ; } if ((queue = WeakOrderQueue.allocate(this , thread)) == null ) { return ; } delayedRecycled.put(this , queue); } else if (queue == WeakOrderQueue.DUMMY) { return ; } queue.add(item); }
看一下创建的WeakOrderQueue
的部分:
1 2 3 4 5 6 7 static WeakOrderQueue allocate (Stack<?> stack, Thread thread) { return reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY) ? new WeakOrderQueue(stack, thread) : null ; }
允许则通过new WeakOrderQueue(stack, thread)
创建
先看一下WeakOrderQueue
的结构
每个Link都包括Handler数组
head指向第一个link,tail指向最后一个link
next指向下一个WeakOrderQueue,也就是其他线程的WeakOrderQueue
一个link的的大小是16,也就是说Handler数组的长度为16.目的是减少管理的次数
它的构造函数:
1 2 3 4 5 6 7 8 9 10 11 private WeakOrderQueue (Stack<?> stack, Thread thread) { head = tail = new Link(); owner = new WeakReference<Thread>(thread); synchronized (stack) { next = stack.head; stack.head = this ; } io.netty.util.Recycler.Stack#pushLater availableSharedCapacity = stack.availableSharedCapacity; }
第三步是调用当前线程的WeakOrderQueue
的add(handler)
去添加handler
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 void add (DefaultHandle<?> handle) { handle.lastRecycledId = id; Link tail = this .tail; int writeIndex; if ((writeIndex = tail.get()) == LINK_CAPACITY) { if (!reserveSpace(availableSharedCapacity, LINK_CAPACITY)) { return ; } this .tail = tail = tail.next = new Link(); writeIndex = tail.get(); } tail.elements[writeIndex] = handle; handle.stack = null ; tail.lazySet(writeIndex + 1 ); }
从Recycler获取对象 在前面的小节中,我们提到了从Recycler获取对象时会从stack
pop出一个对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 DefaultHandle<T> pop () { int size = this .size; if (size == 0 ) { if (!scavenge()) { return null ; } size = this .size; } size --; DefaultHandle ret = elements[size]; elements[size] = null ; if (ret.lastRecycledId != ret.recycleId) { throw new IllegalStateException("recycled multiple times" ); } ret.recycleId = 0 ; ret.lastRecycledId = 0 ; this .size = size; return ret; }
其中讲到当本线程的stack为空时,如果有释放到其他线程的handle,就跑去其他线程把这些对象给捞回来
分析一下这一步:
head指向首次放入到其他线程时创建的WeakOrderQueue
cursor指针指向当前想获取的位置
pre是指向cursor的前一个节点
它是通过scavenge()
来完成从WeakOrderQueue
捞回的:
1 2 3 4 5 6 7 8 9 10 boolean scavenge () { if (scavengeSome()) { return true ; } prev = null ; cursor = head; return false ; }
它会调用scavengeSome()
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 boolean scavengeSome () { WeakOrderQueue cursor = this .cursor; if (cursor == null ) { cursor = head; if (cursor == null ) { return false ; } } boolean success = false ; WeakOrderQueue prev = this .prev; do { if (cursor.transfer(this )) { success = true ; break ; } WeakOrderQueue next = cursor.next; if (cursor.owner.get() == null ) { if (cursor.hasFinalData()) { for (;;) { if (cursor.transfer(this )) { success = true ; } else { break ; } } } if (prev != null ) { prev.next = next; ` } } else { prev = cursor; } cursor = next; } while (cursor != null && !success); this .prev = prev; this .cursor = cursor; return success; }
分析把WeakOrderQueue中的一个link传输到当前stack时执行的cursor.transfer(this)
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 boolean transfer (Stack<?> dst) { Link head = this .head; if (head == null ) { return false ; } if (head.readIndex == LINK_CAPACITY) { if (head.next == null ) { return false ; } this .head = head = head.next; } final int srcStart = head.readIndex; int srcEnd = head.get(); final int srcSize = srcEnd - srcStart; if (srcSize == 0 ) { return false ; } final int dstSize = dst.size; final int expectedCapacity = dstSize + srcSize; if (expectedCapacity > dst.elements.length) { final int actualCapacity = dst.increaseCapacity(expectedCapacity); srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd); } if (srcStart != srcEnd) { final DefaultHandle[] srcElems = head.elements; final DefaultHandle[] dstElems = dst.elements; int newDstSize = dstSize; for (int i = srcStart; i < srcEnd; i++) { DefaultHandle element = srcElems[i]; if (element.recycleId == 0 ) { element.recycleId = element.lastRecycledId; } else if (element.recycleId != element.lastRecycledId) { throw new IllegalStateException("recycled already" ); } srcElems[i] = null ; if (dst.dropHandle(element)) { continue ; } element.stack = dst; dstElems[newDstSize ++] = element; } if (srcEnd == LINK_CAPACITY && head.next != null ) { reclaimSpace(LINK_CAPACITY); this .head = head.next; } head.readIndex = srcEnd; if (dst.size == newDstSize) { return false ; } dst.size = newDstSize; return true ; } else { return false ; } }
小结 netty提供了一个轻量级对象池Recycler.如果recycler中有对象就能复用,不用每次都去new.
一个Recycler它会包括
FastThreadLocal<Stack>
用于存储当前线程分配并回收的对象
FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>
用于存储其他线程创建,当前线程回收的对象<K,V> = <Thread1的Stack,在当前线程里回收到的Thread1分配的对象>
<K,V> = <Thread2的Stack,在当前线程里回收到的Thread2分配的对象>
获取:
当前线程Stack不是空时,返回里面的已回收对象
当前Stack为空,可能释放到其他线程了,跑去其他线程从他的WeakOrderQueue里把这些对象给捞回来
回收:
回收当前线程自己创建的对象,直接放入stack
回收别的线程创建的对象,放入属于那个线程的WeakOrderQueue