农场主的黑科技.

Netty源码剖析-新连接接入

字数统计: 2.7k阅读时长: 17 min
2018/12/28 Share
  • Netty是在哪里检测到有新连接接入的?

  • 新连接是怎样注册到NioEventLoop线程的?

Netty新连接接入处理逻辑

  1. 检测新连接

    服务端channel绑定的selector轮训出ACCEPT事件

  2. 创建NioSocketChannel

    基于jdkChannel创建neety自己的NioSocketChannel,也就是客户端Channel

  3. 分配线程及注册selector

    给这个客户端channel分配一个NioEventLoop,并把它注册到NioEventLoop的selector上,之后都由这个NioEventLoop进行管理

  4. 向selector注册读事件

    和ACCEPT事件是同一段逻辑

1.检测新连接

NioEventLoop为检测到的ACCEPT事件创建客户端channel(NioSocketChannel)

1
2
3
4
5
6
7
8
processSelectedKey(key,channel)[入口]//NioEventLoop的run()中
|
NioMessageUnsafe.read()
|
deReadMessage()[while循环]//对所有accept事件创建NioSocketChannel
|
javaChannel().accapt()
创建jdk的channel,此时把服务端channel和这个channel作为参数传入NioSocketChannel的构造函数中

2.创建NioSocketChannel

1
2
3
4
5
6
7
8
9
10
11
12
13
new NioSocketChannel(parent,ch)[入口]	//创建服务端channel的时候是通过反射构建,而客户端是直接new
|
AbstractNioByteChannel(p,ch,op_read)//父类构造函数
|
configureBlocking(false) & save op_read //把这个NioSocketChannel设置为非阻塞
//,并把op_read保存到成员变量
|
create id,unsafe,pipeline //创建与此channel相关的一些组件
唯一标示,底层数据的读写,业务逻辑的载体
|
new NioSocketChannelConfig() //创建与客户端channel绑定的配置类
|
setTcpNoDelay(true) //禁止Nagle算法,让数据包尽可能地发出去

在selector阻塞到ioAccept事件时会调用这么个方法(io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages)

1
2
3
4
5
6
7
8
9
10
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
//创建jdk底层的channel
SocketChannel ch = javaChannel().accept();
if (ch != null) {
//参数是当前的这个服务端channel和jdk的客户端channel
buf.add(new NioSocketChannel(this, ch));//创建netty自定义的客户端NioSocketChannel
return 1;
}
}

跟踪一下NioSocketChannel的构造函数

1
2
3
4
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket); //1.调用父类构造函数
config = new NioSocketChannelConfig(this, socket.socket()); //2.创建与客户端channel绑定的配置类
}

先跟踪1.调用父类构造函数这一步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ); //传入op_read
//绑定到selector时,可以告诉他自己对这个op_read事件感兴趣.所以当发生read事件时会通知他.
}
---
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch; //保存jdk的channel
this.readInterestOp = readInterestOp; //保存传入的op_read
ch.configureBlocking(false); //把这个jdk的channel设置为为阻塞
}
---
protected AbstractChannel(Channel parent) {
this.parent = parent; //服务端channel,保存
id = newId(); //创建唯一标示,底层数据的读写,业务逻辑的载体
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}

再看一下2.创建与客户端channel绑定的配置

1
2
3
4
5
6
7
8
9
10
11
config = new NioSocketChannelConfig(this, socket.socket());//传入底层的socket
---
//父类构造函数
public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) {
super(channel);
this.javaSocket = javaSocket; //保存底层的socket

if (PlatformDependent.canEnableTcpNoDelayByDefault()) {//通常情况下true
setTcpNoDelay(true);//禁止Nagle算法,让数据包尽可能地发出去
}
}

Nagel算法:让小数据包集成为大数据包后发送

1
2
3
4
5
6
@Override
public SocketChannelConfig setTcpNoDelay(boolean tcpNoDelay) {
//实际上就是给刚才保存的jdk的socket设置
javaSocket.setTcpNoDelay(tcpNoDelay);
return this;
}

Netty中的Channel的分类

  • NioServerSocketChannel
    服务端Channel
    服务端Channel是从用户代码取到.channel(NioServerSocketChannel.class)的类对象后通过反射创建

  • NioSocketChannel
    客户端channel
    在NioEventLoop检测到ACCEPT事件时通过new创建

  • Unsafe
    用于实现每种Channel底层的协议

Channel的层级关系

1546043964310

左边是客户端channel,右边是服务端channel

  • Channel
    一个接口.socket的读,写,连接,绑定操作的抽象

  • AbstractChannel
    Channel的抽象实现
    服务端的channel,id,unsafe,pipeline,eventLoop都会保存在这里

  • AbstractNioChannel
    用selector进行读写事件的监听
    保存服务端或客户端channel的selectionKey,保存服务端或客户端的jdkChannel
    服务端和客户端channel都继承与他,说明他们都会用selector轮询io事件.但他们向它传递的构造函数参数中的监听事件是不同的.前者是OP_ACCEPT,后者是OP_READ

  • Unsafe
    实现Channel底层的协议,

    服务端和客户端各自对应的Unsafe不同,前者是NioByteUnsafe,后者时NioMessageUnsafe
    在哪创建unsafe呢,AbstractChannel的构造函数会调用子类的newUnsafe方法.

    Unsafe是channel读写操作的抽象

    • 服务端channel
      AbstractNioMessageChannel#newUnsafe中返回了NioMessageUnsafe
      指读一条新的连接
      它的read方法会检测一个accept事件,然后通过doReadMessages()为他创建nioSocketChannel(客户端channel)

    • 客服端channel
      AbstractNioByteChannel#newUnsafe中返回了NioByteUnsafe,
      指读取io数据

      它的read方法会通过doReadBytes()读取数据,放入一个byteBuf中

  • ChannelConfig
    用于存储客户端或服务端Channel的配置

3.新连接NioEventLoop分配和selector注册

目的是给这个客户端channel分配一个NioEventLoop,并把它注册到NioEventLoop的selector上,之后都由这个NioEventLoop的selector进行管理

回到NioEventLoop调用的read(),首先在上一步中它通过doReadMessages()为ACCEPT创建nioSocketChannel(客户端channel)之后通过一下步骤把客户端channel注册到当前nioEventLoop的selector中:

\AbstractNioMessageChannel.NioMessageUnsafe#read

1
2
3
4
5
int size = readBuf.size();	//readBuf是新创建的nioSocketChannel的集合
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i)); //为新的nioSocketChannel调用pipeline的这个方法
}

这个pipeline是什么?在这里回顾一下服务端channel被构造时的步骤

1
2
3
4
5
6
7
8
9
init()[初始化服务端Channel] //io.netty.bootstrap.ServerBootstrap#init
|
set ChannelOptions,ChannelAttrs //配置用户自定义的服务端配置
|
set childOption,childAttr //配置用户自定义的连接配置,为每个新连接都配置
|
config handler [配置服务端pipeline] //用户代码中通过.handler()设置的部分
|
add ServerBootstrapAcceptor[添加连接器] //添加一个给accept用的NIO的线程

在这最后一步中会执行这么一段

1
2
3
pipeline.addLast(new ServerBootstrapAcceptor(
//用户代码传进来的一些信息
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

这个ServerBootstrapAcceptor又是什么呢?服务端Channel的pipeline由3个handler构成:

1
head -> ServerBootstrapAcceptor -> Tail

而这个pipeline.fireChannelRead(readBuf.get(i));这一行会去调用head,然后由head调用ServerBootstrap.ServerBootstrapAcceptor#channelRead方法

那么pipeline.fireChannelRead()最终会干些什么?

  • 添加childHandler
    将用户代码中定义的childHandler放入新连接的pipeline中
  • 设置options和attrs
  • 选择NioEventLoop并注册selector

1.添加childHandler

看一下源码

1
2
3
4
5
6
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;

child.pipeline().addLast(childHandler); //1. 添加childHandler

这个childHandler是在用户代码中通过以下添加的:

1
2
3
4
5
6
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {//给连接加入逻辑
ch.pipeline().addLast(new AuthHandler());
}
});

实际上添加的是一个ChannelInitializer,它暴露了一个initChannel(),从而让用户自定义handler.而ChannelInitializer在每次添加完handler之后它会将自身进行删除.看一下ChannelInitializer的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);//添加完成后调用这个
}
}
---
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) {
try {
initChannel((C) ctx.channel()); //回调用户代码中的initChannel()
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
remove(ctx); //将自身进行删除
}
return true;
}
return false;
}

2.设置options和attrs

继续看channelRead(),这两个也都是在init()中构建ServerBootstrapAcceptor时传入的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);//上一步

//options:底层tcp读写相关的参数
for (Entry<ChannelOption<?>, Object> e: childOptions) {
//取channel的config,把用户自定义的options放入到config中
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
}
//attr:客户端channel中绑定一些属性,比如你自定义的超时时间等
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
//取channel的config,把用户自定义的attr放入到config中
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
//..
}

3.选择NioEventLoop并注册selector

继续看channelRead

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//..省略上两步

//这个childGroup是NioEventGroup
//把客户端channel注册到NioEventGroup中的某一个NioEventLoop的selector
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
}

childGroup.register(child)的这一步它会调用(MultithreadEventLoopGroup#register):

1
2
3
4
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}

而这个next(),会调用MultithreadEventExecutorGroup#next

1
2
3
4
@Override
public EventExecutor next() {
return chooser.next();//这就是我们最早创建NioEventLoop时放入的线程选择器了
}

也就是说我们会通过线程选择器选出一个NioEventLoop,并调用它的register(),把客户端channel(NioSocketChannel)注册进去

看一下这个register()是怎样的一个步骤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
---
@Override
public ChannelFuture register(final ChannelPromise promise) {
//客户端channel的byteUnsafe
promise.channel().unsafe().register(this, promise);
return promise;
}
---io.netty.channel.AbstractChannel.AbstractUnsafe#register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {

AbstractChannel.this.eventLoop = eventLoop; //把当前的NioeventLoop保存到客户端channel

//inEventLoop判定是否时服务端channel,而这个时客户端的
if (eventLoop.inEventLoop()) {//返回false,
register0(promise);
} else {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}
}
---
private void register0(ChannelPromise promise) {
boolean firstRegistration = neverRegistered;
doRegister();
//...
}

看来最终会调用AbstractNioChannel#doRegister进行注册

1
2
3
4
5
6
7
8
9
10
11
12
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//此时的this是nioSocketChannel
//在这一步中把nioSocketChannel通过jdk提供的api注册到了selector
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
}
}
}

4.向selector注册读事件

其实是和ACCEPT事件同一段逻辑,继续看上一步骤执行register的部分AbstractChannel.AbstractUnsafe#register0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private void register0(ChannelPromise promise) {
boolean firstRegistration = neverRegistered;
doRegister(); //上一步..把客户端channel注册到selector上
neverRegistered = false;
registered = true;

//把用户代码中编写的.childHandler()部分的channelInitializer实际加入到pipeline中
pipeline.invokeHandlerAddedIfNeeded();//调用channelInitializer的handlerAdded方法

safeSetSuccess(promise);
pipeline.fireChannelRegistered();

if (isActive()) {//接入并且注册到selector上了
if (firstRegistration) {//第一次注册到NioEventLoop上
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
}

此时的pipeline.fireChannelActive();会调用head最终会调用DefaultChannelPipeline.HeadContext#channelActive

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
        @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();//active事件进行传播

readIfIsAutoRead();//
}
---
private void readIfIsAutoRead() {
//服务端channel创建时也调用了这么一段
if (channel.config().isAutoRead()) {//自动读,只要绑定了端口就会自动接收连接.默认true
//也就是有连接时,向selector放入一个读事件
channel.read();
}
}
---
@Override
public Channel read() {
pipeline.read();
return this;
}
---
@Override
public final ChannelPipeline read() {
tail.read();//从pipeLine的tail开始向前传播
return this;
}

这个read最终会调用unsafeNioSocketChannelUnsafebeginRead

1
2
3
4
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}

而他最终调用的是AbstractNioChannel#doBeginRead,一个服务端channel创建时也会执行的一个方法.

1
2
3
4
5
6
7
8
9
10
11
@Override
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
readPending = true;

final int interestOps = selectionKey.interestOps();//之前注册到selector时放入的时0
if ((interestOps & readInterestOp) == 0) {//这里返回0
//把读事件绑定到selector上去.今后如果selector轮询到,就可以进行数据的读写
selectionKey.interestOps(interestOps | readInterestOp);
}
}

几个问题

  • Netty是在哪里检测到有新连接接入的?
    轮询出accept事件,通过jdk底层的accept方法创建这条连接

  • 新连接是怎样注册到NioEventLoop线程的?

    Boss线程通过NioEventGroup的chooser的next()方法,选出一个NioEventLoop,然后将这条新连接注册到NioEventLoop的selector上

CATALOG
  1. 1. Netty新连接接入处理逻辑
    1. 1.1. 1.检测新连接
    2. 1.2. 2.创建NioSocketChannel
    3. 1.3. Netty中的Channel的分类
    4. 1.4. Channel的层级关系
    5. 1.5. 3.新连接NioEventLoop分配和selector注册
      1. 1.5.1. 1.添加childHandler
      2. 1.5.2. 2.设置options和attrs
      3. 1.5.3. 3.选择NioEventLoop并注册selector
    6. 1.6. 4.向selector注册读事件
  2. 2. 几个问题