农场主的黑科技.

Netty源码剖析-总结

字数统计: 3.7k阅读时长: 22 min
2019/02/10 Share

netty对JDK的NIO进行一系列的封装,下面总结netty中的一些概念,不涉及源码,源码解析部分在之前的博客中有记录.

常见问题

  • Boss线程
    就是服务端线程,用于管理ACCEPT事件,当发生ACCEPT事件时获取新连接的Channel在pipeline中进行传播
  • worker线程
    用于处理客户端,数量是默认的CPU*2个
    创建服务端Channel(Boss)时会把用于客户端的数据封装成ServerBootstrapAcceptor,其中也包括CPU×2个worker的NioEventLoop信息,他会被加入服务端Channel的pipeline.
    当服务端轮询到accept事件时就会在服务端pipeline中进行传播,当传播到这个ServerBootstrapAcceptor的时候,ServerBootstrapAcceptor会把自己包含的客户端信息赋予给新连接的Channel,并把这个新的客户端Channel注册到某个WorkerSelector上,用于轮询READ事件.

服务端启动

  • 服务端的socket在哪里初始化?
    服务端启动时,也就是ServerBootstrap.bind()被用户代码调用时,通过反射构建NioServerSocketChannel作为服务端的SocketChannel
  • 在哪里accept连接?
    把服务端Channel注册到Boss线程的Selector,并把感兴趣的事件设置为OP_ACCEPT,通过Select轮询accept连接

NioEventLoop

  • 默认情况下,Netty服务端起多少线程?何时启动?
    默认cpu*2,调用execute方法的时候判断当前是否是在本线程,如果是说明已经启动.如果是在外部线程调用,此时会启动线程
  • Netty是如何解决jdk空轮询bug的?
    用计数的方式去判断.如果当前阻塞的操作实际上没有花费时间,那么有可能触发了空轮询的bug.如果持续了512次,那么就判断出发了bug,新建一个selector把,把之前的key复制到这个新的selector.
  • netty如何保证异步串行无锁化?
    netty在外部线程调用execute时,通过inEventLoop()判定是否是外部线程.此时将所有的操作封装成一个task,丢到mpsc队列中,随后这些task会被挨个执行

新连接

  • Netty是在哪里检测到有新连接接入的?
    Boss线程的selector轮训ACCEPT事件
  • 新连接是怎样注册到NioEventLoop线程的?
    Boss线程轮询到ACCEPT事件后在pipeline中进行传播,pipeline中包含一条ServerBootstrapAcceptor,里面包含了CPU×2个worker的信息还有服务端Channel的信息.
    把服务端Channel的信息放入到新连接,并通过选择器拿到一个worker,把新连接作为READ事件注册到这个worker的selector

pipeline

  • netty如何判断ChannelHandler类型的?
    添加一个节点时,pipeline会通过一个instanceof来判断是inbound类型还是outbound类型,然后用布尔变量来标示

  • 对于ChannelHandler的添加应该遵循什么样的顺序?
    inBound事件的传播顺序和inBoundHandler的添加顺序相同
    outBound事件的传播顺序和outBoundHandler的添加顺序相反

  • 用户手动触发事件传播,不同的触发方式有什么样的区别?

    • inbound事件,如

      1
      2
      ctx.fireChannelRead(msg);	//从当前节点继续往下传播
      ctx.channel().pipeline().fireChannelRead("hello world");//从head节点开始往下传播
    • outbound事件,如

      1
      2
      ctx.write(msg, promise);	//从当前节点开始往下传播
      ctx.channel().write("hello world"); //从tail开始传播

ByteBuf

  • 内存的类别有哪些

    三个维度

    • 按内存的分配方式分类(Pooled,UnPooled)
    • 按操作方式分类(Unsafe,非Unsafe)
    • 按所在空间分类(Heap,Direct)

    所以总共有8种:

    • PooledUnsafeHeapByteBuf
    • UnPooledUnsafeHeapByteBuf
    • PooledHeapByteBuf
    • UnPooledHeapByteBuf
    • PooledUnsafeDirectByteBuf
    • UnPooledUnsafeDirectByteBuf
    • PooledDirectByteBuf
    • UnPooledDirectByteBuf
  • 如何减少多线程内存分配之间的竞争
    各个NioEventLoop有自己的Arena,里包括申请的ChunkList.用类似ThreadLocal的方式实现

  • 不同大小的内存是如何进行分配的
    多个内存规格

NIO编程示例

Netty是nio的封装,所有操作都是间接调用了NIO的API.先看一下通常NIO的服务端启动代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port));
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true){
int n = selector.select();
if (n == 0) continue;
Iterator ite = this.selector.selectedKeys().iterator();
while(ite.hasNext()){
SelectionKey key = (SelectionKey)ite.next();
if (key.isAcceptable()){
SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
clntChan.configureBlocking(false);
//将选择器注册到连接到的客户端信道,
//并指定该信道key值的属性为OP_READ,
//同时为该信道指定关联的附件
clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
}
if (key.isReadable()){
handleRead(key);
}
if (key.isWritable() && key.isValid()){
handleWrite(key);
}
if (key.isConnectable()){
System.out.println("isConnectable = true");
}
ite.remove();
}
}

服务端启动

  1. 通过反射构造用户代码中传入的NioServerSocketChannel.class作为服务端channel
    它的构造函数会做的事情:

    1. 与NIO编程中创建ServerSocketChannel的方式相同,通过JDK的SelectorProvider.provider().open...()来获取
    2. NioServerSocketChannel中保存这个ServerSocketChannel和感兴趣的事件(OP_ACCEPT)
    3. 通过serverChannel.configureBlocking(false)设置为非阻塞(NIO编程中也会有这一步)
    4. 设置id,获取并保存JDK的unsafe,创建并保存新的pipeline(仅包含tail和head节点)
  2. 对这个服务端Channel进行初始化

    1. 把用户代码中写入的option和attr加入到服务端Channel
    2. 把用户代码中的.handler(/**/)加入到服务端Channel的pipeline,注意这里不是.childHandler()
    3. 把用户代码中加入的一系列ChildGroup相关配置(如childHandler,childOption等)封装为ServerBootstrapAcceptor并把这个ServerBootstrapAcceptor延迟加入到服务端Channel的pipeline
  3. 注册selector并获取ChannelFuture

    1. 把当前channel注册到NIO的selector,并把服务端Channel作为attachment参数加入

      1
      2
      3
      4
      5
      selectionKey = javaChannel()//返回之前创建服务端channel时创建的底层Channel
      .register( //jdk channel的api
      eventLoop().selector,//让他注册到这个selector中
      0,//注册时需要关心的事件.这里是0说明不需要关心,只是单纯把channel绑定到selector上
      this); //attachment

      所以,在这个阶段我们并没有对accept事件进行绑定

    2. 此时已经完成注册,调用pipeline.invokeHandlerAddedIfNeededpipeline.fireChannelRegistered传递服务端HandlerchannelRegistered事件和handlerAdded事件

  4. 绑定端口

    1. 通过JDK的api为服务端Channel绑定端口
    2. 通过pipeline.fireChannelActive()传播事件?
      其实这个并不是单纯的传播事件,它会做:
      1. 从head节点往下传播事件
      2. 回到selector中把服务端Channel感兴趣的事件设置为OP_ACCEPT
        之前把channel注册到selector时获取了selectionKey,这里会拿到它,并把它感兴趣的事件设置为OP_ACCEPT.这个OP_ACCEPT就是在构造NioServerSocketChannel.class时设置的

NioEventLoop创建

启动netty服务器时会通过两个new NioEventLoopGroup()分别创建bossGroup和workerGroup,其实上一章服务器启动的过程中已经用到了它们:

  • 从eventLoop获取selector,把服务端Channel注册到这个selector上
  • 对于一些可以延迟进行的操作,我们封装成Runnable,并调用eventLoop的execete()来延迟执行

下面分析它的创建过程:

  1. 创建一个为每一次执行任务都创建一个线程实体的工厂executor.但它创建的并不是new Thread,而是netty对此进行优化过后的FastThreadLocalThread

  2. 创建CPU*2(默认)个NioEventLoop,保存为变量保存到NioEventLoopGroup
    创建NioEventLoop的过程:

    1. 通过JDK获取到selector并保存(一个NioEventLoop对应一个selector)
    2. 保存上一步创建的executor(CPU*2个NioEventLoop对应一个executor)
    3. 创建taskQueue(一个NioEventLoop对应一个taskQueue)
      外部线程执行netty任务的时候.不会直接让NioEventLoop执行,而是塞到队列里面让它执行
  3. 通过chooserFactory.newChooser()构造线程选择器
    目的是当有新连接进来时调用MultithreadEventExecutorGroup#next来选择合适的NioEventLoop进行绑定

    如何选择线程进行绑定的?很简单,当第一个新连接进来的时候绑定NioEventLoop[]中的第一个,第二个连接则绑定第二个,超过NioEventLoop[]大小后又回到第一个

NioEventLoop执行

当首次在一个NioEventLoop上执行execute()时会进行一系列的操作来让这个NioEventLoop启动.放入一个类似于NIO编程中的for(;;)循环,从而阻塞selector中的事件.io.netty.channel.nio.NioEventLoop#run

也就是说NioEventLoop都是延迟启动?

它会在这个for(;;)循环中做以下事情:

  1. 通过select()轮询,检查selector上的事件
    要注意的是,他并不是单纯调用selector.select()的阻塞式select,它的处理逻辑:

    • deadline以及任务穿插逻辑处理

      • deadline:根据nioEventLoop是否有定时任务
      • 任务队列中是否有任务要执行

      满足上面任意一个.select()就会停止并退出

    • 阻塞式select

      • 没有到deadLine并且任务队列为空的情况
      • 默认是1秒钟,外部线程也可以在这时把阻塞操作唤起
    • 避免jdk空轮询的bug
      JDK中存在空轮询的bug,当调用阻塞式的selector.select()但却立即返回了的时候,很可能就是触发了这个bug.netty会当这种情况出现512次时进行处理
      思想很简单:把老的selectedKeys都注册到一个新的selector里面去,替换当前的selector

  2. 通过processSelectKeys()处理轮询到的io事件
    它会调用processSelectedKeysOptimized(selectedKeys.flip());来处理io事件,但这里的selectedKeys是什么?
    在创建NioEventLoop的过程中其实我们获取JDK的selector的同时,创建了SelectedSelectionKeySet用于替换selector中原有的selectedKeySet.
    这个优化了的KeySet是用数组实现的,减少了HashSet原有的复杂度和对于netty不必要的功能.

    1. 从优化后的selectedKeys中获取刚才selector轮询到的事件的SelectionKey,获取这个SelectionKey的attachment.如果是我们之前设置的channel那么这个attachment应该是服务端channel

    2. 对服务端channel和SelectionKey调用processSelectedKey(SelectionKey,服务端channel)进行处理

      实际上就是拿到服务端Channel的unsafe,通过这个Unsafe处理SelectionKey对应的事件(下一章解析ACCEPT事件的处理流程)

  3. runAllTask()这一步会处理异步任务队列或定时的异步任务队列
    首先得先知道这两个任务队列分别是什么,在什么时候被创建的

    • 普通任务队列 mpscQueue
      在nioeventLoop的构造方法中创建,当它的executor(Runable)被调用时会把这个Runable作为task加入这个队列.
    • 定时任务队列 scheduledTaskQueue
      一个PriorityQueue类型的队列,按截止时间排序.nioeventLoop的schedule()首次被调用时创建,同时把scheduledTask加入

    它的执行逻辑:

    1. 调用fetchFromScheduledTaskQueue();把已经到时间的定时任务从队列取出合并到普通任务队列
    2. 在当前NioEventLoop线程中挨个执行普通任务队列中的每个任务,其实就是单纯调用它们的run(),直到超过参数指定的超时时间.

新连接接入

  1. 服务端channel绑定的selector轮训出ACCEPT事件
  2. 为新链接创建客户端Chanel
    1. 通过JDK的javaChannel().accept()可以获取SocketChannel,在轮询到ACCEPT事件时调用它获取SocketChannel
    2. 构造NioSocketChannel,也就是客户端Channel
      构造函数传入服务端Channel(NioServerSocketChannel)和上一步通过JDK获取的SocketChannel,还有感兴趣的事件OP_READ
    3. 接下来和创建服务端Channel相似,创建id,创建unsafe,创建新的pipeline
    4. 关闭服务端Socket的nagel算法,让小数据包集成为大数据包后发送

整理一下Netty中的Channel的分类

  • NioServerSocketChannel
    服务端Channel,用于获取ACCEPT事件
    服务端Channel是从用户代码取到.channel(NioServerSocketChannel.class)的类对象后通过反射创建
  • NioSocketChannel
    客户端channel
    在NioEventLoop检测到ACCEPT事件时通过new创建,用于获取READ事件
  • Unsafe
    实现Channel底层的协议
    • 对于服务端是NioMessageUnsafe:
      AbstractNioMessageChannel#newUnsafe中返回了NioMessageUnsafe,用于读新的连接
      它的read方法会检测一个accept事件,然后通过doReadMessages()为他创建nioSocketChannel(客户端channel)
    • 对于客户端是NioByteUnsafe:
      AbstractNioByteChannel#newUnsafe中返回了NioByteUnsafe,用于读io数据
      它的read方法会通过doReadBytes()读取数据,放入一个byteBuf中
  1. 客户端线程初始化以及注册到woker线程的selector
    在上一步创建客户端Channel完毕后会调用pipeline.fireChannelRead(readBuf.get(i)); (readBuf.get(i)是新创建的服务端Channel)
    这个pipeline是服务端Channel的pipeline,里面包含了服务端Handler和封装了客户端Channel相关数据的ServerBootstrapAcceptor
    此时这个服务端pipeline的结构如下

    1
    head -> ServerBootstrapAcceptor -> Tail

    当调用pipeline.fireChannelRead(readBuf.get(i));时这个事件会从head节点开始传播,然后会调用到ServerBootstrapAcceptor#channelRead.

    通过这个方法对新创建的客户端Channel进行一系列初始化操作

    1. 它会通过一个for循环把自己持有的客户端配置依次赋予给这个新创建的客户端Channel
    2. 把客户端channel注册到worker线程的NioEventGroup中的某一个的selector(选择器选择)
  2. 注册感兴趣的事件
    与服务端Channel启动时相似,它会再次拿到selectionKey,把创建时加入的OP_READ作为感兴趣的事件设置到selectionKey

pipeline

pipeline创建

  • pipeline的创建
    在创建Channel的时候被创建,此时只包含head和tail两个节点
  • 所有pipeline节点都实现了ChannelHandlerContext接口
    也就是说所有pipeline节点都是以下的结构
    • 当前节点属于那个channel
    • 哪一个nioEventLoop会执行这个节点
    • 事件的传播方式
    • 属于哪个pipeline
    • 需要分配内存时(bytebuf),使用哪一个内存分配器
    • 存储attr
  • tail节点
    一个inbound处理器
    tail主要是做一些收尾的信息,只会对未完善的部分输出logger进行提醒程序员.
    inbound事件:请求事件,传输方向是 tail -> head.,如write()

  • head节点
    一个outbound处理器
    它的工作:1.把事件原封不动的进行传播,2.把读写操作委托给unsafe
    outbound事件:通知事件,传输方向是head->tail,如channelRead()

在pipeline中添加ChannelHandler

通过如下的方式可以添加

1
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());

这里的addLast()会做些什么?

  • 判断是否重复添加
  • 创建节点并添加至链表
    1. 把这个ChannelHandler包装成ChannelHandlerContext
    2. 添加到链表,实际上就是添加到tail节点的前面
  • 回调添加完成事件
    传播handlerAdded事件,通知添加完成

ByteBuf

太多了,看这里:Netty源码剖析-ByteBuf-总结

CATALOG
  1. 1. 常见问题
  2. 2. NIO编程示例
  3. 3. 服务端启动
  4. 4. NioEventLoop创建
  5. 5. NioEventLoop执行
  6. 6. 新连接接入
  7. 7. pipeline
    1. 7.1. pipeline创建
    2. 7.2. 在pipeline中添加ChannelHandler
  8. 8. ByteBuf