农场主的黑科技.

Netty源码剖析-NioEventLoop创建

字数统计: 1.1k阅读时长: 7 min
2018/12/25 Share
  • 默认情况下,Netty服务端起多少线程?何时启动?
  • Netty是如何解决jdk空轮询bug的?
  • netty如何保证异步串行无锁化?

NioEventLoop

  • NioEventLoop创建
  • NioEventLoop启动
  • NioEventLoop执行逻辑

NioEventLoop创建

1
2
3
4
5
6
7
new NioEventLoopGroup()[线程数,默认2*cpu]
|
new ThreadPerTaskExecutor()[线程创建器]
|
for{newChlid()}[构造NioEventLoop]
|
chooserFactory.newChooser()[线程选择器]

跟踪new NioEventLoopGroup()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
    public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());//加入selector
}
---
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
//没传构造函数时,这里就是0.此时会默认cpu*2
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
---
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
//放入创建线程选择器的factory
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
---

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
//..
//创建线程选择器
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//构造NioEventLoop
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
children[i] = newChild(executor, args);
}
//线程选择器
chooser = chooserFactory.newChooser(children);
//..
}

ThreadPerTaskExcecutor

1
new ThreadPerTaskExecutor(newDefaultThreadFactory())

他会做两件事:为每一次执行任务都创建一个线程实体的工厂,定义NioEventLoop的名字.后期用于创建NioEventLoop线程

  • 每一次执行任务都会创建一个线程实体
    ThreadPerTaskExcecutor中包含一个execute方法,代码如下

    1
    2
    3
    4
    @Override
    public void execute(Runnable command) {
    threadFactory.newThread(command).start();//用参数factory每次都创建线程
    }
  • NioEventLoop线程命名规则nioEventLoop-1–xx
    追一下它的实现方式.首先ThreadPerTaskExecutor的参数的factory:

    1
    2
    3
    4
    5
    6
    7
    8
    protected ThreadFactory newDefaultThreadFactory() {
    return new DefaultThreadFactory(getClass()); //参数为NioEventLoop.class
    }
    ---跟
    public DefaultThreadFactory(Class<?> poolType, boolean daemon, int priority) {
    //toPoolName,把NioEventLoop转成"nioEventLoop"
    this(toPoolName(poolType), daemon, priority);
    }

    继续跟构造方法,到DefaultThreadFactory#DefaultThreadFactory就会看到

    1
    prefix = poolName + '-' + poolId.incrementAndGet() + '-';

    顺便看一下DefaultThreadFactory#newThread,就是之前ThreadPerTaskExecutor里面的那个.就会发现它里面创建了Thread,并且自定义了名字

    1
    Thread t = newThread(new DefaultRunnableDecorator(r), prefix + nextId.incrementAndGet());

newThread方法源码:

1
2
3
4
protected Thread newThread(Runnable r, String name) {
//也就是说,ThreadPerTaskExecutor实际上执行的是对java线程进行封装后的线程.而不是原生的
return new FastThreadLocalThread(threadGroup, r, name);
}

FastThreadLocalThread继承于Thread,对ThreadLocal做了些优化,并且自己包装了ThreadLocalMap.

newChilld()

接着看在NioEventLoop中做的第二件事:通过for{newChlid()}为每个处理线程(默认cpu*2)创建NioEventLoop.

它主要做这三件事:

  • 保存线程执行器ThreadPerTaskExecutor
  • 创建一个MpscQueue
  • 创建一个selector
1
2
newChild(executor, args) // 参数executor就是上面创建的ThreadPerTaskExecutor.
//这个方法会构造NioEventLoop

跟踪到NioEventLoop的构造方法

1
2
3
4
5
6
7
8
9
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
//父类
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);

provider = selectorProvider;
selector = openSelector(); //对NioEventLoop绑定一个selector轮询器
selectStrategy = strategy;
}

openSelector()就是通过provider.openSelector();获取轮训器后保存

继续跟踪父类构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
//保存线程执行器,也就是ThreadPerTaskExecutor.
//每个NioEventLoop都会调用这个ThreadPerTaskExecutor创建执行线程
this.executor = ObjectUtil.checkNotNull(executor, "executor");
//taskQueue:外部线程执行netty任务的时候.不会直接让NioEventLoop执行,而是塞到队列里面让它执行
//创建taskQueue
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}

看一下这个newTaskQueue()(io.netty.channel.nio.NioEventLoop#newTaskQueue)

1
2
3
4
5
@Override
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
//是通过MpscQueue创建
return PlatformDependent.newMpscQueue(maxPendingTasks);
}

MPSC : multiProducer(指外部线程)-SingleConsumer (netty的处理线程)

chooserFactory.newChooser()

完成了NioEventLoop[]的创建.接着看在NioEventLoop中做的第三件事:通过chooserFactory.newChooser()构造线程选择器.

目的是给新连接绑定对应的NioEventLoop,通过io.netty.util.concurrent.MultithreadEventExecutorGroup#next绑定.

他是如何选择线程进行绑定的?很简单,当第一个新连接进来的时候绑定NioEventLoop[]中的第一个,第二个连接则绑定第二个,超过NioEventLoop[]大小后又回到第一个.但netty对此进行了优化:

1
2
3
4
5
6
7
8
9
isPowerOfTwo()[判断NioEventLoop[]的个数是否是2的幂,如2,4,8,16]
|
PowerOfTwoEventExecutorChooser[是2的幂:优化]
|
index++ & (length - 1)
|
GenericEventChooser[不是2的幂:普通]
|
abs(index++ % length)

看一下步骤:

1
2
3
4
5
6
7
8
9
10
11
12
chooserFactory.newChooser(children); //参数children就是刚才通过for创建的NioEventLoop的数组
---
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
//executors指NioEventLoop[]
if (isPowerOfTwo(executors.length)) { //判断2的幂
return new PowerOfTowEventExecutorChooser(executors);
} else {
//
return new GenericEventExecutorChooser(executors);
}
}

关于GenericEventExecutorChooser:

类里面有个next方法:DefaultEventExecutorChooserFactory.GenericEventExecutorChooser#next

就是单纯的executors[Math.abs(idx.getAndIncrement() % executors.length)];

关于PowerOfTowEventExecutorChooser:

类里面有个next方法:DefaultEventExecutorChooserFactory.PowerOfTowEventExecutorChooser#next

返回的是executors[idx.getAndIncrement() & executors.length - 1]

CATALOG
  1. 1. NioEventLoop
  2. 2. NioEventLoop创建
    1. 2.1. ThreadPerTaskExcecutor
    2. 2.2. newChilld()
    3. 2.3. chooserFactory.newChooser()