远程调用的过程:
首先服务消费者通过代理对象 Proxy 发起远程调用,接着通过网络客户端 Client 将编码后的请求发送给服务提供方的网络层上,也就是 Server。Server 在收到请求后,首先要做的事情是对数据包进行解码。然后将解码后的请求发送至分发器 Dispatcher,再由分发器将请求派发到指定的线程池上,最后由线程池调用具体的服务。这就是一个远程调用请求的发送与接收过程。
服务调用方式 Dubbo 支持同步和异步两种调用方式,其中异步调用还可细分为“有返回值”的异步调用和“无返回值”的异步调用。所谓“无返回值”异步调用是指服务消费方只管调用,但不关心调用结果,此时 Dubbo 会直接返回一个空的 RpcResult。若要使用异步特性,需要服务消费方手动进行配置。默认情况下,Dubbo 使用同步调用方式。
在介绍Consumer服务引入章节的最后部分中说过,dubbo最后会通过JavassistProxyFactory为invoker生成代理类,然后进行保存.当时为DemoService生成的代理类如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class proxy0 implements ClassGenerator .DC , EchoService , DemoService { public static Method[] methods; private InvocationHandler handler; public proxy0 (InvocationHandler invocationHandler) { this .handler = invocationHandler; } public proxy0 () { } public String sayHello (String string) { Object[] arrobject = new Object[]{string}; Object object = this .handler.invoke(this , methods[0 ], arrobject); return (String)object; } public Object $echo(Object object) { Object[] arrobject = new Object[]{object}; Object object2 = this .handler.invoke(this , methods[1 ], arrobject); return object2; } }
就是调用InvocationHandler接口的invoke方法,并返回结果.
这里的InvokerHandler的实现类是什么?跟踪JavassistProxyFactory
创建代理的过程就能看到,它代理方法放入的InvokerHandler为InvokerInvocationHandler类型,而参数的invoker是MockClusterInvoker类型(待会讲)
JavassistProxyFactory构造代理类的过程:
1 2 3 4 5 6 @Override public <T> T getProxy (Invoker<T> invoker, Class<?>[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }
先看一下这个InvocationHandler的实现类InvokerInvocationHandler的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public class InvokerInvocationHandler implements InvocationHandler { private final Invoker<?> invoker; public InvokerInvocationHandler (Invoker<?> handler) { this .invoker = handler; } @Override public Object invoke (Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } if ("toString" .equals(methodName) && parameterTypes.length == 0 ) { return invoker.toString(); } if ("hashCode" .equals(methodName) && parameterTypes.length == 0 ) { return invoker.hashCode(); } if ("equals" .equals(methodName) && parameterTypes.length == 1 ) { return invoker.equals(args[0 ]); } return invoker.invoke(new RpcInvocation(method, args)).recreate(); } }
其实就是进行一系列检查后,调用invoker的invoke方法.
之前的章节讲过这里的invoke是聚合了多个服务提供端invoke的clusterInvoker.但经过代码跟踪就会发现,此时拿到的invoke并不是FailoverClusterInvoker等clusterInvoker的实现类,而是一个叫MockClusterInvoker的类型.
MockClusterInvoker包装了directory和clusterInvoker,另外MockClusterInvoker 内部封装了服务降级逻辑.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class MockClusterInvoker <T > implements Invoker <T > { private final Invoker<T> invoker; public Result invoke (Invocation invocation) throws RpcException { Result result = null ; String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || value.equalsIgnoreCase("false" )) { result = this .invoker.invoke(invocation); } else if (value.startsWith("force" )) { result = doMockInvoke(invocation, null ); } else { try { result = this .invoker.invoke(invocation); } catch (RpcException e) { if (e.isBiz()) { throw e; } else { result = doMockInvoke(invocation, e); } } } return result; } }
简单解释一下,url中没有mock配置时单纯调用Invoker的invoke(),如果mock配置为”force”时直接进行服务降级,如果不是”force”,那么则在Invoker的invoke()调用失败后进行服务降级.服务降级逻辑doMockInvoke,在这里不分析.
无视这些降级操作,就是proxy会调用FailoverClusterInvoker的invoke方法.上一章说过了FailoverClusterInvoker通过负载均衡操作选择一个服务的invoker,此时选出的Invoker会是DubboInvoker类型的.
看到这里可能不记得这个DubboInvoker从哪来的,复习一下.Consumer服务引入时的中途过程如下:
1 2 3 4 5 6 7 8 9 2. RegistryProtocol.doRefer() | 4. directory.subscribe(subscribeURL) //订阅服务 | 1. 从注册中心获取providerURL 2. DubboProtocol#refer 创建与provider连接的netty客户端.<= 这里创建DubboInvoker 3. 返回invoker | 5. cluster.join(directory);//如果该注册中心下有多个服务提供者,合并成一个invoker
也就是说在调用directry.subscribe()的时候会根据providerURL和subscribeURL构建netty客户端,并把这个客户端封装成DubboInvoker类型的invoker.具体参考之前讲Consumer服务引入那一章.
总而言之我们将通过FailoverClusterInvoker选择出了DubboInvoker并有它的父类AbstractInvoke调用它的doInvoke(),它才是真正进行远程调用的invoker.下面开始分析DubboInvoker:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 public class DubboInvoker <T > extends AbstractInvoker <T > { private final ExchangeClient[] clients; protected Result doInvoke (final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1 ) { currentClient = clients[0 ]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false ); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null ); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else { RpcContext.getContext().setFuture(null ); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(..., "Invoke remote method timeout...." ); } catch (RemotingException e) { throw new RpcException(..., "Failed to invoke remote method: ..." ); } } }
概括一下就是
异步无返回值:上下文的future字段为null,返回空结果
异步有返回值:上下文的future字段为实际的future,返回空结果
同步调用:调用future的get进行阻塞,直到返回结果.
获取ResponseFuture时用到的currentClient为ExchangeClient类型,也就是netty客户端的封装.ResponseFuture 是一个接口,下面我们来看一下它的默认实现类 DefaultFuture 的源码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 public class DefaultFuture implements ResponseFuture { private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>(); private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>(); private final long id; private final Channel channel; private final Request request; private final int timeout; private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); private volatile Response response; public DefaultFuture (Channel channel, Request request, int timeout) { this .channel = channel; this .request = request; this .id = request.getId(); this .timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); FUTURES.put(id, this ); CHANNELS.put(id, channel); } @Override public Object get () throws RemotingException { return get(timeout); } @Override public Object get (int timeout) throws RemotingException { if (timeout <= 0 ) { timeout = Constants.DEFAULT_TIMEOUT; } if (!isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (!isDone()) { done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break ; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } if (!isDone()) { throw new TimeoutException(sent > 0 , channel, getTimeoutMessage(false )); } } return returnFromResponse(); } @Override public boolean isDone () { return response != null ; } private Object returnFromResponse () throws RemotingException { Response res = response; if (res == null ) { throw new IllegalStateException("response cannot be null" ); } if (res.getStatus() == Response.OK) { return res.getResult(); } if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()); } throw new RemotingException(channel, res.getErrorMessage()); } }
也就是说当get()被调用时一直阻塞到response变量为非空为止.那么response变量是在什么时候被放入呢?是在DefaultFuture#received被调用的时候,在设置response变量的同时,把id从map中移除.那么什么时候DefaultFuture#received会被调用呢,是在HeaderExchangeHandler#handleResponse的时候被调用.这一部分稍后会进行分析.
异步模式中放入上下文的FutureAdapter 是什么呢? FutureAdapter 是一个适配器,用于将 Dubbo 中的 ResponseFuture 与 JDK 中的 Future 进行适配。这样当用户线程调用 Future 的 get 方法时,经过 FutureAdapter 适配,最终会调用 ResponseFuture 实现类对象的 get 方法,也就是 DefaultFuture 的 get 方法。
服务消费方发送请求 发送请求
发送请求时的调用链如上
1 2 DubboInvoker#doInvoke -> ReferenceCountExchangeClient#request -> HeaderExchangeClient#request -> HeaderExchangeChannel#request -> AbstractPeer#send -> AbstractClient#send -> NettyChannel#send
下面开始是逐步调用request方法的过程.request方法的参数为public ResponseFuture request(Object request)
看到这里的时候可能已经不记得这个Object是什么了.这里的Object是 封装了method 和 args 的 RpcInvocation .在InvokerInvocationHandler#invoke中被放入的.
下面开始逐步分析.
ReferenceCountExchangeClient ReferenceCountExchangeClient的request就是简单调用HeaderExchangeClient#request.
ReferenceCountExchangeClient 内部定义了一个引用计数变量 referenceCount,每当该对象被引用一次 referenceCount 都会进行自增。每当 close 方法被调用时,referenceCount 进行自减。ReferenceCountExchangeClient 内部仅实现了一个引用计数的功能,其他方法并无复杂逻辑,均是直接调用被装饰对象也就是HeaderExchangeClient的相关方法。
接下来是HeaderExchangeClient.它与ReferenceCountExchangeClient相似,也是简单调用下一层的HeaderExchangeChannel#request.与ReferenceCountExchangeClient的引用计数相比,它的功能是加入心跳检测的逻辑,心跳检测会在构造方法中被开启,close()被调用后停止:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 public class HeaderExchangeClient implements ExchangeClient { private static final ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2 , new NamedThreadFactory("dubbo-remoting-client-heartbeat" , true )); private final Client client; private final ExchangeChannel channel; private ScheduledFuture<?> heartbeatTimer; private int heartbeat; private int heartbeatTimeout; public HeaderExchangeClient (Client client, boolean needHeartbeat) { if (client == null ) { throw new IllegalArgumentException("client == null" ); } this .client = client; this .channel = new HeaderExchangeChannel(client); String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY); this .heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0." ) ? Constants.DEFAULT_HEARTBEAT : 0 ); this .heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3 ); if (heartbeatTimeout < heartbeat * 2 ) { throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2" ); } if (needHeartbeat) { startHeartbeatTimer(); } } @Override public void close () { doClose(); channel.close(); } private void doClose () { stopHeartbeatTimer(); } private void startHeartbeatTimer () { stopHeartbeatTimer(); if (heartbeat > 0 ) { heartbeatTimer = scheduled.scheduleWithFixedDelay( new HeartBeatTask(new HeartBeatTask.ChannelProvider() { @Override public Collection<Channel> getChannels () { return Collections.<Channel>singletonList(HeaderExchangeClient.this ); } }, heartbeat, heartbeatTimeout), heartbeat, heartbeat, TimeUnit.MILLISECONDS); } } private void stopHeartbeatTimer () { if (heartbeatTimer != null && !heartbeatTimer.isCancelled()) { try { heartbeatTimer.cancel(true ); scheduled.purge(); } catch (Throwable e) { if (logger.isWarnEnabled()) { logger.warn(e.getMessage(), e); } } } heartbeatTimer = null ; } }
接下来看它调用的下层:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 final class HeaderExchangeChannel implements ExchangeChannel { private final Channel channel; HeaderExchangeChannel(Channel channel) { if (channel == null ) { throw new IllegalArgumentException("channel == null" ); } this .channel = channel; } @Override public ResponseFuture request (Object request) throws RemotingException { return request(request, channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)); } @Override public ResponseFuture request (Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(..., "Failed to send request ...); } // 创建 Request 对象 Request req = new Request(); req.setVersion(Version.getProtocolVersion()); // 设置双向通信标志为 true req.setTwoWay(true); // 这里的 request 变量类型为 RpcInvocation req.setData(request); // 创建 DefaultFuture 对象 DefaultFuture future = new DefaultFuture(channel, req, timeout); try { // 调用 NettyClient 的 send 方法发送请求 channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } // 返回 DefaultFuture 对象 return future; } }
在HeaderExchangeChannel中RpcInvoker与一系列信息一同被封装到了Request对象中.下面终于能调用NettyClient的send方法发送Request对象了.
需要说明的是,NettyClient 中并未实现 send 方法,该方法继承自父类 AbstractPeer,下面直接分析 AbstractPeer 的代码。
AbstractPeer,AbstractClient 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public abstract class AbstractPeer implements Endpoint , ChannelHandler { @Override public void send (Object message) throws RemotingException { send(message, url.getParameter(Constants.SENT_KEY, false )); } } public abstract class AbstractClient extends AbstractEndpoint implements Client { @Override public void send (Object message, boolean sent) throws RemotingException { if (send_reconnect && !isConnected()) { connect(); } Channel channel = getChannel(); if (channel == null || !channel.isConnected()) { throw new RemotingException(this , "message can not send ..." ); } channel.send(message, sent); } protected abstract Channel getChannel () ; }
简单看一下AbstractClient(NettyClient)调用getChannel获取NettyChannel的过程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 public class NettyClient extends AbstractClient { private volatile Channel channel; @Override protected com.alibaba.dubbo.remoting.Channel getChannel () { Channel c = channel; if (c == null || !c.isConnected()) return null ; return NettyChannel.getOrAddChannel(c, getUrl(), this ); } } final class NettyChannel extends AbstractChannel { private static final ConcurrentMap<org.jboss.netty.channel.Channel, NettyChannel> channelMap = new ConcurrentHashMap<org.jboss.netty.channel.Channel, NettyChannel>(); private final org.jboss.netty.channel.Channel channel; private NettyChannel (org.jboss.netty.channel.Channel channel, URL url, ChannelHandler handler) { super (url, handler); if (channel == null ) { throw new IllegalArgumentException("netty channel == null;" ); } this .channel = channel; } static NettyChannel getOrAddChannel (org.jboss.netty.channel.Channel ch, URL url, ChannelHandler handler) { if (ch == null ) { return null ; } NettyChannel ret = channelMap.get(ch); if (ret == null ) { NettyChannel nc = new NettyChannel(ch, url, handler); if (ch.isConnected()) { ret = channelMap.putIfAbsent(ch, nc); } if (ret == null ) { ret = nc; } } return ret; } }
获取到 NettyChannel 实例后,即可进行后续的调用。下面看一下 NettyChannel 的 send 方法。
NettyChannel 提醒一下参数的Object message
是什么.是放了RPCInvoker和一系列通信信息的Request 对象.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public void send (Object message, boolean sent) throws RemotingException { super .send(message, sent); boolean success = true ; int timeout = 0 ; try { ChannelFuture future = channel.write(message); if (sent) { timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); success = future.await(timeout); } Throwable cause = future.getCause(); if (cause != null ) { throw cause; } } catch (Throwable e) { throw new RemotingException(this , "Failed to send message ..." ); } if (!success) { throw new RemotingException(this , "Failed to send message ..." ); } }
经历多次调用,到这里请求数据的发送过程就结束了,过程漫长。为了便于大家阅读代码,这里以 DemoService 为例,将 sayHello 方法的整个调用路径贴出来。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 proxy0#sayHello(String) —> InvokerInvocationHandler#invoke(Object, Method, Object[]) //进行检查,把method 和 args 封装到 RpcInvocation 中传给下面 —> MockClusterInvoker#invoke(Invocation) //有必要的话进行降权操作 --- 本章没有解释的部分 .通过负载均衡在clusterInvoker中选出一个invoker.并调用它的invoke方法 —> AbstractClusterInvoker#invoke(Invocation) —> FailoverClusterInvoker#doInvoke(Invocation, List<Invoker<T>>, LoadBalance) —> Filter#invoke(Invoker, Invocation) // 包含多个 Filter 调用 —> ListenerInvokerWrapper#invoke(Invocation) --- —> AbstractInvoker#invoke(Invocation) —> DubboInvoker#doInvoke(Invocation)//调用ExchangeClient的request().根据用户的设置(异步还是同步)进行不同的后续工作 —> ReferenceCountExchangeClient#request(Object, int)//加入引用计数机制,调用下层 —> HeaderExchangeClient#request(Object, int)//加入心跳机制,调用下层 —> HeaderExchangeChannel#request(Object, int)//把RPCInvoker封装为Request对象,创建future.调用nettyClient的send()发送Request —> AbstractPeer#send(Object)//nettyClient的父类.直接传递给AbstractClient —> AbstractClient#send(Object, boolean)//获取nettyChannel类型对象,调用它的send方法. —> NettyChannel#send(Object, boolean)//参数为Request对象.调用Channel的write写出Request对象 —> NioClientSocketChannel#write(Object)
请求编码 在分析请求编码逻辑之前,我们先来看一下 Dubbo 数据包结构。
Dubbo 数据包分为消息头和消息体,消息头用于存储一些元信息,比如魔数(Magic),数据包类型(Request/Response),消息体长度(Data Length)等。消息体中用于存储具体的调用消息,比如方法名称,参数列表等。下面简单列举一下消息头的内容。
偏移量(Bit)
字段
取值
0 ~ 7
魔数高位
0xda00
8 ~ 15
魔数低位
0xbb
16
数据包类型
0 - Response, 1 - Request
17
调用方式
仅在第16位被设为1的情况下有效,0 - 单向调用,1 - 双向调用
18
事件标识
0 - 当前数据包是请求或响应包,1 - 当前数据包是心跳包
19 ~ 23
序列化器编号
2 - Hessian2Serialization 3 - JavaSerialization 4 - CompactedJavaSerialization 6 - FastJsonSerialization 7 - NativeJavaSerialization 8 - KryoSerialization 9 - FstSerialization
24 ~ 31
状态
20 - OK 30 - CLIENT_TIMEOUT 31 - SERVER_TIMEOUT 40 - BAD_REQUEST 50 - BAD_RESPONSE ……
32 ~ 95
请求编号
共8字节,运行时生成
96 ~ 127
消息体长度
运行时计算
了解了 Dubbo 数据包格式,接下来我们就可以探索编码过程了。在进行编码类的分析之前,先来回顾一下编码类是在什么时候被创建,并放入pipeline的.
对netty了解的话都知道,netty在启动前都需要在pipeline中添加编码器的和解码器.Dubbo的netty服务器,客户端的创建分别在NettyServer和NettyClient类中,因为添加编码器的过程两者类似,这里只介绍客户端的
NettyClient的启动方法doOpen()的代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Override protected void doOpen () throws Throwable { bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline () { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this ); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder" , adapter.getDecoder()); pipeline.addLast("encoder" , adapter.getEncoder()); pipeline.addLast("handler" , nettyHandler); return pipeline; } }); }
getCodec()在其父类AbstractEndpoint的代码中.codec是在它初始化时根据url中的codec参数,通过SPI创建对应的codec.示例的sayHello的url的codec是”dubbo”.所以加入pipeline的codec的类型为DubboCodec .
我们先从它的父类ExchangeCodec 开始分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 public class ExchangeCodec extends TelnetCodec { protected static final int HEADER_LENGTH = 16 ; protected static final short MAGIC = (short ) 0xdabb ; protected static final byte MAGIC_HIGH = Bytes.short2bytes(MAGIC)[0 ]; protected static final byte MAGIC_LOW = Bytes.short2bytes(MAGIC)[1 ]; protected static final byte FLAG_REQUEST = (byte ) 0x80 ; protected static final byte FLAG_TWOWAY = (byte ) 0x40 ; protected static final byte FLAG_EVENT = (byte ) 0x20 ; protected static final int SERIALIZATION_MASK = 0x1f ; private static final Logger logger = LoggerFactory.getLogger(ExchangeCodec.class); public Short getMagicCode () { return MAGIC; } @Override public void encode (Channel channel, ChannelBuffer buffer, Object msg) throws IOException { if (msg instanceof Request) { encodeRequest(channel, buffer, (Request) msg); } else if (msg instanceof Response) { encodeResponse(channel, buffer, (Response) msg); } else { super .encode(channel, buffer, msg); } } protected void encodeRequest (Channel channel, ChannelBuffer buffer, Request req) throws IOException { Serialization serialization = getSerialization(channel); byte [] header = new byte [HEADER_LENGTH]; Bytes.short2bytes(MAGIC, header); header[2 ] = (byte ) (FLAG_REQUEST | serialization.getContentTypeId()); if (req.isTwoWay()) header[2 ] |= FLAG_TWOWAY; if (req.isEvent()) header[2 ] |= FLAG_EVENT; Bytes.long2bytes(req.getId(), header, 4 ); int savedWriteIndex = buffer.writerIndex(); buffer.writerIndex(savedWriteIndex + HEADER_LENGTH); ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer); ObjectOutput out = serialization.serialize(channel.getUrl(), bos); if (req.isEvent()) { encodeEventData(channel, out, req.getData()); } else { encodeRequestData(channel, out, req.getData(), req.getVersion()); } out.flushBuffer(); if (out instanceof Cleanable) { ((Cleanable) out).cleanup(); } bos.flush(); bos.close(); int len = bos.writtenBytes(); checkPayload(channel, len); Bytes.int2bytes(len, header, 12 ); buffer.writerIndex(savedWriteIndex); buffer.writeBytes(header); buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); } }
上面方法中我们调用encodeRequestData()对Request的data,也就是RpcInvoker进行了序列化.下面看一下它的序列化过程,这一部分是在DubboCodec中实现.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public class DubboCodec extends ExchangeCodec implements Codec2 { protected void encodeRequestData (Channel channel, ObjectOutput out, Object data, String version) throws IOException { RpcInvocation inv = (RpcInvocation) data; out.writeUTF(version); out.writeUTF(inv.getAttachment(Constants.PATH_KEY)); out.writeUTF(inv.getAttachment(Constants.VERSION_KEY)); out.writeUTF(inv.getMethodName()); out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes())); Object[] args = inv.getArguments(); if (args != null ) for (int i = 0 ; i < args.length; i++) { out.writeObject(encodeInvocationArgument(channel, inv, i)); } out.writeObject(inv.getAttachments()); } }
非常简单,就是依次把方法名和方法参数进行序列化,并放入out里面.
至此,关于服务消费方发送请求的过程就分析完了,接下来我们来看一下服务提供方是如何接收请求的。
服务提供方接收请求 请求解码 当netty服务端(提供者)收到来自客户端(消费者)的请求时通过pipeline,调用DubboCodec的decode()方法.逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 public class ExchangeCodec extends TelnetCodec { @Override public Object decode (Channel channel, ChannelBuffer buffer) throws IOException { int readable = buffer.readableBytes(); byte [] header = new byte [Math.min(readable, HEADER_LENGTH)]; buffer.readBytes(header); return decode(channel, buffer, readable, header); } @Override protected Object decode (Channel channel, ChannelBuffer buffer, int readable, byte [] header) throws IOException { if (readable > 0 && header[0 ] != MAGIC_HIGH || readable > 1 && header[1 ] != MAGIC_LOW) { int length = header.length; if (header.length < readable) { header = Bytes.copyOf(header, readable); buffer.readBytes(header, length, readable - length); } for (int i = 1 ; i < header.length - 1 ; i++) { if (header[i] == MAGIC_HIGH && header[i + 1 ] == MAGIC_LOW) { buffer.readerIndex(buffer.readerIndex() - header.length + i); header = Bytes.copyOf(header, i); break ; } } return super .decode(channel, buffer, readable, header); } if (readable < HEADER_LENGTH) { return DecodeResult.NEED_MORE_INPUT; } int len = Bytes.bytes2int(header, 12 ); checkPayload(channel, len); int tt = len + HEADER_LENGTH; if (readable < tt) { return DecodeResult.NEED_MORE_INPUT; } ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len); try { return decodeBody(channel, is, header); } finally { if (is.available() > 0 ) { try { StreamUtils.skipUnusedStream(is); } catch (IOException e) { logger.warn(e.getMessage(), e); } } } } }
上面方法通过检测消息头中的魔数是否与规定的魔数相等,提前拦截掉非常规数据包,比如通过 telnet 命令行发出的数据包。运行时 DubboCodec 中的 decodeBody 方法会被调用。此时将会进行复原Request对象的操作.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 public class DubboCodec extends ExchangeCodec implements Codec2 { @Override protected Object decodeBody (Channel channel, InputStream is, byte [] header) throws IOException { byte flag = header[2 ], proto = (byte ) (flag & SERIALIZATION_MASK); Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); long id = Bytes.bytes2long(header, 4 ); if ((flag & FLAG_REQUEST) == 0 ) { } else { Request req = new Request(id); req.setVersion(Version.getProtocolVersion()); req.setTwoWay((flag & FLAG_TWOWAY) != 0 ); if ((flag & FLAG_EVENT) != 0 ) { req.setEvent(Request.HEARTBEAT_EVENT); } try { Object data; if (req.isHeartbeat()) { data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); } else if (req.isEvent()) { data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); } else { DecodeableRpcInvocation inv; if (channel.getUrl().getParameter( Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) { inv = new DecodeableRpcInvocation(channel, req, is, proto); inv.decode(); } else { inv = new DecodeableRpcInvocation(channel, req, new UnsafeByteArrayInputStream(readMessageData(is)), proto); } data = inv; } req.setData(data); } catch (Throwable t) { req.setBroken(true ); req.setData(t); } return req; } } }
我们知道Request的Data部分是RpcInvoker.而在上面代码中,默认情况下(如sayHello示例)通过DecodeableRpcInvocation 的 decode 方法直接在这里对RpcInvoker进行解码,构建RpcInvoker对象.并放入到Request.
下面我们来看一下 DecodeableRpcInvocation 的 decode 方法逻辑。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 public class DecodeableRpcInvocation extends RpcInvocation implements Codec , Decodeable { @Override public Object decode (Channel channel, InputStream input) throws IOException { ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType) .deserialize(channel.getUrl(), input); String dubboVersion = in.readUTF(); request.setVersion(dubboVersion); setAttachment(Constants.DUBBO_VERSION_KEY, dubboVersion); setAttachment(Constants.PATH_KEY, in.readUTF()); setAttachment(Constants.VERSION_KEY, in.readUTF()); setMethodName(in.readUTF()); try { Object[] args; Class<?>[] pts; String desc = in.readUTF(); if (desc.length() == 0 ) { pts = DubboCodec.EMPTY_CLASS_ARRAY; args = DubboCodec.EMPTY_OBJECT_ARRAY; } else { pts = ReflectUtils.desc2classArray(desc); args = new Object[pts.length]; for (int i = 0 ; i < args.length; i++) { try { args[i] = in.readObject(pts[i]); } catch (Exception e) { if (log.isWarnEnabled()) { log.warn("Decode argument failed: " + e.getMessage(), e); } } } } setParameterTypes(pts); Map<String, String> map = (Map<String, String>) in.readObject(Map.class); if (map != null && map.size() > 0 ) { Map<String, String> attachment = getAttachments(); if (attachment == null ) { attachment = new HashMap<String, String>(); } attachment.putAll(map); setAttachments(attachment); } for (int i = 0 ; i < args.length; i++) { args[i] = decodeInvocationArgument(channel, this , pts, i, args[i]); } setArguments(args); } catch (ClassNotFoundException e) { throw new IOException(StringUtils.toString("Read invocation data failed." , e)); } finally { if (in instanceof Cleanable) { ((Cleanable) in).cleanup(); } } return this ; } }
如上,上面的方法通过反序列化将诸如 path、version、调用方法名、参数列表等信息依次解析出来,并设置到相应的字段中.
到这里,请求数据解码的过程就分析完了。此时我们得到了一个 Request 对象,这个对象会被传送到下一个入站处理器中,我们继续往下看。
服务调用 在创建netty服务端时,在pipeline的末尾加入了nettyHandler (请参考NettyServer#doOpen ).所以,解码器将数据包解析成 Request 对象后,NettyHandler 的 messageReceived 方法紧接着会收到这个对象,并将这个对象继续向下传递。最后由 AllChannelHandler 将该对象封装到 Runnable 实现类对象中,并将 Runnable 放入线程池中执行后续的调用逻辑。
1 2 3 4 5 6 NettyHandler#messageReceived(ChannelHandlerContext, MessageEvent) —> AbstractPeer#received(Channel, Object) —> MultiMessageHandler#received(Channel, Object) —> HeartbeatHandler#received(Channel, Object) —> AllChannelHandler#received(Channel, Object) —> ExecutorService#execute(Runnable) // 由线程池执行后续的调用逻辑
NettyHandler 这里我们直接分析调用栈中的分析第一个和最后一个调用方法逻辑。如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Sharable public class NettyHandler extends SimpleChannelHandler { private final Map<String, Channel> channels = new ConcurrentHashMap<String, Channel>(); private final URL url; private final ChannelHandler handler; public NettyHandler (URL url, ChannelHandler handler) { if (url == null ) { throw new IllegalArgumentException("url == null" ); } if (handler == null ) { throw new IllegalArgumentException("handler == null" ); } this .url = url; this .handler = handler; } public void messageReceived (ChannelHandlerContext ctx, MessageEvent e) throws Exception { NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler); try { handler.received(channel, e.getMessage()); } finally { NettyChannel.removeChannelIfDisconnected(ctx.getChannel()); } } }
下面再来看看 AllChannelHandler 的逻辑,在详细分析代码之前,我们先来了解一下 Dubbo 中的线程派发模型。
线程派发模型 Dubbo 将底层通信框架中接收请求的线程称为 IO 线程。如果一些事件处理逻辑可以很快执行完,比如只在内存打一个标记,此时直接在 IO 线程上执行该段逻辑即可。但如果事件的处理逻辑比较耗时,比如该段逻辑会发起数据库查询或者 HTTP 请求。此时我们就不应该让事件处理逻辑在 IO 线程上执行,而是应该派发到线程池中去执行。原因也很简单,IO 线程主要用于接收请求,如果 IO 线程被占满,将导致它不能接收新的请求。
以上就是线程派发的背景,下面我们再来通过 Dubbo 调用图,看一下线程派发器所处的位置。
如上图,红框中的 Dispatcher 就是线程派发器。需要说明的是,Dispatcher 真实的职责创建具有线程派发能力的 ChannelHandler,比如 AllChannelHandler、MessageOnlyChannelHandler 和 ExecutionChannelHandler 等,其本身并不具备线程派发能力。Dubbo 支持 5 种不同的线程派发策略,下面通过一个表格列举一下。
策略
用途
all
所有消息都派发到线程池,包括请求,响应,连接事件,断开事件等
direct
所有消息都不派发到线程池,全部在 IO 线程上直接执行
message
只有请求 和响应 消息派发到线程池,其它消息均在 IO 线程上执行
execution
只有请求 消息派发到线程池,不含响应。其它消息均在 IO 线程上执行
connection
在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池
默认配置下,Dubbo 使用 all
派发策略,即将所有的消息都派发到线程池中。
AllChannelHandler 下面我们来分析一下 AllChannelHandler 的代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 public class AllChannelHandler extends WrappedChannelHandler { public AllChannelHandler (ChannelHandler handler, URL url) { super (handler, url); } @Override public void connected (Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException(..., " error when process connected event ." , t); } } @Override public void disconnected (Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException(..., "error when process disconnected event ." , t); } } @Override public void received (Channel channel, Object message) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if (message instanceof Request && t instanceof RejectedExecutionException){ Request request = (Request)message; if (request.isTwoWay()){ String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage(); Response response = new Response(request.getId(), request.getVersion()); response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR); response.setErrorMessage(msg); channel.send(response); return ; } } throw new ExecutionException(..., " error when process received event ." , t); } } @Override public void caught (Channel channel, Throwable exception) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception)); } catch (Throwable t) { throw new ExecutionException(..., "error when process caught event ..." ); } } }
如上,请求对象会被封装 ChannelEventRunnable 中,ChannelEventRunnable 将会是服务调用过程的新起点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 public class ChannelEventRunnable implements Runnable { private final ChannelHandler handler; private final Channel channel; private final ChannelState state; private final Throwable exception; private final Object message; @Override public void run () { if (state == ChannelState.RECEIVED) { try { handler.received(channel, message); } catch (Exception e) { logger.warn("... operation error, channel is ... message is ..." ); } } else { switch (state) { case CONNECTED: try { handler.connected(channel); } catch (Exception e) { logger.warn("... operation error, channel is ..." ); } break ; case DISCONNECTED: case SENT: case CAUGHT: default : logger.warn("unknown state: " + state + ", message is " + message); } } } }
ChannelEventRunnable 仅是一个中转站,它的 run 方法中并不包含具体的调用逻辑,仅用于将参数传给其他 ChannelHandler 对象进行处理,该对象类型为 DecodeHandler 。
DecodeHandler DecodeHandler 存在的意义就是保证请求或响应对象可在线程池中被解码。解码完毕后,完全解码后的 Request 对象会继续向后传递,下一站是 HeaderExchangeHandler 。
HeaderExchangeHandler 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 public class HeaderExchangeHandler implements ChannelHandlerDelegate { private final ExchangeHandler handler; public HeaderExchangeHandler (ExchangeHandler handler) { if (handler == null ) { throw new IllegalArgumentException("handler == null" ); } this .handler = handler; } @Override public void received (Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { if (message instanceof Request) { Request request = (Request) message; if (request.isEvent()) { handlerEvent(channel, request); } else { if (request.isTwoWay()) { Response response = handleRequest(exchangeChannel, request); channel.send(response); } else { handler.received(exchangeChannel, request.getData()); } } } else if (message instanceof Response) { handleResponse(channel, (Response) message); } else if (message instanceof String) { } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } Response handleRequest (ExchangeChannel channel, Request req) throws RemotingException { Response res = new Response(req.getId(), req.getVersion()); if (req.isBroken()) { Object data = req.getData(); String msg; if (data == null ) msg = null ; else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data); else msg = data.toString(); res.setErrorMessage("Fail to decode request due to: " + msg); res.setStatus(Response.BAD_REQUEST); return res; } Object msg = req.getData(); try { Object result = handler.reply(channel, msg); res.setStatus(Response.OK); res.setResult(result); } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); } return res; } }
到这里,我们看到了比较清晰的请求和响应逻辑。到这里,我们看到了比较清晰的请求和响应逻辑。在这一部分中我们拿出了Request中的实际数据RPCInvoker取出进行调用,再把调用结果封装成了Response返回给消费者.
这里是通过handler.reply()
进行的调用,那么这个handler是什么呢? 这个handler是在netty服务器被创建时,也就是DubboProtocol的createServer()里被放入的,它是一个匿名内部类. 来看一下当时在DubboProtocol是如何定义它的.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public class DubboProtocol extends AbstractProtocol { public static final String NAME = "dubbo" ; private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { @Override public Object reply (ExchangeChannel channel, Object message) throws RemotingException { if (message instanceof Invocation) { Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { } RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress()); return invoker.invoke(inv); } throw new RemotingException(channel, "Unsupported request: ..." ); } } Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException { int port = channel.getLocalAddress().getPort(); String path = inv.getAttachments().get(Constants.PATH_KEY); String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY)); DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey); if (exporter == null ) throw new RemotingException(channel, "Not found exported service ..." ); return exporter.getInvoker(); } }
解释一下就是,reply方法会调用DubboProtocol#getInvoker,根据消费端的Channel和RPCInvoker信息构建serviceKey.用serviceKey从提供者启动时构建的exporterMap中获取合适的exporter.拿到exporter的Invoke方法.
该invoke方法是真正的本地方法的invoke.给这个invoker传入包含了参数列表的RPCInvoker.进行真正的本地invoke.
这里拿到的本地Invoker是AbstractProxyInvoker类型
AbstractProxyInvoker 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public abstract class AbstractProxyInvoker <T > implements Invoker <T > { @Override public Result invoke (Invocation invocation) throws RpcException { try { return new RpcResult(doInvoke(proxy, invocation.getMethodName(), invocation.getParameterTypes(), invocation.getArguments())); } catch (InvocationTargetException e) { return new RpcResult(e.getTargetException()); } catch (Throwable e) { throw new RpcException("Failed to invoke remote proxy method ..." ); } } protected abstract Object doInvoke (T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable ; }
这里的doInvoke是抽象方法.Invoker 实例是在运行时通过 JavassistProxyFactory 创建的,当时的创建逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public class JavassistProxyFactory extends AbstractProxyFactory { @Override public <T> Invoker<T> getInvoker (T proxy, Class<T> type, URL url) { final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$' ) < 0 ? proxy.getClass() : type); return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke (T proxy, String methodName, Class<?>[] parameterTypes, Object[] arguments) throws Throwable { return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; } }
Wrapper 是一个抽象类,其中 invokeMethod 是一个抽象方法。Dubbo 会在运行时通过 Javassist 框架为 Wrapper 生成实现类,并实现 invokeMethod 方法,该方法最终会根据调用信息调用具体的服务。以 DemoServiceImpl 为例,Javassist 为其生成的代理类如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 public class Wrapper0 extends Wrapper implements ClassGenerator .DC { public static String[] pns; public static Map pts; public static String[] mns; public static String[] dmns; public static Class[] mts0; public Object invokeMethod (Object object, String string, Class[] arrclass, Object[] arrobject) throws InvocationTargetException { DemoService demoService; try { demoService = (DemoService)object; } catch (Throwable throwable) { throw new IllegalArgumentException(throwable); } try { if ("sayHello" .equals(string) && arrclass.length == 1 ) { return demoService.sayHello((String)arrobject[0 ]); } } catch (Throwable throwable) { throw new InvocationTargetException(throwable); } throw new NoSuchMethodException(new StringBuffer().append("Not found method \"" ).append(string).append("\" in class com.alibaba.dubbo.demo.DemoService." ).toString()); } }
到这里,整个服务调用过程就分析完了。最后把调用过程贴出来,如下:
1 2 3 4 5 6 7 8 9 10 --- 在线程池中并发运行以下 --- ChannelEventRunnable#run() //根据state调用下层handler的合适方法.示例中state = RECEIVED —> DecodeHandler#received(Channel, Object) //确保此时Request中的RPCInvoker已经解码完成 —> HeaderExchangeHandler#received(Channel, Object) //把Request传给下层,并把结果的Response发送 —> HeaderExchangeHandler#handleRequest(ExchangeChannel, Request)//取出Request中的RPCInvoker传给下层的reply(),把结果封装成Response对象进行返回 —> DubboProtocol.requestHandler#reply(ExchangeChannel, Object)//根据Request中的信息拿到提供者启动时保存的exporter,调用该exporter中的Invoker的invoke() —> Filter#invoke(Invoker, Invocation) —> AbstractProxyInvoker#invoke(Invocation)//调用运行时通过Wrapper生成的Wrapper的方法,把结果放入RPCResult —> Wrapper0#invokeMethod(Object, String, Class[], Object[])//运行时生成的代理,调用真正的sayHello() —> DemoServiceImpl#sayHello(String)
服务消费方接收调用结果 当消费方接受到提供方返回的结果时,又将进入到pipeline中的解码器,解码完成后进入NettyHandler的messageReceived。接下来 NettyHandler 会将这个对象继续向下传递,最后 AllChannelHandler 的 received 方法会收到这个对象,并将这个对象派发到线程池中。这个过程和服务提供方接收请求的过程是一样的,因此这里就不重复分析了。本节我们重点分析两个方面的内容,一是响应数据的解码过程,二是 Dubbo 如何将调用结果传递给用户线程的。下面先来分析响应数据的解码过程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 public class DubboCodec extends ExchangeCodec implements Codec2 { @Override protected Object decodeBody (Channel channel, InputStream is, byte [] header) throws IOException { byte flag = header[2 ], proto = (byte ) (flag & SERIALIZATION_MASK); Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto); long id = Bytes.bytes2long(header, 4 ); if ((flag & FLAG_REQUEST) == 0 ) { Response res = new Response(id); if ((flag & FLAG_EVENT) != 0 ) { res.setEvent(Response.HEARTBEAT_EVENT); } byte status = header[3 ]; res.setStatus(status); if (status == Response.OK) { try { Object data; if (res.isHeartbeat()) { data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is)); } else if (res.isEvent()) { data = decodeEventData(channel, deserialize(s, channel.getUrl(), is)); } else { DecodeableRpcResult result; if (channel.getUrl().getParameter( Constants.DECODE_IN_IO_THREAD_KEY, Constants.DEFAULT_DECODE_IN_IO_THREAD)) { result = new DecodeableRpcResult(channel, res, is, (Invocation) getRequestData(id), proto); result.decode(); } else { result = new DecodeableRpcResult(channel, res, new UnsafeByteArrayInputStream(readMessageData(is)), (Invocation) getRequestData(id), proto); } data = result; } res.setResult(data); } catch (Throwable t) { res.setStatus(Response.CLIENT_ERROR); res.setErrorMessage(StringUtils.toString(t)); } } else { res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF()); } return res; } else { } } }
步骤流程和提供端收到Request的时候几乎完全一样。之前我们会把消息本体解码成RPCInvoker,而这里是解码成了RPCResult。因为RPCInvoker给的是方法参数类型和列表,RPCResult给的是返回值类型和值。所以decode方法是有些不同的,不过除此外大致相同,这里就不详细看了。
向用户线程传递调用结果 响应数据解码完成后,Dubbo 会将响应对象派发到线程池上。要注意的是,线程池中的线程并非用户的调用线程,所以要想办法将响应对象从线程池线程传递到用户线程上。我们在 2.1 节分析过用户线程在发送完请求后的动作,即调用 DefaultFuture 的 get 方法等待响应对象的到来。当响应对象到来后,用户线程会被唤醒,并通过调用编号 获取属于自己的响应对象。
这部分相关的代码在多线程处理链的HeaderExchangeHandler里面。看一下相关代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public class HeaderExchangeHandler implements ChannelHandlerDelegate { @Override public void received (Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel); try { if (message instanceof Request) { } else if (message instanceof Response) { handleResponse(channel, (Response) message); } else if (message instanceof String) { } else { handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); } } static void handleResponse (Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response); } } }
我们注意到最终会调用DefaultFuture.received(channel, response)
。也就是说在这个方法里,会找到进行远程调用中的线程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class DefaultFuture implements ResponseFuture { private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); private volatile Response response; public static void received (Channel channel, Response response) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null ) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at ..." ); } } finally { CHANNELS.remove(response.getId()); } } private void doReceived (Response res) { lock.lock(); try { response = res; if (done != null ) { done.signal(); } } finally { lock.unlock(); } if (callback != null ) { invokeCallback(callback); } } }
FUTURES是<id,DefaultFuture>
的map。里面放了目前没有拿到结果的DefaultFuture。我们通过id拿到那个DefaultFuture,并把它进行唤醒。因为此时response为非空,此时get()就会返回。
最后的最后 读到这里相信我们对Dubbo的机制有了一定的了解。网上有许多关于Dubbo的源码阅读的文章,其实我的这篇文章的90%其实都是从官方的源码导读里面复制过来的(官方源码导读 )。官方的源码导读写的非常好,我最早写这篇文章的目的是因为在Provider和Consumer启动的过程中出现了太多的URL参数,导致在阅读过程中非常困惑,因此我在讲述Provider和Consumer启动的过程花了大量的篇幅去阐述URL的内容。在系列的后半段是讲集群,负载均衡等机制。此时源码追踪已经变得不重要,因此我只是对官方的文档做了简单的补充。
文章中指接触了采用Dubbo协议和Zookeeper注册中心的方案,Dubbo还支持非常多的通信方式和注册中心,希望自己以后对其他部分能深入理解,对Dubbo等RPC框架有自己的看法。