Dubbo的两个集群接口Cluster,Cluster Invoker
Cluster将多个Invoker合并为一个Cluster Invoker,并将这个 Invoker 暴露给服务消费者。
服务消费者只需通过这个Cluster Invoker 进行远程调用即可,至于具体调用哪个Invoker,以及调用失败后如何处理等问题,现在都交给集群模块去处理。
Dubbo 提供了多种集群实现,包含但不限于 Failover Cluster、Failfast Cluster 和 Failsafe Cluster 等。每种集群实现类的用途不同,接下来会一一进行分析。
集群容错
在对集群相关代码进行分析之前,这里有必要先来介绍一下集群容错的所有组件。包含 Cluster、Cluster Invoker、Directory、Router 和 LoadBalance 等。
分析一下集群工作的流程.分为消费者初始化期间和进行远程调用时
- 服务消费者初始化期间
- 集群 Cluster 实现类为服务消费者创建 Cluster Invoker 实例,即上图中的 merge 操作。
- 服务消费者进行远程调用时.FailoverClusterInvoker 为例
- 调用 Directory 的 list 方法列举 Invoker 列表.
这里的Directory通常是指RegistryDirectory.RegistryDirectory是会动态增删Invoker的,当有新增的Invoker时它会调用Router的route方法进行路由,也就是Invoker的过滤.
- 通过LoadBalance选择一个Invoker实例的invoker方法
- 把参数传给该invoker方法.进行远程调用.
Dubbo 主要提供了这样几种容错方式:
- Failover Cluster - 失败自动切换
- Failfast Cluster - 快速失败
- Failsafe Cluster - 失败安全
- Failback Cluster - 失败自动恢复
- Forking Cluster - 并行调用多个服务提供者
Cluster
cluster接口仅用于生成 Cluster Invoker。所以它只包含下面这一个方法.
1 2 3 4 5 6 7 8 9
| public class FailoverCluster implements Cluster { public final static String NAME = "failover"; @Override public <T> Invoker<T> join(Directory<T> directory) throws RpcException { return new FailoverClusterInvoker<T>(directory); }
}
|
除了FailoverCluster,其他的cluster也都如此.仅用于生成各自对应的 Cluster Invoker.在构造方法中参数的directory会被保存.
Cluster Invoker
前面说到,服务消费者初始化期间会调用Cluster的join()来创建ClusterInvoker.其实这部分在前几篇讲服务引入的时候有接触过,在RegistryProtocol#doRefer
里面.这里来简单回顾一下方法中相关的部分.
1 2 3 4 5 6 7 8 9 10
| 1. 创建RegistryDirectory实例 2. RegistryDirectory中放入zookeeperRegistry.(顺便创建ZK ConsumerURL的节点) 3. RegistryDirectory中放入dubboProtocol 4. directory#subscribe 1. 从zk获取providers,router等 2. (更新路由) 3. 创建对应providerURL的netty客户端,把连接构成invoker 5. cluster.join(directory); 6. 返回合并后的invoker
|
重点是第5步,clusterInvoker就是在那时被创建的.
这一部分就不过多解释了,下面来看服务消费者进行远程调用的部分.
讲服务目录的时候说过,每当远程服务被Consumer调用时,AbstractDirectory的list()都会被调用.其实Consumer是调用AbstractClusterInvoker 的 invoke (),再由它去调用AbstractDirectory的list()的.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Override public Result invoke(final Invocation invocation) throws RpcException { checkWhetherDestroyed(); LoadBalance loadbalance = null;
Map<String, String> contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation) invocation).addAttachments(contextAttachments); }
List<Invoker<T>> invokers = list(invocation); if (invokers != null && !invokers.isEmpty()) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); }
|
重点做了两件事1. 从Directory拿到List<Invoker<T>>
,2. 加载LoadBalance.
回顾一下,从Directory拿到List<Invoker<T>>
部分是调用AbstractDirectory.list()获取invoker列表.列表的元素是在RegistryDirectory#notify()的时候放进去的.
继续看后续由各个子类实现的doInvoke()
方法
FailoverClusterInvoker
FailoverClusterInvoker的特性是在调用失败时,会自动切换 Invoker 进行重试.这是dubbo默认的cluster Invoker.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| @Override public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyinvokers = invokers; checkInvokers(copyinvokers, invocation); int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } RpcException le = null; List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size()); Set<String> providers = new HashSet<String>(len); for (int i = 0; i < len; i++) { if (i > 0) { checkWhetherDestroyed(); copyinvokers = list(invocation); checkInvokers(copyinvokers, invocation); }
Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { Result result = invoker.invoke(invocation); return result; } catch (RpcException e) { if (e.isBiz()) { throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } }
throw new RpcException(..., "Failed to invoke the method ..."); }
|
首先从url中获取retries(重试次数)参数,默认为3.也就是说当Invoker不好使时可以进行3次尝试.每次通过loadbalance来选择一个invoker.如果invoker调用成功,则返回远程调用的结果.如果3次调用都失败了,那么就抛出异常
看一下通过loadbalance选择一个invoker时调用的select()的逻辑.该方法在父类的AbstractClusterInvoker中定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { if (invokers == null || invokers.isEmpty()) return null; String methodName = invocation == null ? "" : invocation.getMethodName();
boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY); { if (stickyInvoker != null && !invokers.contains(stickyInvoker)) { stickyInvoker = null; } if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) { if (availablecheck && stickyInvoker.isAvailable()) { return stickyInvoker; } } } Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);
if (sticky) { stickyInvoker = invoker; }调用同一个服务提供者,除非该提供者挂了再进行切换 return invoker; }
|
上面可以看到有粘滞连接的功能.当该功能开启是时尽可能得调用同一个服务提供者,除非该提供者挂了再进行切换.而没有开启该功能时(如demo),则是调用doSelect()来进行选择.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException { if (invokers == null || invokers.isEmpty()) return null; if (invokers.size() == 1) return invokers.get(0); if (loadbalance == null) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); }
Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);
if ((selected != null && selected.contains(invoker)) || (!invoker.isAvailable() && getUrl() != null && availablecheck)) { try { Invoker<T> rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck); if (rinvoker != null) { invoker = rinvoker; } else { int index = invokers.indexOf(invoker); try { invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0); } catch (Exception e) { logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e); } } } catch (Throwable t) { logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t); } } return invoker; }
|
主要包含三个工作,
通过负载均衡组件选择invoker.loadBalancer默认是RandomLoadBalance.
如果这个Invoker是之前选到过并且不好使的invoker,则调用reselect()进行重选.
如果重选的结果为null,直接返回下一个Invoker
重选的reselect()方法的代码就不分析了,简单说一下逻辑,它有几个分支
从未选过的invoker集合中进行选择
从所有Invoker中排除之前选择过得selected集合.通过负载均衡组件从这个未选过的invoker集合中选择
如果所有Invoker都被选择过了,那么从selected集合里面找是否有可用的
如果所有Invoker都被选过了,而且选过的Invoker状态都为不可用,返回null
以上FailoverClusterInvoker就分析结束了
FailbackClusterInvoker
FailbackClusterInvoker 会在调用失败后,返回一个空结果给服务提供者。并通过定时任务对失败的调用进行重传,适合执行消息通知等操作。
简单看这一下关于失败处理的部分代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| public class FailbackClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final long RETRY_FAILED_PERIOD = 5 * 1000;
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2, new NamedInternalThreadFactory("failback-cluster-timer", true));
private final ConcurrentMap<Invocation, AbstractClusterInvoker<?>> failed = new ConcurrentHashMap<Invocation, AbstractClusterInvoker<?>>(); private volatile ScheduledFuture<?> retryFuture;
@Override protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { try { checkInvokers(invokers, invocation); Invoker<T> invoker = select(loadbalance, invocation, invokers, null); return invoker.invoke(invocation); } catch (Throwable e) { logger.error("Failback to invoke method ...");
addFailed(invocation, this); return new RpcResult(); } }
private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) { if (retryFuture == null) { synchronized (this) { if (retryFuture == null) { retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {
@Override public void run() { try { retryFailed(); } catch (Throwable t) { logger.error("Unexpected error occur at collect statistic", t); } } }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS); } } }在调用失败后,返回一个空结果给服务提供者。
failed.put(invocation, router); }
void retryFailed() { if (failed.size() == 0) { return; }
for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(failed).entrySet()) { Invocation invocation = entry.getKey(); Invoker<?> invoker = entry.getValue(); try { invoker.invoke(invocation); failed.remove(invocation); } catch (Throwable e) { logger.error("Failed retry to invoke method ..."); } } } }
|
逻辑很简单.总之首次invoke失败时,无论之后在定时任务中Invoker再次调用成功与否,消费者拿到的都是空结果.而且就算失败也不会抛出异常.
FailfastClusterInvoker
FailfastClusterInvoker 只会进行一次调用,失败后立即抛出异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public class FailfastClusterInvoker<T> extends AbstractClusterInvoker<T> {
@Override public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { checkInvokers(invokers, invocation); Invoker<T> invoker = select(loadbalance, invocation, invokers, null); try { return invoker.invoke(invocation); } catch (Throwable e) { if (e instanceof RpcException && ((RpcException) e).isBiz()) { throw (RpcException) e; } throw new RpcException(..., "Failfast invoke providers ..."); } } }
|
FailsafeClusterInvoker
FailsafeClusterInvoker 是一种失败安全的 Cluster Invoker。所谓的失败安全是指,当调用过程中出现异常时,FailsafeClusterInvoker 仅会打印异常,而不会抛出异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class FailsafeClusterInvoker<T> extends AbstractClusterInvoker<T> {
@Override public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { try { checkInvokers(invokers, invocation); Invoker<T> invoker = select(loadbalance, invocation, invokers, null); return invoker.invoke(invocation); } catch (Throwable e) { logger.error("Failsafe ignore exception: " + e.getMessage(), e); return new RpcResult(); } } }
|
ForkingClusterInvoker
ForkingClusterInvoker 会在运行时通过线程池创建多个线程,并发调用多个服务提供者。只要有一个服务提供者成功返回了结果,doInvoke 方法就会立即结束运行。ForkingClusterInvoker 的应用场景是在一些对实时性要求比较高读操作(注意是读操作,并行写操作可能不安全)下使用,但这将会耗费更多的资源。
这部分也非常简单,由于代码比较长.解析的部分可以参考官方文档
https://dubbo.incubator.apache.org/zh-cn/docs/source_code_guide/cluster.html
BroadcastClusterInvoker
BroadcastClusterInvoker 会逐个调用每个服务提供者,如果其中一台报错,在循环调用结束后,BroadcastClusterInvoker 会抛出异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T> {
@Override public Result doInvoke(final Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { checkInvokers(invokers, invocation); RpcContext.getContext().setInvokers((List) invokers); RpcException exception = null; Result result = null; for (Invoker<T> invoker : invokers) { try { result = invoker.invoke(invocation); } catch (RpcException e) { exception = e; logger.warn(e.getMessage(), e); } catch (Throwable e) { exception = new RpcException(e.getMessage(), e); logger.warn(e.getMessage(), e); } } if (exception != null) { throw exception; } return result; } }
|
最后
关于cluster 和 cluster Invoker的概念.
Consumer持有的Invoker是cluster Invoker.
ClusterInvoker实际上只是多个Invoker的包装类罢了,让消费者认为自己是在调用单个的Invoker.
而在clusterInvoker被调用时,它将会根据负载均衡等一系列操作,从自己持有的Invoker中选出一个Invoker,作为真正进行远程调用的invoker,把结果返回给Consumer.
总结一下各个ClusterInvoker的特性: