农场主的黑科技.

Netty源码剖析-NioEventLoop执行

字数统计: 3.6k阅读时长: 22 min
2018/12/28 Share

NioEventLoop执行

nioEventLoop在线程被首次创建的时候会通过run方法执行 (SingleThreadEventExecutor.this.run()).Netty基本组件那次的笔记里有涉及到.

NioEventLoop.run()

1
2
3
4
5
6
7
run() -> for(;;)
|
select()[检查是否有io事件]
|
processSelectKeys()[处理io事件]
|
runAllTask()[处理异步任务队列,就是上一节最后说的taskQueue]

实际代码io.netty.channel.nio.NioEventLoop#run

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Override
protected void run() {
for (;;) {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));//轮询io事件
//一个nioEventLoop对应了一个selector,轮询注册到这个selector上的io事件
//..
}

//注意有两个processSelectedKeys();
//ioRatio :控制processSelectedKeys()和runAllTasks()的执行时间,默认50
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();//处理io事件
} finally {
runAllTasks();//处理外部线程放到taskQueue中的任务
}
} else {//默认执行这段
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
//参数为processSelectedKeys()花费的时间
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
//..
}

select()方法执行逻辑

  • deadline以及任务穿插逻辑处理

    • deadline:根据nioEventLoop是否有定时任务
    • 当前是否有异步任务

    满足上面任意一个.select()就会停止并退出

  • 阻塞式select,

    • 没有到deadLine并且任务队列为空的情况
    • 默认是1秒钟,外部线程也可以在这时把阻塞操作唤起
  • 避免jdk空轮询的bug

deadline以及任务穿插逻辑处理

1
2
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));//参数获取是否是唤醒状态,并之后把他设为false

先看一下它的源码(io.netty.channel.nio.NioEventLoop#select):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();//当前时间
//定时任务队列:按照任务的截止时间正序排序的队列,
//计算当前第一个定时任务的截止时间,当前的select()不能超过这个时间
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {//已经超时了
if (selectCnt == 0) {
selector.selectNow(); //非阻塞的select方法
selectCnt = 1;
}
break;//退出
}

//hasTasks(): 判断异步队列中是否有任务要执行
//实际在SingleThreadEventExecutor#hasTasks中判断taskQueue是否为空
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow(); //非阻塞的select方法
selectCnt = 1;
break;//退出
}

阻塞式select

对应run()中的

1
2
3
4
5
6
7
int selectedKeys = selector.select(timeoutMillis);//参数为本次可以进行select的最大时间,默认1秒
selectCnt ++; //表示当前已经轮询了selectCnt次
//轮询到了事件||是唤醒的状态||被唤醒了||异步队列有任务||定时任务队列有任务
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
//就是说有事干了.返回
break;
}

避免jdk空轮询的bug

jdk的空轮询的bug会导致cpu到100%,

继续看run()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
long time = System.nanoTime();
//已经超过了截止时间(默认1),意思就是已经正常完成了一次select()但没有轮询到东西
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
selectCnt = 1;
//select()没有阻塞立即返回了,可能触发了空轮询
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
//这个值是512,也就是说512次轮询的结果都为空
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {

rebuildSelector(); //调用这个避免下一次空轮询继续发生
//把老的selectedKeys都注册到一个新的selector里面去,替换当前的selector

selector = this.selector;
selector.selectNow(); //重新进行select,成功解决
selectCnt = 1;
break;
}

看一下避免下一次空轮询继续发生 的rebuildSelector();调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void rebuildSelector() {
final Selector oldSelector = selector;
final Selector newSelector;

newSelector = openSelector(); //创建新的selector

for (;;) {
try {
for (SelectionKey key: oldSelector.keys()) {//拿到旧的key
Object a = key.attachment();//之前讲过,是netty包装的channel

int interestOps = key.interestOps();//注册的事件
key.cancel();//取消旧的
//创建到新的上面
SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
//attachment是netty包装的channel的话,加入key(之前说过)
((AbstractNioChannel) a).selectionKey = newKey;
}
}

processSelectKeys()

processSelectKeys()用于处理这些io事件

  • selected keySet优化
    select每次都会把已就绪io事件添加到底层的HashSet结构中.netty通过反射从这个HashSet中构建.而select的时间复杂度都是O(1)优于HashSet
  • processSelectedKeyOptimized()
    真正处理io事件

selected keySet优化

回到NioEventLoop的构造方法中,有这么一行操作

1
selector = openSelector();	//对NioEventLoop绑定一个selector轮询器

其实所有优化都是在这个方法中实现的。看一下源码(io.netty.channel.nio.NioEventLoop#openSelector

1
2
3
4
5
6
7
8
9
private Selector openSelector() {
final Selector selector = provider.openSelector(); //用jdk的api拿selector

if (DISABLE_KEYSET_OPTIMIZATION) { //若不需要优化,则直接返回。默认是false
return selector;
}

//稍后用这个替换selector中的selectedKeySet
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

看一下这个SelectedSelectionKeySet类型。
从名字上来看像是set,但实际是数组+size的方式去实现的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

private SelectionKey[] keysA;
private int keysASize;
//...
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
//添加到数组,复杂度O(1)
int size = keysASize;
keysA[size ++] = o;
keysASize = size;
if (size == keysA.length) {
doubleCapacityA();//有必要时进行数组扩容
}

return true;
}
//..不支持remove等操作,netty也不需要这些操作。所以他可以用数组来替换原来的hashset,优化复杂度
@Override
public boolean remove(Object o) {return false;}

@Override
public boolean contains(Object o) {return false;}

继续回到NioEventLoop的构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();//keySet优化

Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl", //反射这个类对象
false,
PlatformDependent.getSystemClassLoader());
} catch (ClassNotFoundException e) {
return e;
} catch (SecurityException e) {
return e;
}
}
});

if (!(maybeSelectorImplClass instanceof Class) ||
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(selector.getClass())) {//不是jdk的selector

return selector;//返回这个selector
}

//是jdk的selector,替换成我们自己的selectedkeyset
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;

Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
//通过反射拿到它的两个属性,它们是hashset实现的
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);

//替换成netty自己实现的selector.今后的io事件都会存在这个keyset里
selectedKeysField.set(selector, selectedKeySet);
publicSelectedKeysField.set(selector, selectedKeySet);
return null;

if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", selector, e);
} else {
selectedKeys = selectedKeySet; //最后再把自定义的保存成这个类的成员变量
logger.trace("instrumented a special java.util.Set into: {}", selector);
}

总结一下就把jdk提供的selector中基于HashSet的selectedKeySet替换成基于数组的实现。

processSelectedKeyOptimized()

继续看NioEventLoop的run方法中的processSelectedKeys()部分。它的代码是

1
2
3
4
5
6
7
private void processSelectedKeys() {
if (selectedKeys != null) { //此时这里已经是优化后的selectedKeys
processSelectedKeysOptimized(selectedKeys.flip()); //把优化后的selectedKeys数组
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}

看一下processSelectedKeysOptimized的源码

1
2
3
4
5
6
7
8
9
10
11
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
selectedKeys[i] = null; //把slectedKeySet设为空
final Object a = k.attachment();//这里的atachment时netty自定义的channel

if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a); //实际对io事件进行处理

看一下这个processSelectedKey

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); //拿到unsafe
if (!k.isValid()) { //连接可能有问题
//..
unsafe.close(unsafe.voidPromise());//最终调用pipeline进行关闭
return;
}
//连接是合法的,处理io事件
int readyOps = k.readyOps();

if ((readyOps & SelectionKey.OP_CONNECT) != 0) { //connect事件
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);

unsafe.finishConnect();
}

if ((readyOps & SelectionKey.OP_WRITE) != 0) { //write事件
ch.unsafe().forceFlush();
}

//read事件或accept事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
return;
}
}

runAllTask()

run()中的以后一步.这一步会处理异步任务队列,就是上一节最后说的taskQueue

  • task的分类和添加
    普通的task和定时的task.通过两个方法进行task的添加
  • 任务的聚合
    把定时任务的task聚合到普通的taskQueue里面
  • 任务的执行

task的分类和添加

netty中定义了两个任务队列,

  • 普通任务队列
  • 定时任务队列

普通任务队列 mpscQueue(创建nioeventLoop时创建的)

外部线程调用NioEventLoop的execute(SingleThreadEventExecutor#execute)时候

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}

boolean inEventLoop = inEventLoop();
if (inEventLoop) { //是否是在当前线程中调用
addTask(task);
} else { //是外部线程调用
startThread(); //启动线程,也就是说nioEventLoop首次调用execute的时候启动线程
addTask(task); //把task添加到taskQueue
}

顺便看一下addTask()

1
2
3
4
5
6
7
8
9
10
protected void addTask(Runnable task) {
if (!offerTask(task)) { //调用offerTask
reject(task);
}
}
---
final boolean offerTask(Runnable task) {

return taskQueue.offer(task); //给queue添加一个元素
}

定时任务队列

外部线程调用NioEventLoop的schedule(AbstractScheduledEventExecutor#schedule)时候

1
2
3
4
5
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return schedule(new ScheduledFutureTask<V>( //把任务包装成netty的task
this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}

看一下这个schedule

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) { //是否是当前的nioEventLoop发起的schedule
scheduledTaskQueue().add(task);
} else { //外部线程发布的schedule
execute(new Runnable() { //保证线程安全,因为这个scheduledTaskQueue不是线程安全的,它基于priorityQueue
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}

return task;
}

任务的聚合

回run()看一下runAllTasks();执行前后的部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);//把处理io花费的时间传进来,执行任务的时候不能超过这个参数时间??
}
}

去看runAllTasks的代码:

1
2
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue(); //从定时任务队列中聚合任务

看聚合任务的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private boolean fetchFromScheduledTaskQueue() {
//抓取第一个定时任务
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);//取已截止的任务
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) { //放入普通任务队列
//普通任务队列添加失败,重新添加到定时任务队列中
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);//继续取已截止的任务
}
//此时已截至的定时任务已经合并到普通任务队列中了
return true;
}

看一下pollScheduledTask:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();

//从按任务截止时间的队列中拿出第一个数据
//这个scheduledTaskQueue是基于优先队列实现的
//而ScheduledFutureTask类重载了compareTo,让他们比较截止时间,所以截止时间越小越靠前排列
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null; //任务队列为空
}

if (scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.remove(); //存在已截止的任务,从队列移除后返回
return scheduledTask;
}
return null; //所有任务都没到截止时间
}

任务的执行

继续看runAllTask()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue(); //上一步,任务的聚合
Runnable task = pollTask(); //此时该执行的所有任务都在普通的taskQueue中.从这取任务
if (task == null) {
afterRunningAllTasks();//执行完所有任务时的收尾操作
return false;
}

//计算截止时间
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task); //挨个执行每个任务,这个safeExecute只是单纯调用run方法

runTasks ++;

if ((runTasks & 0x3F) == 0) {//执行完64个任务的时候,计算一下超时时间,如果超过了就不执行了
lastExecutionTime = ScheduledFutureTask.nanoTime();//因为这个操作比较耗时,所以不会每次都检查
if (lastExecutionTime >= deadline) {
break;
}
}

task = pollTask();//继续从普通的taskQueue中取任务
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}

afterRunningAllTasks(); //一个收尾的操作
this.lastExecutionTime = lastExecutionTime;
return true;
}

几个问题

  • 默认情况下,Netty服务端起多少线程?何时启动?
    默认cpu*2,调用execute方法的时候判断当前是否是在本线程,如果是说明已经启动.如果是在外部线程调用,此时会启动线程
  • Netty是如何解决jdk空轮询bug的?
    用计数的方式去判断.如果当前阻塞的操作实际上没有花费时间,那么有可能触发了空轮询的bug.如果持续了512次,那么就判断出发了bug,新建一个selector把,把之前的key复制到这个新的selector.
  • netty如何保证异步串行无锁化?
    netty在外部线程调用execute时,通过inEventLoop()判定是否是外部线程.此时将所有的操作封装成一个task,丢到mpsc队列中,随后这些task会被挨个执行

总结

new NioEventGroup被调用时会创建:

  1. threadPerTaskExcecutor 线程工厂,用于后期为每个nioEventLoop分配线程.
  2. 创建cpu*2个nioEventLoop,此时他们的构造方法会各自进行以下操作:

    • 保存netty进行优化后的轮训器
    • 创建一个普通任务队列(mpsc):外部线程希望执行任务时,会把任务放入这个队列中
    • 保存刚才创建的线程工厂threadPerTaskExcecutor
  3. 为nioEventGroup创建nioEventLoop[]的选择器,就是单纯从前面开始选,到尾后又回到第一个nioEventLoop

在NioEventLoop的execute()方法首次被执行时,也就是首次被外部线程调用execute()方法时,各个NioEventLoop会用之前的线程工厂创建属于这个NioEventLoop的线程.此时NioEventLoop的run()方法会被执行
run()方法是一个for(;;)循环,他会干下面这些事情:

  1. 进行阻塞式的轮询,以下在一个for(;;)中执行.也就是说在有事干之前一直进行轮询
    • 如果有到了定时任务执行的时间,退出阻塞式轮询去执行定时任务
    • 如果普通任务队列中有需要执行的任务,退出阻塞式轮询去执行普通方法中的任务
    • 进行一秒阻塞式轮询,若阻塞到了任务就退出.
      此时任务已被轮训器保存到优化过后的selectedKeySet中了
    • 对jdk的空轮询bug进行处理
  2. 处理轮训器阻塞到的io事件
    此时基于nioEventLoop创建时优化过的selectedKeySet.对这个集合中的所有io事件进行处理.

  3. 处理任务队列
    普通任务队列,定时任务队列

    • 普通任务是什么时候被添加到普通任务队列的?
      NioEventLoop的execute()方法被外部线程执行时,此时若NioEventLoop线程还未启动,会启动它.
    • 定时任务是什么时候被添加到定时任务队列的?
      NioEventLoop的schedule()方法被外部线程执行时,他基于PriorityQueue,按截止时间排列
    1. 从定时任务队列中获取所有已截止的任务,合并到普通任务列表中
    2. 在当前的nioEventLoop的线程中依次调用普通任务队列中各个任务的run方法,直到执行完所有任务.
CATALOG
  1. 1. NioEventLoop执行
    1. 1.1. NioEventLoop.run()
    2. 1.2. select()方法执行逻辑
    3. 1.3. processSelectKeys()
      1. 1.3.1. selected keySet优化
      2. 1.3.2. processSelectedKeyOptimized()
    4. 1.4. runAllTask()
      1. 1.4.1. task的分类和添加
      2. 1.4.2. 任务的聚合
      3. 1.4.3. 任务的执行
  2. 2. 几个问题
  3. 3. 总结