解码:把一串二进制数据流解析成ByteBuf
解码器抽象的解码过程
netty里面有哪些拆箱即用的解码器
本章内容
解码器基类 ByteToMessageDecoder解码步骤 所有底层解码器都基于它实现
解码步骤:
累加字节流
调用子类的decode方法进行解析
将解析到的ByteBuf向下传播
1.累加字节流
如果是第一次读数据,把当前读入的数据设置成累加器
否则,把这次读入的数据累加到累加器上
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { CodecOutputList out = CodecOutputList.newInstance(); try { ByteBuf data = (ByteBuf) msg; first = cumulation == null ; if (first) { cumulation = data; } else { cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } callDecode(ctx, cumulation, out); ..省略 } else { ctx.fireChannelRead(msg); } }
累加器cumulation
是什么.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private Cumulator cumulator = MERGE_CUMULATOR;public static final Cumulator MERGE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate (ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { ByteBuf buffer; if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1 ) { buffer = expandCumulation(alloc, cumulation, in.readableBytes()); } else { buffer = cumulation; } buffer.writeBytes(in); in.release(); return buffer; } };
2.调用子类的decode方法进行解析 放入累加器后会调用子类的decode方法进行解析.
1 2 3 4 5 6 public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { CodecOutputList out = CodecOutputList.newInstance(); ... callDecode(ctx, cumulation, out);
可以把out
简单理解为一个ArrayList.调用callDecode()
进行解析,然后把解析完成后的结果放入
out
.然后再从这个out
的List里面的解析结果一个个拿出,向下进行传播
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 protected void callDecode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { while (in.isReadable()) { int outSize = out.size(); if (outSize > 0 ) { fireChannelRead(ctx, out, outSize); out.clear(); if (ctx.isRemoved()) { break ; } outSize = 0 ; } int oldInputLength = in.readableBytes(); decode(ctx, in, out); if (ctx.isRemoved()) { break ; } if (outSize == out.size()) { if (oldInputLength == in.readableBytes()) { break ; } else { continue ; } } if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message." ); } if (isSingleDecode()) { break ; } } }
3.将解析到的ByteBuf向下传播 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Override public void channelRead (ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { CodecOutputList out = CodecOutputList.newInstance(); try { ByteBuf data = (ByteBuf) msg; first = cumulation == null ; if (first) { cumulation = data; } else { cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } callDecode(ctx, cumulation, out); .. } finally { .. int size = out.size(); decodeWasNull = !out.insertSinceRecycled(); fireChannelRead(ctx, out, size); out.recycle(); } } else { ctx.fireChannelRead(msg); } }
跟踪fireChannelRead()
看一下传播的过程:
1 2 3 4 5 6 7 8 static void fireChannelRead (ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) { for (int i = 0 ; i < numElements; i ++) { ctx.fireChannelRead(msgs.getUnsafe(i) ); } }
传播后业务代码就能获取到解码后的ByteBuf
对象
netty中常见的解码器分析 基于固定长度解码器分析 最简单的解码器io.netty.handler.codec.FixedLengthFrameDecoder
.只有一个成员变量,在构造方法中传入
1 2 3 4 5 6 public class FixedLengthFrameDecoder extends ByteToMessageDecoder { private final int frameLength; public FixedLengthFrameDecoder (int frameLength) { this .frameLength = frameLength; }
规则如下:
1 2 3 4 5 6 7 8 如果你传入这样的数据包 * +---+----+------+----+ * | A | BC | DEFG | HI | * +---+----+------+----+ 设置长度=3,那么会封装成一下的形式 * +-----+-----+-----+ * | ABC | DEF | GHI | * +-----+-----+-----+
它的decode()
实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Override protected final void decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null ) { out.add(decoded); } } --- protected Object decode (ChannelHandlerContext ctx, ByteBuf in) throws Exception { if (in.readableBytes() < frameLength) { return null ; } else { return in.readRetainedSlice(frameLength); } }
当小于帧长度时,会返回null,导致父类的callDecode()
方法会返回,直到下次再进数据时继续进行解析:
1 2 3 4 5 6 7 8 9 if (outSize == out.size()) { if (oldInputLength == in.readableBytes()) { break ; } else { continue ; } }
基于行解码器 对\n
或\r\n
进行分割,io.netty.handler.codec.LineBasedFrameDecoder
1 2 3 4 5 6 7 8 9 10 11 12 public class LineBasedFrameDecoder extends ByteToMessageDecoder { private final int maxLength; private final boolean failFast; private final boolean stripDelimiter; private boolean discarding; private int discardedBytes;
它的decode()
,和上一节的一模一样:
1 2 3 4 5 6 7 8 @Override protected final void decode (ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null ) { out.add(decoded); } }
decode()
具体实现:
通过findEndOfLine
可以找到换行处的索引,\n
或\r\n
:
1 2 3 4 5 6 7 8 private static int findEndOfLine (final ByteBuf buffer) { int i = buffer.forEachByte(ByteProcessor.FIND_LF); if (i > 0 && buffer.getByte(i - 1 ) == '\r' ) { i--; } return i; }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 protected Object decode (ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { final int eol = findEndOfLine(buffer); if (!discarding) { if (eol >= 0 ) { final ByteBuf frame; final int length = eol - buffer.readerIndex(); final int delimLength = buffer.getByte(eol) == '\r' ? 2 : 1 ; if (length > maxLength) { buffer.readerIndex(eol + delimLength); fail(ctx, length); return null ; } if (stripDelimiter) { frame = buffer.readRetainedSlice(length); buffer.skipBytes(delimLength); } else { frame = buffer.readRetainedSlice(length + delimLength); } return frame; } else { final int length = buffer.readableBytes(); if (length > maxLength) { discardedBytes = length; buffer.readerIndex(buffer.writerIndex()); discarding = true ; if (failFast) { fail(ctx, "over " + discardedBytes); } } return null ; } } else { if (eol >= 0 ) { final int length = discardedBytes + eol - buffer.readerIndex(); final int delimLength = buffer.getByte(eol) == '\r' ? 2 : 1 ; buffer.readerIndex(eol + delimLength); discardedBytes = 0 ; discarding = false ; if (!failFast) { fail(ctx, length); } } else { discardedBytes += buffer.readableBytes(); buffer.readerIndex(buffer.writerIndex()); } return null ; } }
没有找到换行符,就是下面这种情况:
如果这个readIndex和writeIndex之间的长度大于长度限制的话,会把所有数据进行丢弃,并标记为丢弃模式
基于分隔符解码器分析 io.netty.handler.codec.DelimiterBasedFrameDecoder
它的构造函数:
1 2 3 4 public DelimiterBasedFrameDecoder (int maxFrameLength, ByteBuf... delimiters) { this (maxFrameLength, true , delimiters); }
还是看它的decode()
,拆解成3个步骤:
行处理器 如果带入的分割符是基于行的分隔符,通过行处理器去处理,就是上小节的
找到最小分隔符 因为它支持基于多个分隔符机进行分割
解码
1.行处理器 1 2 3 4 5 6 protected Object decode (ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { if (lineBasedDecoder != null ) { return lineBasedDecoder.decode(ctx, buffer); }
lineBasedDecoder
是在构造方法中被初始化的,之后分隔符为行分隔符是会被初始化
1 2 3 4 5 6 public DelimiterBasedFrameDecoder ( ... if (isLineBased(delimiters) && !isSubclass () ) { lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast); this .delimiters = null ; }
2.找到最小分隔符 1 2 3 4 5 6 7 8 9 10 11 12 13 protected Object decode (ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { int minFrameLength = Integer.MAX_VALUE; ByteBuf minDelim = null ; for (ByteBuf delim: delimiters) { int frameLength = indexOf(buffer, delim); if (frameLength >= 0 && frameLength < minFrameLength) { minFrameLength = frameLength; minDelim = delim; } }
3.解码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 protected Object decode (ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { if (minDelim != null ) { int minDelimLength = minDelim.capacity(); ByteBuf frame; if (discardingTooLongFrame) { discardingTooLongFrame = false ; buffer.skipBytes(minFrameLength + minDelimLength); int tooLongFrameLength = this .tooLongFrameLength; this .tooLongFrameLength = 0 ; if (!failFast) { fail(tooLongFrameLength); } return null ; } if (minFrameLength > maxFrameLength) { buffer.skipBytes(minFrameLength + minDelimLength); fail(minFrameLength); return null ; } if (stripDelimiter) { frame = buffer.readRetainedSlice(minFrameLength); buffer.skipBytes(minDelimLength); } else { frame = buffer.readRetainedSlice(minFrameLength + minDelimLength); } return frame; } else { if (!discardingTooLongFrame) { if (buffer.readableBytes() > maxFrameLength) { tooLongFrameLength = buffer.readableBytes(); buffer.skipBytes(buffer.readableBytes()); discardingTooLongFrame = true ; if (failFast) { fail(tooLongFrameLength); } } } else { tooLongFrameLength += buffer.readableBytes(); buffer.skipBytes(buffer.readableBytes()); } return null ; } }
基于长度域解码器分析 也就是io.netty.handler.codec.LengthFieldBasedFrameDecoder
重要参数
lengthFieldOffset = 2: 偏移量是多少,图中从开头到00距离2个字节,所以2
lengthFieldLength = 2: 从00出开始几个字节为止为长度,图中是2个字节,所以2
而0004 = 4也就是说这个数据包的结尾是从04开始往后的4个字节.所以最终的出这个数据包的范围如图所示
例子 1 2 3 4 5 6 7 8 9 10 * <b>lengthFieldOffset</b> = <b>0</b>长度域便宜0 * <b>lengthFieldLength</b> = <b>2</b>0,1字节拼起来的数表示了这个数据包的长度 * lengthAdjustment = 0 * initialBytesToStrip = 0 (= do not strip header) * * BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes) * +--------+----------------+ +--------+----------------+ * | Length | Actual Content |----->| Length | Actual Content | * | 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" | * +--------+----------------+ +--------+----------------+
0,1字节拼接为 0x000C =12也就是说数据包的结尾是从这里开始12个字节之后,也就是Hello,World
那段.由于lengthAdjustment
和initialBytesToStrip
的值为0,所以最后完成的数据包也是同样的. 那么这两个参数分别代表什么?
lengthAdjustment 有时候算出的Length( 0x000C )可能不代表后面的数据包的长度,所以用这个参数进行调整 比如,当lengthAdjustment= 1
的时候,他将会包括Hello,World
那段加上后面的一个字节
initialBytesToStrip 解析数据包时是否需要跳过字节
例子2 1 2 3 4 5 6 7 8 9 10 * lengthFieldOffset = 0 * lengthFieldLength = 2 * lengthAdjustment = 0 同样算出0x000C * <b>initialBytesToStrip = 2</b> (= the length of the Length field) * * BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes) * +--------+----------------+ +----------------+ * | Length | Actual Content |----->| Actual Content | * | 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" | * +--------+----------------+ +----------------+
initialBytesToStrip = 2表示跳过开头的两个字节,因此得出的数据包不包含开头表明长度的那2个字节
例子3 1 2 3 4 5 6 7 8 9 10 * lengthFieldOffset = 0 * lengthFieldLength = 2 * lengthAdjustment = -2</b> (= the length of the Length field) * initialBytesToStrip = 0 * * BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes) * +--------+----------------+ +--------+----------------+ * | Length | Actual Content |----->| Length | Actual Content | * | 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" | * +--------+----------------+ +--------+----------------+
这里的Length是4个字节( 0x000E ),这里把lengthAdjustment= -2,让它只算12个字节
例子4 1 2 3 4 5 6 7 8 9 10 * <b>lengthFieldOffset = 2</b> (= the length of Header 1) * <b>lengthFieldLength = 3</b> * lengthAdjustment = 0 * initialBytesToStrip = 0 * * BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes) * +----------+----------+----------------+ +----------+----------+----------------+ * | Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content | * | 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" | * +----------+----------+----------------+ +----------+----------+----------------+
从2字节处开始的3个字节表示长度,因此的出长度为从这里开始的12(0x00000C)个字节
例子5 1 2 3 4 5 6 7 8 9 10 * lengthFieldOffset = 0 * lengthFieldLength = 3 * <b>lengthAdjustment = 2</b> (= the length of Header 1) * initialBytesToStrip = 0 * * BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes) * +----------+----------+----------------+ +----------+----------+----------------+ * | Length | Header 1 | Actual Content |----->| Length | Header 1 | Actual Content | * | 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" | * +----------+----------+----------------+ +----------+----------+----------------+
如果不让写lengthAdjustment = 2,那么的出的实际数据会是0xCAFEHELLO, WOR
例子6 1 2 3 4 5 6 7 8 9 10 * lengthFieldOffset = 1 (= the length of HDR1) * lengthFieldLength = 2 * <b>lengthAdjustment = 1</b> (= the length of HDR2) * <b>initialBytesToStrip = 3</b> (= the length of HDR1 + LEN) * * BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes) * +------+--------+------+----------------+ +------+----------------+ * | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content | * | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" | * +------+--------+------+----------------+ +------+----------------+
从1字节处开始的2个字节表示长度,因此的出长度为从这里开始的12(0x00000C)个字节,此时只会截到HELLO, WORL
,通过lengthAdjustment扩大1个字节,再通过initialBytesToStrip跳过开头的不必要的两段
例子7 1 2 3 4 5 6 7 8 9 10 * lengthFieldOffset = 1 * lengthFieldLength = 2 * <b>lengthAdjustment = -3</b> (= the length of HDR1 + LEN, negative) * <b>initialBytesToStrip = 3</b> * * BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes) * +------+--------+------+----------------+ +------+----------------+ * | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content | * | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" | * +------+--------+------+----------------+ +------+----------------+
得到Length = 16(0x0010),显然太长了,lengthAdjustment = -3
调整长度,再设置initialBytesToStrip跳过开头的不必要的两段
1 2 3 4 5 6 7 8 9 10 11 12 13 public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder { private final ByteOrder byteOrder; private final int maxFrameLength; private final int lengthFieldOffset; private final int lengthFieldLength; private final int lengthFieldEndOffset; private final int lengthAdjustment; private final int initialBytesToStrip; private final boolean failFast; private boolean discardingTooLongFrame; private long tooLongFrameLength; private long bytesToDiscard;
基于长度域解码器步骤
计算需要抽取的数据包长度
跳过字节逻辑处理
丢弃模式下的处理 如果当前的长度大于长度限制,把接下来的字节流丢弃.丢弃到长度域里面指定的字节为止
1.计算需要抽取的数据包长度 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 protected Object decode (ChannelHandlerContext ctx, ByteBuf in) throws Exception { if (discardingTooLongFrame) { long bytesToDiscard = this .bytesToDiscard; int localBytesToDiscard = (int ) Math.min(bytesToDiscard, in.readableBytes()); in.skipBytes(localBytesToDiscard); bytesToDiscard -= localBytesToDiscard; this .bytesToDiscard = bytesToDiscard; failIfNecessary(false ); } if (in.readableBytes() < lengthFieldEndOffset) { return null ; } int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset; long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder); if (frameLength < 0 ) { in.skipBytes(lengthFieldEndOffset); throw new CorruptedFrameExcept ion ( "negative pre-adjustment length field: " + frameLength) ; } frameLength += lengthAdjustment + lengthFieldEndOffset; if (frameLength < lengthFieldEndOffset) { in.skipBytes(lengthFieldEndOffset); throw new CorruptedFrameException( "Adjusted frame length (" + frameLength + ") is less " + "than lengthFieldEndOffset: " + lengthFieldEndOffset); } if (frameLength > maxFrameLength) { long discard = frameLength - in.readableBytes(); tooLongFrameLength = frameLength; if (discard < 0 ) { in.skipBytes((int ) frameLength); } else { discardingTooLongFrame = true ; bytesToDiscard = discard; in.skipBytes(in.readableBytes()); } failIfNecessary(true ); return null ; } int frameLengthInt = (int ) frameLength; if (in.readableBytes() < frameLengthInt) { return null ; } if (initialBytesToStrip > frameLengthInt) { in.skipBytes(frameLengthInt); throw new CorruptedFrameException( "Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + initialBytesToStrip); } in.skipBytes(initialBytesToStrip); int readerIndex = in.readerIndex(); int actualFrameLength = frameLengthInt - initialBytesToStrip; ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength); in.readerIndex(readerIndex + actualFrameLength); return frame; }
2.跳过字节逻辑处理 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 protected Object decode (ChannelHandlerContext ctx, ByteBuf in) throws Exception { ..省略上一步 if (initialBytesToStrip > frameLengthInt) { in.skipBytes(frameLengthInt); throw new CorruptedFrameException( "Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + initialBytesToStrip); } in.skipBytes(initialBytesToStrip); int readerIndex = in.readerIndex(); int actualFrameLength = frameLengthInt - initialBytesToStrip; ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength); in.readerIndex(readerIndex + actualFrameLength); return frame; }
3.丢弃模式下的处理 想抽取的长度>maxFrameLength
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 protected Object decode (ChannelHandlerContext ctx, ByteBuf in) throws Exception { if (discardingTooLongFrame) { long bytesToDiscard = this .bytesToDiscard; int localBytesToDiscard = (int ) Math.min(bytesToDiscard, in.readableBytes()); in.skipBytes(localBytesToDiscard); bytesToDiscard -= localBytesToDiscard; this .bytesToDiscard = bytesToDiscard; failIfNecessary(false ); } ... if (frameLength > maxFrameLength) { long discard = frameLength - in.readableBytes(); tooLongFrameLength = frameLength; if (discard < 0 ) { in.skipBytes((int ) frameLength); } else { discardingTooLongFrame = true ; bytesToDiscard = discard; in.skipBytes(in.readableBytes()); } failIfNecessary(true ); return null ; } ... }
总结
解码器抽象的解码过程
累加字节流
调用子类的decode方法进行解析
将解析到的ByteBuf向下传播
netty里面有哪些拆箱即用的解码器
基于固定长度解码器
基于行解码器
基于分隔符解码器
基于长度域解码器