NioEventLoop执行 nioEventLoop在线程被首次创建的时候会通过run方法执行 (SingleThreadEventExecutor.this.run()
).Netty基本组件那次的笔记里有涉及到.
NioEventLoop.run() 1 2 3 4 5 6 7 run() -> for(;;) | select()[检查是否有io事件] | processSelectKeys()[处理io事件] | runAllTask()[处理异步任务队列,就是上一节最后说的taskQueue]
实际代码io.netty.channel.nio.NioEventLoop#run
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Override protected void run () { for (;;) { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue ; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false )); } final int ioRatio = this .ioRatio; if (ioRatio == 100 ) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } }
select()方法执行逻辑
deadline以及任务穿插逻辑处理
deadline:根据nioEventLoop是否有定时任务
当前是否有异步任务
满足上面任意一个.select()就会停止并退出
阻塞式select,
没有到deadLine并且任务队列为空的情况
默认是1秒钟,外部线程也可以在这时把阻塞操作唤起
避免jdk空轮询的bug
deadline以及任务穿插逻辑处理
1 2 case SelectStrategy.SELECT: select(wakenUp.getAndSet(false ));
先看一下它的源码(io.netty.channel.nio.NioEventLoop#select
):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 private void select (boolean oldWakenUp) throws IOException { Selector selector = this .selector; try { int selectCnt = 0 ; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L ) / 1000000L ; if (timeoutMillis <= 0 ) { if (selectCnt == 0 ) { selector.selectNow(); selectCnt = 1 ; } break ; } if (hasTasks() && wakenUp.compareAndSet(false , true )) { selector.selectNow(); selectCnt = 1 ; break ; }
阻塞式select
对应run()中的
1 2 3 4 5 6 7 int selectedKeys = selector.select(timeoutMillis);selectCnt ++; if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break ; }
避免jdk空轮询的bug
jdk的空轮询的bug会导致cpu到100%,
继续看run()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 long time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { selectCnt = 1 ; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { rebuildSelector(); selector = this .selector; selector.selectNow(); selectCnt = 1 ; break ; }
看一下避免下一次空轮询继续发生 的rebuildSelector();
调用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public void rebuildSelector () { final Selector oldSelector = selector; final Selector newSelector; newSelector = openSelector(); for (;;) { try { for (SelectionKey key: oldSelector.keys()) { Object a = key.attachment(); int interestOps = key.interestOps(); key.cancel(); SelectionKey newKey = key.channel().register(newSelector, interestOps, a); if (a instanceof AbstractNioChannel) { ((AbstractNioChannel) a).selectionKey = newKey; } }
processSelectKeys() processSelectKeys()用于处理这些io事件
selected keySet优化 select每次都会把已就绪io事件添加到底层的HashSet结构中.netty通过反射从这个HashSet中构建.而select的时间复杂度都是O(1)优于HashSet
processSelectedKeyOptimized() 真正处理io事件
selected keySet优化 回到NioEventLoop的构造方法中,有这么一行操作
1 selector = openSelector();
其实所有优化都是在这个方法中实现的。看一下源码(io.netty.channel.nio.NioEventLoop#openSelector
)
1 2 3 4 5 6 7 8 9 private Selector openSelector () { final Selector selector = provider.openSelector(); if (DISABLE_KEYSET_OPTIMIZATION) { return selector; } final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
看一下这个SelectedSelectionKeySet
类型。 从名字上来看像是set,但实际是数组+size的方式去实现的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 final class SelectedSelectionKeySet extends AbstractSet <SelectionKey > { private SelectionKey[] keysA; private int keysASize; @Override public boolean add (SelectionKey o) { if (o == null ) { return false ; } int size = keysASize; keysA[size ++] = o; keysASize = size; if (size == keysA.length) { doubleCapacityA(); } return true ; } @Override public boolean remove (Object o) {return false ;} @Override public boolean contains (Object o) {return false ;}
继续回到NioEventLoop
的构造方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run () { try { return Class.forName( "sun.nio.ch.SelectorImpl" , false , PlatformDependent.getSystemClassLoader()); } catch (ClassNotFoundException e) { return e; } catch (SecurityException e) { return e; } } }); if (!(maybeSelectorImplClass instanceof Class) || !((Class<?>) maybeSelectorImplClass).isAssignableFrom(selector.getClass())) { return selector; } final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run () { try { Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys" ); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys" ); selectedKeysField.setAccessible(true ); publicSelectedKeysField.setAccessible(true ); selectedKeysField.set(selector, selectedKeySet); publicSelectedKeysField.set(selector, selectedKeySet); return null ; if (maybeException instanceof Exception) { selectedKeys = null ; Exception e = (Exception) maybeException; logger.trace("failed to instrument a special java.util.Set into: {}" , selector, e); } else { selectedKeys = selectedKeySet; logger.trace("instrumented a special java.util.Set into: {}" , selector); }
总结一下就把jdk提供的selector中基于HashSet的selectedKeySet替换成基于数组的实现。
processSelectedKeyOptimized() 继续看NioEventLoop的run方法中的processSelectedKeys()
部分。它的代码是
1 2 3 4 5 6 7 private void processSelectedKeys () { if (selectedKeys != null ) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
看一下processSelectedKeysOptimized
的源码
1 2 3 4 5 6 7 8 9 10 11 private void processSelectedKeysOptimized (SelectionKey[] selectedKeys) { for (int i = 0 ;; i ++) { final SelectionKey k = selectedKeys[i]; if (k == null ) { break ; } selectedKeys[i] = null ; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a);
看一下这个processSelectedKey
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private void processSelectedKey (SelectionKey k, AbstractNioChannel ch) { final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { unsafe.close(unsafe.voidPromise()); return ; } int readyOps = k.readyOps(); if ((readyOps & SelectionKey.OP_CONNECT) != 0 ) { int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & SelectionKey.OP_WRITE) != 0 ) { ch.unsafe().forceFlush(); } if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0 ) { unsafe.read(); if (!ch.isOpen()) { return ; } }
runAllTask() run()中的以后一步.这一步会处理异步任务队列,就是上一节最后说的taskQueue
task的分类和添加 普通的task和定时的task.通过两个方法进行task的添加
任务的聚合 把定时任务的task聚合到普通的taskQueue里面
任务的执行
task的分类和添加 netty中定义了两个任务队列,
普通任务队列 mpscQueue(创建nioeventLoop时创建的)
外部线程调用NioEventLoop的execute(SingleThreadEventExecutor#execute
)时候
1 2 3 4 5 6 7 8 9 10 11 12 13 @Override public void execute (Runnable task) { if (task == null ) { throw new NullPointerException("task" ); } boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); }
顺便看一下addTask()
1 2 3 4 5 6 7 8 9 10 protected void addTask (Runnable task) { if (!offerTask(task)) { reject(task); } } --- final boolean offerTask (Runnable task) { return taskQueue.offer(task); }
定时任务队列
外部线程调用NioEventLoop的schedule(AbstractScheduledEventExecutor#schedule
)时候
1 2 3 4 5 @Override public <V> ScheduledFuture<V> schedule (Callable<V> callable, long delay, TimeUnit unit) { return schedule(new ScheduledFutureTask<V>( this , callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); }
看一下这个schedule
1 2 3 4 5 6 7 8 9 10 11 12 13 14 <V> ScheduledFuture<V> schedule (final ScheduledFutureTask<V> task) { if (inEventLoop()) { scheduledTaskQueue().add(task); } else { execute(new Runnable() { @Override public void run () { scheduledTaskQueue().add(task); } }); } return task; }
任务的聚合 回run()看一下runAllTasks();
执行前后的部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 final int ioRatio = this .ioRatio;if (ioRatio == 100 ) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } }
去看runAllTasks
的代码:
1 2 protected boolean runAllTasks (long timeoutNanos) { fetchFromScheduledTaskQueue();
看聚合任务的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private boolean fetchFromScheduledTaskQueue () { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); Runnable scheduledTask = pollScheduledTask(nanoTime); while (scheduledTask != null ) { if (!taskQueue.offer(scheduledTask)) { scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false ; } scheduledTask = pollScheduledTask(nanoTime); } return true ; }
看一下pollScheduledTask
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 protected final Runnable pollScheduledTask (long nanoTime) { assert inEventLoop () ; Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this .scheduledTaskQueue; ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); if (scheduledTask == null ) { return null ; } if (scheduledTask.deadlineNanos() <= nanoTime) { scheduledTaskQueue.remove(); return scheduledTask; } return null ; }
任务的执行 继续看runAllTask()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 protected boolean runAllTasks (long timeoutNanos) { fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null ) { afterRunningAllTasks(); return false ; } final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0 ; long lastExecutionTime; for (;;) { safeExecute(task); runTasks ++; if ((runTasks & 0x3F ) == 0 ) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break ; } } task = pollTask(); if (task == null ) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break ; } } afterRunningAllTasks(); this .lastExecutionTime = lastExecutionTime; return true ; }
几个问题
默认情况下,Netty服务端起多少线程?何时启动? 默认cpu*2,调用execute方法的时候判断当前是否是在本线程,如果是说明已经启动.如果是在外部线程调用,此时会启动线程
Netty是如何解决jdk空轮询bug的? 用计数的方式去判断.如果当前阻塞的操作实际上没有花费时间,那么有可能触发了空轮询的bug.如果持续了512次,那么就判断出发了bug,新建一个selector把,把之前的key复制到这个新的selector.
netty如何保证异步串行无锁化? netty在外部线程调用execute时,通过inEventLoop()判定是否是外部线程.此时将所有的操作封装成一个task,丢到mpsc队列中,随后这些task会被挨个执行
总结 new NioEventGroup被调用时会创建:
threadPerTaskExcecutor 线程工厂,用于后期为每个nioEventLoop分配线程.
创建cpu*2个nioEventLoop,此时他们的构造方法会各自进行以下操作:
保存netty进行优化后的轮训器
创建一个普通任务队列(mpsc):外部线程希望执行任务时,会把任务放入这个队列中
保存刚才创建的线程工厂threadPerTaskExcecutor
为nioEventGroup创建nioEventLoop[]的选择器,就是单纯从前面开始选,到尾后又回到第一个nioEventLoop
在NioEventLoop的execute()方法首次被执行时,也就是首次被外部线程调用execute()方法时,各个NioEventLoop会用之前的线程工厂创建属于这个NioEventLoop的线程.此时NioEventLoop的run()方法会被执行 run()方法是一个for(;;)循环,他会干下面这些事情:
进行阻塞式的轮询,以下在一个for(;;)中执行.也就是说在有事干之前一直进行轮询
如果有到了定时任务执行的时间,退出阻塞式轮询去执行定时任务
如果普通任务队列中有需要执行的任务,退出阻塞式轮询去执行普通方法中的任务
进行一秒阻塞式轮询,若阻塞到了任务就退出. 此时任务已被轮训器保存到优化过后的selectedKeySet中了
对jdk的空轮询bug进行处理
处理轮训器阻塞到的io事件 此时基于nioEventLoop创建时优化过的selectedKeySet.对这个集合中的所有io事件进行处理.
处理任务队列 普通任务队列,定时任务队列
普通任务是什么时候被添加到普通任务队列的? NioEventLoop的execute()方法被外部线程执行时,此时若NioEventLoop线程还未启动,会启动它.
定时任务是什么时候被添加到定时任务队列的? NioEventLoop的schedule()方法被外部线程执行时,他基于PriorityQueue,按截止时间排列
从定时任务队列中获取所有已截止的任务,合并到普通任务列表中
在当前的nioEventLoop的线程中依次调用普通任务队列中各个任务的run方法,直到执行完所有任务.