netty如何判断ChannelHandler类型的?
对于ChannelHandler的添加应该遵循什么样的顺序?
用户手动触发事件传播,不同的触发方式有什么样的区别?
pipeline知识点
pipeline初始化
pipeline在创建Channel的时候被创建
pipeline节点数据结构:ChannelHandlerContext
Pipeline中的两大哨兵:head和tail
pipeline在创建Channel的时候被创建 在netty的Channel的创建过程中,无论是客户端还是服务端,最终会调用这个父类构造器
1 2 3 4 5 6 7 8 9 10 protected AbstractChannel (Channel parent) { this .parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } --- protected DefaultChannelPipeline newChannelPipeline () { return new DefaultChannelPipeline(this ); }
看一下DefaultChannelPipeline
的创建过程
1 2 3 4 5 6 7 8 9 10 11 12 protected DefaultChannelPipeline (Channel channel) { this .channel = ObjectUtil.checkNotNull(channel, "channel" ); succeededFuture = new SucceededChannelFuture(channel, null ); voidPromise = new VoidChannelPromise(channel, true ); tail = new TailContext(this ); head = new HeadContext(this ); head.next = tail; tail.prev = head; }
pipeline节点数据结构:ChannelHandlerContext io.netty.channel.ChannelHandlerContext
接口,它定义了pipeline节点的数据结构.pipeline中每一个节点都是这个接口的实现,包括刚才的head和tail.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 public interface ChannelHandlerContext extends AttributeMap , //可以存储属性 ChannelInboundInvoker , //用于事件传播(inbound ) ChannelOutboundInvoker { Channel channel () ; EventExecutor executor () ; String name () ; ChannelHandler handler () ; boolean isRemoved () ; @Override ChannelHandlerContext fireChannelRegistered () ; @Override ChannelHandlerContext fireChannelUnregistered () ; @Override ChannelHandlerContext fireChannelActive () ; @Override ChannelHandlerContext fireChannelInactive () ; @Override ChannelHandlerContext fireExceptionCaught (Throwable cause) ; @Override ChannelHandlerContext fireUserEventTriggered (Object evt) ; @Override ChannelHandlerContext fireChannelRead (Object msg) ; @Override ChannelHandlerContext fireChannelReadComplete () ; @Override ChannelHandlerContext fireChannelWritabilityChanged () ; @Override ChannelHandlerContext read () ; @Override ChannelHandlerContext flush () ; ChannelPipeline pipeline () ; ByteBufAllocator alloc () ; @Deprecated @Override <T> Attribute<T> attr (AttributeKey<T> key) ; @Deprecated @Override <T> boolean hasAttr (AttributeKey<T> key) ; }
通常情况下,它尤io.netty.channel.AbstractChannelHandlerContext
实现
Pipeline中的两大哨兵:head和tail 在几章前创建pipeline时同时创建了tail和head
1 2 tail = new TailContext(this ); head = new HeadContext(this );
下面分别看一下它们
1 2 3 4 5 6 final class TailContext extends AbstractChannelHandlerContext //满足pipeline 节点的数据结构 implements ChannelInboundHandler { TailContext(DefaultChannelPipeline pipeline) { super (pipeline, null , TAIL_NAME, true , false ); setAddComplete(); }
跟super构造函数到AbstractChannelHandlerContext
1 2 3 4 5 6 7 8 9 10 AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, boolean inbound, boolean outbound) { this .name = ObjectUtil.checkNotNull(name, "name" ); this .pipeline = pipeline; this .executor = executor; this .inbound = inbound; this .outbound = outbound; ordered = executor == null || executor instanceof OrderedEventExecutor; }
也就是说tail是一个inbound的处理器.
继续看TailContext
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler { @Override public ChannelHandler handler () { return this ; } @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { onUnhandledInboundException(cause); } @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { onUnhandledInboundMessage(msg); }
从上面可以看出,tail主要是做一些收尾的信息,只会对未完善的部分输出logger进行提醒程序员.
看完tail后来看head
1 2 3 4 5 6 7 8 9 10 11 12 final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler , ChannelInboundHandler { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super (pipeline, null , HEAD_NAME, false , true ); unsafe = pipeline.channel().unsafe(); setAddComplete(); }
看一下它有哪些方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler , ChannelInboundHandler { @Override public ChannelHandler handler () { return this ; } @Override public void bind ( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); } @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); readIfIsAutoRead(); }
head节点的工作:1.把事件原封不动的进行传播,2.把读写操作委托给unsafe
添加ChannelHandler 回顾用户代码中是如何添加ChannelHandler的?
1 2 3 4 5 6 7 8 9 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel (SocketChannel ch) { ch.pipeline().addLast(new ChannelInboundHandlerAdapter()); ch.pipeline().addLast(new ChannelOutboundHandlerAdapter()); } })
这里的addLast()
会做些什么?
判断是否重复添加
创建节点并添加至链表 把这个ChannelHandler包装成ChannelHandlerContext,添加到链表
回调添加完成事件 告诉用户添加完成
判断是否重复添加 进到addLast
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Override public final ChannelPipeline addLast (ChannelHandler... handlers) { return addLast(null , handlers); } --- @Override public final ChannelPipeline addLast (EventExecutorGroup executor, ChannelHandler... handlers) { for (ChannelHandler h: handlers) { addLast(executor, null , h); } return this ; } --- @Override public final ChannelPipeline addLast (EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this ) { checkMultiplicity(handler); }
看一下checkMultiplicity
是如何判断的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private static void checkMultiplicity (ChannelHandler handler) { if (handler instanceof ChannelHandlerAdapter) { ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler; if (!h.isSharable() && h.added) { throw new ChannelPipelineException( h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times." ); } h.added = true ; } }
创建节点并添加至链表 继续看addLast
的下一步
1 2 3 4 5 6 7 8 9 10 11 12 @Override public final ChannelPipeline addLast (EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this ) { checkMultiplicity(handler); newCtx = newContext(group, filterName(name, handler), handler); }
进到这个newContext()
,它会创建ChannelHandlerContext对用户传入的initChannel进行分装
1 2 3 4 5 6 7 private AbstractChannelHandlerContext newContext (EventExecutorGroup group, String name, ChannelHandler handler) { return new DefaultChannelHandlerContext(this , childExecutor(group), name, handler); }
之后,它会调用addLast0()
来添加到链表
1 2 3 4 5 6 name = filterName(name, handler); newCtx = newContext(group, name, handler); addLast0(newCtx);
看一下addLast0
的步骤.实际就是把它插入到双向链表中tail的前面
1 2 3 4 5 6 7 private void addLast0 (AbstractChannelHandlerContext newCtx) { AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; }
回调添加完成事件 继续看addLast
的步骤
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 @Override public final ChannelPipeline addLast (EventExecutorGroup group, String name, ChannelHandler handler) { final AbstractChannelHandlerContext newCtx; synchronized (this ) { EventExecutor executor = newCtx.executor(); if (!executor.inEventLoop()) { newCtx.setAddPending(); executor.execute(new Runnable() { @Override public void run () { callHandlerAdded0(newCtx); } }); return this ; } } callHandlerAdded0(newCtx); return this ; }
也就是说是通过callHandlerAdded0
执行回调的.看一下源码
1 2 3 4 private void callHandlerAdded0 (final AbstractChannelHandlerContext ctx) { ctx.handler().handlerAdded(ctx); ctx.setAddComplete(); }
这里不是调用ChannelInitializer的handlerAdded方法,而是用户代码中自定义Handler的父类ChannelHandlerAdapter的handlerAdded
而ChannelHandlerAdapter#handlerAdded
的源码如下,他什么都不会做
1 2 3 4 @Override public void handlerAdded (ChannelHandlerContext ctx) throws Exception { }