农场主的黑科技.

Netty源码剖析-pipeline创建

字数统计: 2k阅读时长: 12 min
2019/01/06 Share
  • netty如何判断ChannelHandler类型的?
  • 对于ChannelHandler的添加应该遵循什么样的顺序?
  • 用户手动触发事件传播,不同的触发方式有什么样的区别?

pipeline知识点

  • pipeline的初始化

    服务端channel或客户端channel在什么时候初始化pipeline?初始化时会做什么

  • 添加删除ChannelHandler

  • 事件和异常的传播
    读写事件如何在pipeline中进行传播

pipeline初始化

  • pipeline在创建Channel的时候被创建
  • pipeline节点数据结构:ChannelHandlerContext
  • Pipeline中的两大哨兵:head和tail

pipeline在创建Channel的时候被创建

在netty的Channel的创建过程中,无论是客户端还是服务端,最终会调用这个父类构造器

1
2
3
4
5
6
7
8
9
10
    protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline(); //创建pipeline,也就是说一个channel对应一个pipeline
}
---
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this); //this也就是当前的nettyChannel
}

看一下DefaultChannelPipeline的创建过程

1
2
3
4
5
6
7
8
9
10
11
12
  protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");//把当前的netty的Channel保存
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);

//创建这两个节点
tail = new TailContext(this);
head = new HeadContext(this);
//把这两个节点变成双向链表的结构
head.next = tail;
tail.prev = head;
}

pipeline节点数据结构:ChannelHandlerContext

io.netty.channel.ChannelHandlerContext接口,它定义了pipeline节点的数据结构.pipeline中每一个节点都是这个接口的实现,包括刚才的head和tail.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public interface ChannelHandlerContext extends 
AttributeMap, //可以存储属性
ChannelInboundInvoker, //用于事件传播(inbound)
ChannelOutboundInvoker {//用于事件传播(outbound)
//当前节点属于那个channel
Channel channel();
//哪一个nioEventLoop会执行这个节点
EventExecutor executor();
//节点处理器名称
String name();
//节点处理器
ChannelHandler handler();
//该节点是否被移除
boolean isRemoved();

//事件传播相关,inbound,outbound

@Override
ChannelHandlerContext fireChannelRegistered();

@Override
ChannelHandlerContext fireChannelUnregistered();

@Override
ChannelHandlerContext fireChannelActive();

@Override
ChannelHandlerContext fireChannelInactive();

@Override
ChannelHandlerContext fireExceptionCaught(Throwable cause);

@Override
ChannelHandlerContext fireUserEventTriggered(Object evt);

@Override
ChannelHandlerContext fireChannelRead(Object msg);

@Override
ChannelHandlerContext fireChannelReadComplete();

@Override
ChannelHandlerContext fireChannelWritabilityChanged();

@Override
ChannelHandlerContext read();

@Override
ChannelHandlerContext flush();

//当前节点的pipeline是哪一个
ChannelPipeline pipeline();
//需要分配内存时(bytebuf),使用哪一个内存分配器
ByteBufAllocator alloc();
//属性存储
@Deprecated
@Override
<T> Attribute<T> attr(AttributeKey<T> key);

@Deprecated
@Override
<T> boolean hasAttr(AttributeKey<T> key);
}

通常情况下,它尤io.netty.channel.AbstractChannelHandlerContext实现

Pipeline中的两大哨兵:head和tail

在几章前创建pipeline时同时创建了tail和head

1
2
tail = new TailContext(this);
head = new HeadContext(this);

下面分别看一下它们

1
2
3
4
5
6
final class TailContext extends AbstractChannelHandlerContext //满足pipeline节点的数据结构
implements ChannelInboundHandler { //传播inbound事件
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);
setAddComplete(); //设置为已添加该节点
}

跟super构造函数到AbstractChannelHandlerContext

1
2
3
4
5
6
7
8
9
10
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;//pipeline
this.executor = executor;//nio处理器,这里传入的时null
this.inbound = inbound;//true
this.outbound = outbound;//false

ordered = executor == null || executor instanceof OrderedEventExecutor;
}

也就是说tail是一个inbound的处理器.

继续看TailContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
//..

@Override
public ChannelHandler handler() {
return this; //自己不仅是节点,也包含了处理逻辑
}
//之后的channelRegistered,channelActive等方法都为空
//..
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
onUnhandledInboundException(cause);//对异常的处理,仅是通过logger输出warn
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
onUnhandledInboundMessage(msg);//传入的inboundMessage没有被处理调用它,仅是通过loggr输出debug
}

从上面可以看出,tail主要是做一些收尾的信息,只会对未完善的部分输出logger进行提醒程序员.

看完tail后来看head

1
2
3
4
5
6
7
8
9
10
11
12
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler,
ChannelInboundHandler {

private final Unsafe unsafe;

HeadContext(DefaultChannelPipeline pipeline) {
//最后的参数为true,也就是说是处理outbound的节点
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe(); //保存channel的unsafe
setAddComplete(); //设置为已添加该节点
}

看一下它有哪些方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
//...
@Override
public ChannelHandler handler() {
return this;//和tail相同,自身包含了处理逻辑
}

//...
//后面的bind,connect,disconnect等都是和读写相关的操作,如下面.都会委托unsafe进行操作
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);//委托unsafe进行读写操作
}

//再后面的一些方法,如下面.它都会把这个事件继续往下传播
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();//把这个事件继续往下传播

readIfIsAutoRead(); //首次连接时会触发channelActive,通过这行去注册一个读事件
//触发对应nioeventloop的selector的读事件
}

head节点的工作:1.把事件原封不动的进行传播,2.把读写操作委托给unsafe

添加ChannelHandler

回顾用户代码中是如何添加ChannelHandler的?

1
2
3
4
5
6
7
8
9
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
//调用这个addLast添加的ChannelHandler
//通常用户代码会继承这两个ChannelHandler,创建自定义的ChannelHandler
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter());
}
})

这里的addLast()会做些什么?

  • 判断是否重复添加
  • 创建节点并添加至链表
    把这个ChannelHandler包装成ChannelHandlerContext,添加到链表
  • 回调添加完成事件
    告诉用户添加完成

判断是否重复添加

进到addLast

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
    @Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {//支持多个handler
return addLast(null, handlers);
}
---
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {

for (ChannelHandler h: handlers) {
//executor 为 null
addLast(executor, null, h); //对每个handler调用这个
}
return this;
}
---
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//1.判断是否重复添加
checkMultiplicity(handler);
//...
}

看一下checkMultiplicity是如何判断的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {//是否是ChannelHandlerAdapter的实例
//是
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
//isSharable()实际上就是去查看自定义ChannelHandler类上是否有Sharable注解
//有这个Sharable注解注解时表示它是可以多次被添加的.
if (!h.isSharable() && h.added) {//是否是非共享的并且是添加过的

//抛异常
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true; //设置为已添加
}
}

创建节点并添加至链表

继续看addLast的下一步

1
2
3
4
5
6
7
8
9
10
11
12
 @Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//1.判断是否重复添加
checkMultiplicity(handler);
//2.创建节点
//group是null
//filterName():判断你传入的name是否有重复,用pipeline开始遍历.如果有重复的则抛异常
//handler是用户代码传入的自定义ChannelHandler
newCtx = newContext(group, filterName(name, handler), handler);
}

进到这个newContext(),它会创建ChannelHandlerContext对用户传入的initChannel进行分装

1
2
3
4
5
6
7
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
//创建ChannelHandlerContext
return new DefaultChannelHandlerContext(this, //当前pipeLine
childExecutor(group), //null
name, //名称
handler);//用户代码传入的自定义ChannelHandler
}

之后,它会调用addLast0()来添加到链表

1
2
3
4
5
6
//1.判断是否重复添加
name = filterName(name, handler);
//2.创建节点
newCtx = newContext(group, name, handler);
//3.把新节点添加至链表
addLast0(newCtx);

看一下addLast0的步骤.实际就是把它插入到双向链表中tail的前面

1
2
3
4
5
6
7
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}

回调添加完成事件

继续看addLast的步骤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//..
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
//如果不在当前线程.把它放入任务队列.就是之前的mpscQueue
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
//如果时当前线程,就直接执行
callHandlerAdded0(newCtx);
return this;
}

也就是说是通过callHandlerAdded0执行回调的.看一下源码

1
2
3
4
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
ctx.handler().handlerAdded(ctx);//ChannelHandlerAdapter的handlerAdded
ctx.setAddComplete(); //把状态设置为已添加完毕,ADD_COMPLETE
}

这里不是调用ChannelInitializer的handlerAdded方法,而是用户代码中自定义Handler的父类ChannelHandlerAdapter的handlerAdded

ChannelHandlerAdapter#handlerAdded的源码如下,他什么都不会做

1
2
3
4
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// NOOP
}
CATALOG
  1. 1. pipeline知识点
  2. 2. pipeline初始化
    1. 2.1. pipeline在创建Channel的时候被创建
    2. 2.2. pipeline节点数据结构:ChannelHandlerContext
    3. 2.3. Pipeline中的两大哨兵:head和tail
  3. 3. 添加ChannelHandler
    1. 3.1. 判断是否重复添加
    2. 3.2. 创建节点并添加至链表
    3. 3.3. 回调添加完成事件