netty对JDK的NIO进行一系列的封装,下面总结netty中的一些概念,不涉及源码,源码解析部分在之前的博客中有记录.
常见问题
- Boss线程
就是服务端线程,用于管理ACCEPT事件,当发生ACCEPT事件时获取新连接的Channel在pipeline中进行传播 - worker线程
用于处理客户端,数量是默认的CPU*2个
创建服务端Channel(Boss)时会把用于客户端的数据封装成ServerBootstrapAcceptor,其中也包括CPU×2个worker的NioEventLoop信息,他会被加入服务端Channel的pipeline.
当服务端轮询到accept事件时就会在服务端pipeline中进行传播,当传播到这个ServerBootstrapAcceptor的时候,ServerBootstrapAcceptor会把自己包含的客户端信息赋予给新连接的Channel,并把这个新的客户端Channel注册到某个WorkerSelector上,用于轮询READ事件.
服务端启动
- 服务端的socket在哪里初始化?
服务端启动时,也就是ServerBootstrap.bind()被用户代码调用时,通过反射构建NioServerSocketChannel作为服务端的SocketChannel - 在哪里accept连接?
把服务端Channel注册到Boss线程的Selector,并把感兴趣的事件设置为OP_ACCEPT,通过Select轮询accept连接
NioEventLoop
- 默认情况下,Netty服务端起多少线程?何时启动?
默认cpu*2,调用execute方法的时候判断当前是否是在本线程,如果是说明已经启动.如果是在外部线程调用,此时会启动线程 - Netty是如何解决jdk空轮询bug的?
用计数的方式去判断.如果当前阻塞的操作实际上没有花费时间,那么有可能触发了空轮询的bug.如果持续了512次,那么就判断出发了bug,新建一个selector把,把之前的key复制到这个新的selector. - netty如何保证异步串行无锁化?
netty在外部线程调用execute时,通过inEventLoop()判定是否是外部线程.此时将所有的操作封装成一个task,丢到mpsc队列中,随后这些task会被挨个执行
新连接
- Netty是在哪里检测到有新连接接入的?
Boss线程的selector轮训ACCEPT事件 - 新连接是怎样注册到NioEventLoop线程的?
Boss线程轮询到ACCEPT事件后在pipeline中进行传播,pipeline中包含一条ServerBootstrapAcceptor,里面包含了CPU×2个worker的信息还有服务端Channel的信息.
把服务端Channel的信息放入到新连接,并通过选择器拿到一个worker,把新连接作为READ事件注册到这个worker的selector
pipeline
netty如何判断ChannelHandler类型的?
添加一个节点时,pipeline会通过一个instanceof来判断是inbound类型还是outbound类型,然后用布尔变量来标示对于ChannelHandler的添加应该遵循什么样的顺序?
inBound事件的传播顺序和inBoundHandler的添加顺序相同
outBound事件的传播顺序和outBoundHandler的添加顺序相反用户手动触发事件传播,不同的触发方式有什么样的区别?
inbound事件,如
1
2ctx.fireChannelRead(msg); //从当前节点继续往下传播
ctx.channel().pipeline().fireChannelRead("hello world");//从head节点开始往下传播outbound事件,如
1
2ctx.write(msg, promise); //从当前节点开始往下传播
ctx.channel().write("hello world"); //从tail开始传播
ByteBuf
内存的类别有哪些
三个维度
- 按内存的分配方式分类(Pooled,UnPooled)
- 按操作方式分类(Unsafe,非Unsafe)
- 按所在空间分类(Heap,Direct)
所以总共有8种:
- PooledUnsafeHeapByteBuf
- UnPooledUnsafeHeapByteBuf
- PooledHeapByteBuf
- UnPooledHeapByteBuf
- PooledUnsafeDirectByteBuf
- UnPooledUnsafeDirectByteBuf
- PooledDirectByteBuf
- UnPooledDirectByteBuf
如何减少多线程内存分配之间的竞争
各个NioEventLoop有自己的Arena,里包括申请的ChunkList.用类似ThreadLocal的方式实现不同大小的内存是如何进行分配的
多个内存规格
NIO编程示例
Netty是nio的封装,所有操作都是间接调用了NIO的API.先看一下通常NIO的服务端启动代码:
1 | ServerSocketChannel serverChannel = ServerSocketChannel.open(); |
服务端启动
通过反射构造用户代码中传入的
NioServerSocketChannel.class
作为服务端channel
它的构造函数会做的事情:- 与NIO编程中创建ServerSocketChannel的方式相同,通过JDK的
SelectorProvider.provider().open...()
来获取 - 在
NioServerSocketChannel
中保存这个ServerSocketChannel和感兴趣的事件(OP_ACCEPT
) - 通过
serverChannel.configureBlocking(false)
设置为非阻塞(NIO编程中也会有这一步) - 设置id,获取并保存JDK的unsafe,创建并保存新的pipeline(仅包含tail和head节点)
- 与NIO编程中创建ServerSocketChannel的方式相同,通过JDK的
对这个服务端Channel进行初始化
- 把用户代码中写入的option和attr加入到服务端Channel
- 把用户代码中的
.handler(/**/)
加入到服务端Channel的pipeline,注意这里不是.childHandler()
- 把用户代码中加入的一系列ChildGroup相关配置(如childHandler,childOption等)封装为
ServerBootstrapAcceptor
并把这个ServerBootstrapAcceptor
延迟加入到服务端Channel的pipeline
注册selector并获取
ChannelFuture
把当前channel注册到NIO的selector,并把服务端Channel作为attachment参数加入
1
2
3
4
5selectionKey = javaChannel()//返回之前创建服务端channel时创建的底层Channel
.register( //jdk channel的api
eventLoop().selector,//让他注册到这个selector中
0,//注册时需要关心的事件.这里是0说明不需要关心,只是单纯把channel绑定到selector上
this); //attachment所以,在这个阶段我们并没有对accept事件进行绑定
此时已经完成注册,调用
pipeline.invokeHandlerAddedIfNeeded
和pipeline.fireChannelRegistered
传递服务端Handler的channelRegistered
事件和handlerAdded
事件
绑定端口
- 通过JDK的api为服务端Channel绑定端口
- 通过
pipeline.fireChannelActive()
传播事件?
其实这个并不是单纯的传播事件,它会做:- 从head节点往下传播事件
- 回到selector中把服务端Channel感兴趣的事件设置为OP_ACCEPT
之前把channel注册到selector时获取了selectionKey
,这里会拿到它,并把它感兴趣的事件设置为OP_ACCEPT.这个OP_ACCEPT就是在构造NioServerSocketChannel.class
时设置的
NioEventLoop创建
启动netty服务器时会通过两个new NioEventLoopGroup()
分别创建bossGroup和workerGroup,其实上一章服务器启动的过程中已经用到了它们:
- 从eventLoop获取selector,把服务端Channel注册到这个selector上
- 对于一些可以延迟进行的操作,我们封装成Runnable,并调用eventLoop的execete()来延迟执行
下面分析它的创建过程:
创建一个为每一次执行任务都创建一个线程实体的工厂executor.但它创建的并不是
new Thread
,而是netty对此进行优化过后的FastThreadLocalThread
创建CPU*2(默认)个
NioEventLoop
,保存为变量保存到NioEventLoopGroup
创建NioEventLoop
的过程:- 通过JDK获取到selector并保存(一个NioEventLoop对应一个selector)
- 保存上一步创建的executor(CPU*2个NioEventLoop对应一个executor)
- 创建taskQueue(一个NioEventLoop对应一个taskQueue)
外部线程执行netty任务的时候.不会直接让NioEventLoop执行,而是塞到队列里面让它执行
通过chooserFactory.newChooser()构造线程选择器
目的是当有新连接进来时调用MultithreadEventExecutorGroup#next
来选择合适的NioEventLoop
进行绑定如何选择线程进行绑定的?很简单,当第一个新连接进来的时候绑定NioEventLoop[]中的第一个,第二个连接则绑定第二个,超过NioEventLoop[]大小后又回到第一个
NioEventLoop执行
当首次在一个NioEventLoop上执行execute()
时会进行一系列的操作来让这个NioEventLoop启动.放入一个类似于NIO编程中的for(;;)循环,从而阻塞selector中的事件.io.netty.channel.nio.NioEventLoop#run
也就是说NioEventLoop都是延迟启动?
它会在这个for(;;)循环中做以下事情:
通过select()轮询,检查selector上的事件
要注意的是,他并不是单纯调用selector.select()的阻塞式select,它的处理逻辑:deadline以及任务穿插逻辑处理
- deadline:根据nioEventLoop是否有定时任务
- 任务队列中是否有任务要执行
满足上面任意一个.select()就会停止并退出
阻塞式select
- 没有到deadLine并且任务队列为空的情况
- 默认是1秒钟,外部线程也可以在这时把阻塞操作唤起
避免jdk空轮询的bug
JDK中存在空轮询的bug,当调用阻塞式的selector.select()但却立即返回了的时候,很可能就是触发了这个bug.netty会当这种情况出现512次时进行处理
思想很简单:把老的selectedKeys都注册到一个新的selector里面去,替换当前的selector
通过processSelectKeys()处理轮询到的io事件
它会调用processSelectedKeysOptimized(selectedKeys.flip());
来处理io事件,但这里的selectedKeys是什么?
在创建NioEventLoop的过程中其实我们获取JDK的selector的同时,创建了SelectedSelectionKeySet
用于替换selector中原有的selectedKeySet.
这个优化了的KeySet是用数组实现的,减少了HashSet原有的复杂度和对于netty不必要的功能.从优化后的selectedKeys中获取刚才selector轮询到的事件的SelectionKey,获取这个SelectionKey的attachment.如果是我们之前设置的channel那么这个attachment应该是服务端channel
对服务端channel和SelectionKey调用processSelectedKey(SelectionKey,服务端channel)进行处理
实际上就是拿到服务端Channel的unsafe,通过这个Unsafe处理SelectionKey对应的事件(下一章解析ACCEPT事件的处理流程)
runAllTask()这一步会处理异步任务队列或定时的异步任务队列
首先得先知道这两个任务队列分别是什么,在什么时候被创建的- 普通任务队列 mpscQueue
在nioeventLoop的构造方法中创建,当它的executor(Runable)
被调用时会把这个Runable作为task加入这个队列. - 定时任务队列 scheduledTaskQueue
一个PriorityQueue类型的队列,按截止时间排序.nioeventLoop的schedule()首次被调用时创建,同时把scheduledTask加入
它的执行逻辑:
- 调用
fetchFromScheduledTaskQueue();
把已经到时间的定时任务从队列取出合并到普通任务队列 - 在当前NioEventLoop线程中挨个执行普通任务队列中的每个任务,其实就是单纯调用它们的run(),直到超过参数指定的超时时间.
- 普通任务队列 mpscQueue
新连接接入
- 服务端channel绑定的selector轮训出ACCEPT事件
- 为新链接创建客户端Chanel
- 通过JDK的
javaChannel().accept()
可以获取SocketChannel
,在轮询到ACCEPT事件时调用它获取SocketChannel - 构造NioSocketChannel,也就是客户端Channel
构造函数传入服务端Channel(NioServerSocketChannel)和上一步通过JDK获取的SocketChannel,还有感兴趣的事件OP_READ
- 接下来和创建服务端Channel相似,创建id,创建unsafe,创建新的pipeline
- 关闭服务端Socket的nagel算法,让小数据包集成为大数据包后发送
- 通过JDK的
整理一下Netty中的Channel的分类
- NioServerSocketChannel
服务端Channel,用于获取ACCEPT事件
服务端Channel是从用户代码取到.channel(NioServerSocketChannel.class)
的类对象后通过反射创建 - NioSocketChannel
客户端channel
在NioEventLoop检测到ACCEPT事件时通过new创建,用于获取READ事件 - Unsafe
实现Channel底层的协议- 对于服务端是NioMessageUnsafe:
AbstractNioMessageChannel#newUnsafe
中返回了NioMessageUnsafe
,用于读新的连接
它的read方法会检测一个accept事件,然后通过doReadMessages()
为他创建nioSocketChannel(客户端channel) - 对于客户端是NioByteUnsafe:
AbstractNioByteChannel#newUnsafe
中返回了NioByteUnsafe
,用于读io数据
它的read方法会通过doReadBytes()
读取数据,放入一个byteBuf中
- 对于服务端是NioMessageUnsafe:
客户端线程初始化以及注册到woker线程的selector
在上一步创建客户端Channel完毕后会调用pipeline.fireChannelRead(readBuf.get(i));
(readBuf.get(i)是新创建的服务端Channel)
这个pipeline是服务端Channel的pipeline,里面包含了服务端Handler和封装了客户端Channel相关数据的ServerBootstrapAcceptor
此时这个服务端pipeline的结构如下1
head -> ServerBootstrapAcceptor -> Tail
当调用
pipeline.fireChannelRead(readBuf.get(i));
时这个事件会从head节点开始传播,然后会调用到ServerBootstrapAcceptor#channelRead
.通过这个方法对新创建的客户端Channel进行一系列初始化操作
- 它会通过一个for循环把自己持有的客户端配置依次赋予给这个新创建的客户端Channel
- 把客户端channel注册到worker线程的NioEventGroup中的某一个的selector(选择器选择)
注册感兴趣的事件
与服务端Channel启动时相似,它会再次拿到selectionKey,把创建时加入的OP_READ作为感兴趣的事件设置到selectionKey
pipeline
pipeline创建
- pipeline的创建
在创建Channel的时候被创建,此时只包含head和tail两个节点 - 所有pipeline节点都实现了ChannelHandlerContext接口
也就是说所有pipeline节点都是以下的结构- 当前节点属于那个channel
- 哪一个nioEventLoop会执行这个节点
- 事件的传播方式
- 属于哪个pipeline
- 需要分配内存时(bytebuf),使用哪一个内存分配器
- 存储attr
tail节点
一个inbound处理器
tail主要是做一些收尾的信息,只会对未完善的部分输出logger进行提醒程序员.
inbound事件:请求事件,传输方向是 tail -> head.,如write()head节点
一个outbound处理器
它的工作:1.把事件原封不动的进行传播,2.把读写操作委托给unsafe
outbound事件:通知事件,传输方向是head->tail,如channelRead()
在pipeline中添加ChannelHandler
通过如下的方式可以添加
1 | ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); |
这里的addLast()会做些什么?
- 判断是否重复添加
- 创建节点并添加至链表
- 把这个ChannelHandler包装成ChannelHandlerContext
- 添加到链表,实际上就是添加到tail节点的前面
- 回调添加完成事件
传播handlerAdded事件,通知添加完成
ByteBuf
太多了,看这里:Netty源码剖析-ByteBuf-总结