农场主的黑科技.

Netty源码剖析-Netty服务端启动的过程

字数统计: 2.6k阅读时长: 16 min
2018/12/25 Share

一个简单的netty服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public final class Server {

public static void main(String[] args) throws Exception {
//两种类型的event loop
EventLoopGroup bossGroup = new NioEventLoopGroup(1); //对应com.imooc.netty.ch2.Server.start,新连接
EventLoopGroup workerGroup = new NioEventLoopGroup(); //对应com.imooc.netty.ch2.Client.start,对连接进行逻辑操作

try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)//配置两种event loop
.channel(NioServerSocketChannel.class)//设置服务端的channel
.childOption(ChannelOption.TCP_NODELAY, true)//给每个连接设置TCP的基本属性
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")//给每个连接绑定一些基本的属性
.handler(new ServerHandler())//加入服务端启动时的逻辑,对应于com.imooc.netty.ch2.Server.doStart
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {//给连接加入逻辑
// ch.pipeline().addLast(new AuthHandler());
//..
}
});

ChannelFuture f = b.bind(8888).sync(); //绑定端口

f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
  • 服务端的socket在哪里初始化?
  • 在哪里accept连接?

Netty服务端启动的过程

  1. 创建服务端Channel
    调用JDK的API创建Channel,绑定一些基本组件
  2. 初始化服务端Channel
    初始化一些基本属性,添加一些逻辑处理器
  3. 注册selector
    将底层的Channel注册到事件轮询器selector上,再把服务端channel注册到底层
  4. 端口绑定

创建服务端Channel

1
2
3
4
5
6
7
bind()[用户端代码入口]
|
doBind()
|
initAndRegist()[初始化并注册]
|
newChannel()[创建服务端Channel]

最终在channelFactory.newChannel()中通过反射创建Channel。

1
2
3
4
5
final ChannelFuture initAndRegister() {
Channel channel = null;
channel = channelFactory.newChannel();
init(channel);
...

此时发现这个channelFactory实际是一个叫ReflectiveChannelFactory的类型,是他用反射创建的channel.

那么这个ReflectiveChannelFactory是在哪里初始化的?

看一下上一节的代码中的

1
.channel(NioServerSocketChannel.class)//设置服务端的channel

中到底做了哪些事情。

点进去会发现是这么一段

1
return channelFactory(new ReflectiveChannelFactory<C>(channelClass))//channelClass参数就是NioServerSocketChannel.class

也就是这个时候帮他设置到里面去的,所以最终反射构 建的channel是NioServerSocketChannel.class

知道它通过反射把channel构造为NioServerSocketChannel后,去看看这个NioServerSocketChannel的构造函数会做那些事。

反射构建服务端Channel

了解我们通过反射构造Channel的同时做了哪些事.

1
2
3
4
5
6
7
8
9
newSocket()[通过jdk来创建底层jdk channel]
|
NioServerSocketChannelConfig()[tcp参数配置类]
|
AbstractNioChannel()
|
configureBlocking(false)[设置为非阻塞模式]
|
AbstractChannel()[创建id,unsafe,pipeline]

newSocket()相关部分

以下基于 io.netty.channel.socket.nio.NioServerSocketChannel源代码

NioServerSocketChannel的构造函数调用 newSocket(),里面又调用provider.openServerSocketChannel();创建并返回ServerSocketChannelServerSocketChannel是JDK底层的Channel。

这么一来底层的channel就创建好了.接下来构造函数会进入这么一段

1
2
3
4
public NioServerSocketChannel(ServerSocketChannel channel) {//参数是刚才创建的底层channel
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

NioServerSocketChannelConfig()相关部分

NioServerSocketChannel的构造函数中调用了这么一个:

1
config = new NioServerSocketChannelConfig(this, javaChannel().socket());

方便后期对TCP进行配置.

AbstractNioChannel()相关部分

再看它的父类构造器,追踪到AstractNioChannel的构造函数里面会看到

1
ch.configureBlocking(false);	//这里的ch是刚才创建的jdk底层的channel,把它设为非阻塞

这么一段.把channel设置为非阻塞流

继续追踪AbstractNioChannel的父类,到了AbstractChannel之后,会看到它的构造函数为

1
2
3
4
5
6
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId(); //id
unsafe = newUnsafe(); //底层TCP读写操作的类
pipeline = newChannelPipeline(); //pipeline
}

初始化服务端Channel

再看创建服务端Channel时的过程,他之后将会调用init()方法.

1
2
3
4
5
6
7
8
9
bind()[用户端代码入口]
|
doBind()
|
initAndRegist()[初始化并注册]
|
newChannel()[创建服务端Channel] //通过反射构造channel
|
init()[初始化服务端Channel] //初始化刚才用反射构造的channel

那么这个初始化会做哪些事

1
2
3
4
5
6
7
8
9
init()[初始化服务端Channel]
|
set ChannelOptions,ChannelAttrs //配置用户自定义的服务端配置
|
set childOption,childAttr //配置用户自定义的连接配置,为每个新连接都配置
|
config handler [配置服务端pipeline] //用户代码中通过.handler()设置的部分
|
add ServerBootstrapAcceptor[添加连接器] //添加一个给accept用的NIO的线程

set ChannelOptions,ChannelAttrs

目的是设置用户自定义的服务端配置.

io.netty.bootstrap.ServerBootstrap#init中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();//拿到用户自定义的option
synchronized (options) {
channel.config().setOptions(options); //设置进去
}

final Map<AttributeKey<?>, Object> attrs = attrs0();//拿到用户自定义的attr
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue()); //设置进去
}
}

这里配置的是通过以下的方式配置的,第一节中的代码示例中没有配.

1
2
ServerBootstrap b = new ServerBootstrap();
b.option(/**/).attr(/**/);

set childOption,childAttr

这一步是为了设置用户自定义的连接配置,为每个新连接都配置. 值是用户代码中.childOption().childAttr()配置的部分.

同样在io.netty.bootstrap.ServerBootstrap#init中:

1
2
3
4
5
6
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}

config handler

配置完成后接下来在,在pipeline中把用户自定义的handler放入.这个handler是用户代码中通过.handler()设置的部分.同样是io.netty.bootstrap.ServerBootstrap#init.

1
2
3
4
5
6
7
8
9
10
11
ChannelPipeline p = channel.pipeline();
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler(); //获取用户设置的服务端handler
if (handler != null) {
pipeline.addLast(handler); //把服务端handler放到pipeline中
}
...
})

config.handler()是如何获取用户设置的handler的?

首先得知道,客户端代码进行.handler(/**/)的时候,调用的是AbstractBootstrap#handler,此时只会单纯把这个handler存到AbstractBootstrap的一个handler属性中.config.handler()就是返回这个handler属性的值.后面的.childHandler()等配置取值都是通过类似方式.

add ServerBootstrapAcceptor

这一步的目的是给accept添加一个NIO的线程,还是接着看io.netty.bootstrap.ServerBootstrap#init.上一步操作后,他会

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
//创建了一个accept线程
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
//添加特殊的一个acceptor处理器,把客户端配置都放入
pipeline.addLast(
new ServerBootstrapAcceptor(
currentChildGroup, //用户代码通过.childGroup()配置
currentChildHandler,//用户代码通过.childHandler()配置
currentChildOptions,//上面说过了
currentChildAttrs));
}
});
}
});

注册selector

回到io.netty.bootstrap.AbstractBootstrap#initAndRegister方法.构造channel,channel初始化的两步完成后会调用register方法

1
2
3
4
Channel channel = null;
channel = channelFactory.newChannel();//构造
init(channel); //初始化
ChannelFuture regFuture = config().group().register(channel); //注册,实际调用io.netty.channel.AbstractChannel.AbstractUnsafe#register

步骤:

1
2
3
4
5
6
7
8
9
10
11
12
io.netty.channel.AbstractChannel.AbstractUnsafe#register
AbstractBootstrap.register(channel)[入口]
|
this.eventLoop = eventLoop[绑定线程]
|
register0()[实际注册]
|
doRegister()[调用jdk底层注册] //把jdk channel注册到selector
|
invokeHandlerAddedIfNeeded() //注册事件的回调
|
fireChannelRegistered()[传播事件] //告诉用户代码注册成功

this.eventLoop = eventLoop

实际代码

1
AbstractChannel.this.eventLoop = eventLoop;

之后的所有操作都由这个eventLoop处理,之后调用register0()

register0()

io.netty.channel.AbstractChannel.AbstractUnsafe#register0,做了三件事:

1
2
3
4
5
doRegister();	//把jdk channel注册到selector
...
pipeline.invokeHandlerAddedIfNeeded();//注册事件的回调
...
pipeline.fireChannelRegistered();//告诉用户代码注册成功

doRegister()

位置:io.netty.channel.nio.AbstractNioChannel#doRegister

里边调用JDK底层的一个注册方法,把jdk channel注册到selector

1
2
3
4
5
selectionKey = javaChannel()//返回之前创建服务端channel时创建的底层Channel
.register( //jdk channel的api
eventLoop().selector,//让他注册到这个selector中
0,//注册时需要关心的事件.这里是0说明不需要关心,只是单纯把channel绑定到selector上
this); //attachment

实际上,注册一个channel的时候是调用jdk底层的一个api去注册到selector中
并把这个this,就是服务端的channel,也一同附加进去,
之后selector轮询到jdk channel时通过获取attachment就能取出netty服务端的Channel,从而进行特定的传播.

invokeHandlerAddedIfNeeded和fireChannelRegistered

由于在用户代码中通过.handler(new ServerHandler())进行了绑定.

invokeHandlerAddedIfNeeded和fireChannelRegistered分别调用用户代码中的com.imooc.netty.ch3.ServerHandler#channelRegisteredcom.imooc.netty.ch3.ServerHandler#handlerAdded

但注意,register0()中下面还有个pipeline.fireChannelActive();`:

1
2
3
4
5
6
7
if (isActive()) {	//此时返回false
if (firstRegistration) {
pipeline.fireChannelActive(); //不会被调用
} else if (config().isAutoRead()) {
beginRead();
}
}

但此时由于isActive()返回false,这个fireChannelActive是不会被触发的.

端口绑定

1
2
3
4
5
6
7
8
9
10
AbstractUnsafe.bind()[入口]
|
doBind()//实际绑定到本地
|
javaChannel().bind()[jdk底层绑定]
|
pipline.fireChannelActive()[传播事件] //active
|
HeadContext.readIdIsAutoRead() //把之前注册到selector的事件,重新绑定为accept事件
//之后有新连接进来,他都会轮询到这个accept事件.交给netty处理

回到io.netty.bootstrap.AbstractBootstrap#doBind,然后->bind0->..->io.netty.channel.AbstractChannel.AbstractUnsafe#bind.在里面会看到实际绑定本地端口的doBind()和active事件传播的pipline.fireChannelActive().

doBind()

这里实际调用的是io.netty.channel.socket.nio.NioServerSocketChannel#doBind. 通过

1
2
javaChannel()//之前创建的jdk底层channel 
.bind(localAddress, config.getBacklog());//jdk提供的api

对一个端口进行绑定.

pipline.fireChannelActive()

代码相关部分(AbstractUnsafe#bind):

1
2
3
4
5
6
7
8
9
10
boolean wasActive = isActive();//还没绑定,false 
doDisconnect();//绑定端口
if (!wasActive && isActive()) {//!false&true,绑定前不是active绑定后active.触发事件
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive(); //触发active事件
}
});
}

fireChannelActive,实际上不只是单纯的触发active事件.追踪pipeline.fireChannelActive()->DefaultChannelPipeline#fireChannelActive->AbstractChannelHandlerContext#invokeChannelActive->io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

这个HeadContext#channelActive的代码如下:

1
2
3
4
5
6
7
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive(); //触发active

readIfIsAutoRead(); //把之前注册到selector的事件,重新绑定为accept事件.
//使之后有新连接进来,他都会轮询到这个accept事件.把它交给netty处理
}

怎样实现重新绑定accept事件?

跟踪这个readIfIsAutoRead()->..->io.netty.channel.AbstractChannelHandlerContext#read->..io.netty.channel.AbstractChannel.AbstractUnsafe#beginRead->..->io.netty.channel.nio.AbstractNioChannel#doBeginRead

doBeginRead的内容如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;//刚才注册channel到selector时拿到的key
if (!selectionKey.isValid()) {
return;
}

readPending = true;

final int interestOps = selectionKey.interestOps();//因为注册selector时放了0,这里返回0
if ((interestOps & readInterestOp) == 0) {// 0 & ... == 0 ,成立
//意思就是在前面注册的事件的基础上再加入一个事件
//这个readInterestOp就是accept事件
selectionKey.interestOps(interestOps | readInterestOp);
}
}

回顾一下readInterestOp是从哪来的.在NioServerSocketChannel的构造函数中就带入了

1
2
3
4
5
6
7
super(null, channel, SelectionKey.OP_ACCEPT);	//加入accept事件
---
//之后的父类构造器 ..
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp; //!!就是这里添加的readInterestOp,也就是SelectionKey.OP_ACCEPT

readIfIsAutoRead()的逻辑完成后,当收到OP_ACCEPT事件的时候就会交给netty处理.

CATALOG
  1. 1. 一个简单的netty服务端
  2. 2. Netty服务端启动的过程
    1. 2.1. 创建服务端Channel
    2. 2.2. 反射构建服务端Channel
      1. 2.2.1. newSocket()相关部分
      2. 2.2.2. NioServerSocketChannelConfig()相关部分
      3. 2.2.3. AbstractNioChannel()相关部分
    3. 2.3. 初始化服务端Channel
      1. 2.3.1. set ChannelOptions,ChannelAttrs
      2. 2.3.2. set childOption,childAttr
      3. 2.3.3. config handler
      4. 2.3.4. add ServerBootstrapAcceptor
    4. 2.4. 注册selector
      1. 2.4.1. this.eventLoop = eventLoop
      2. 2.4.2. register0()
    5. 2.5. 端口绑定
      1. 2.5.1. doBind()
      2. 2.5.2. pipline.fireChannelActive()