服务负载均衡-Dubbo篇

╰+哭是因爲堅強的太久メ 2022-12-27 02:27 217阅读 0赞

关键词:负载均衡 轮询算法 随机算法


Dubbo源码版本:

  1. <dependency>
  2. <groupId>com.alibaba</groupId>
  3. <artifactId>dubbo</artifactId>
  4. <version>2.6.9</version>
  5. </dependency>

负载均衡分为两种模式:客户端模式和服务端模式。客户端模式是服务消费者(调用方)从注册中心获取可用的服务列表,然后根据负载均衡算法选择一个服务进行调用,Dubbo中的负载均衡就是使用的客户端模式。服务端模式,是服务消费者直接调用一个服务,由该服务进行负载均衡,比如Nginx使用的是服务端模式。

一般在集群模式中,服务都是多个的。服务调用方与服务提供方一般都是多对多的关系。当一个服务调用方需要调用一个功能接口时,提供该功能接口的服务是多个,因此需要从这些服务提供者中选择一个服务进行调用。这样的过程就叫负载均衡。

负载均衡是有策略的,也就是从多个服务提供者中如何确定出一个用于本次的服务调用。这个如何确定的过程就是负载均衡策略,也叫负载均衡算法。一般有轮询、随机等。


█ 服务调用

详细的服务调用说明请戳《服务调用-Dubbo篇》

通过上面的内容获取到了服务接口的代理对象,比如有这样一个接口:CatService:

  1. public interface CatService {
  2. void say();
  3. }

生成的代理对象就是这样的(伪代码):

  1. public class CatServiceProxy implements CatService{
  2. @Override
  3. public void say() {
  4. // 逻辑会被转发到MockClusterInvoker的invoke方法中
  5. // invocation里面封装了接口、调用方法等信息
  6. mockClusterWrapper.invoke(invocation);
  7. }
  8. }

MockClusterWrapper.invoke:

  1. public Result invoke(Invocation invocation) throws RpcException {
  2. Result result = null;
  3. // mock功能,服务调用失败后的容错处理
  4. String value = this.directory.getUrl().getMethodParameter(invocation.getMethodName(), "mock", Boolean.FALSE.toString()).trim();
  5. // 使用了mock功能
  6. if (value.length() != 0 && !value.equalsIgnoreCase("false")) {
  7. if (value.startsWith("force")) {
  8. if (logger.isWarnEnabled()) {
  9. logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + this.directory.getUrl());
  10. }
  11. result = this.doMockInvoke(invocation, (RpcException)null);
  12. } else {
  13. try {
  14. result = this.invoker.invoke(invocation);
  15. } catch (RpcException var5) {
  16. if (var5.isBiz()) {
  17. throw var5;
  18. }
  19. if (logger.isWarnEnabled()) {
  20. logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + this.directory.getUrl(), var5);
  21. }
  22. result = this.doMockInvoke(invocation, var5);
  23. }
  24. }
  25. } else {
  26. // 没有使用mock功能,直接调用this.invoker.invoke
  27. result = this.invoker.invoke(invocation);
  28. }
  29. return result;
  30. }

this.invoker.invoke:

MockClusterWrapper中持有的是FailoverClusterInvoker,所以会调用FailoverClusterInvoker中的invoke方法,FailoverClusterInvoker继承了AbstractClusterInvoker,invoke是AbstractClusterInvoker中的方法:

  1. public Result invoke(Invocation invocation) throws RpcException {
  2. ......
  3. LoadBalance loadbalance = null;
  4. ......
  5. // 获取服务列表
  6. List<Invoker<T>> invokers = this.list(invocation);
  7. // 获取LoadBalance,负载均衡器,可通过配置指定,默认为RandomLoadBalance
  8. if (invokers != null && !invokers.isEmpty()) {
  9. loadbalance = (LoadBalance)ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(((Invoker)invokers.get(0)).getUrl().getMethodParameter(RpcUtils.getMethodName(invocation), "loadbalance", "random"));
  10. }
  11. RpcUtils.attachInvocationIdIfAsync(this.getUrl(), invocation);
  12. return this.doInvoke(invocation, invokers, loadbalance);
  13. }

this.doInvoke:

doInvoke是AbstractClusterInvoker定义的抽象方法,具体要看子类实现,进入FailoverClusterInvoker的doInvoke方法:

  1. public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
  2. ......
  3. // copyinvokers就是invokers,
  4. // invoked是已获取到的invoker列表
  5. // 进入负载均衡选择服务
  6. Invoker<T> invoker = this.select(loadbalance, invocation, copyinvokers, invoked);
  7. invoked.add(invoker);
  8. ......
  9. }

select:

  1. protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
  2. ......
  3. Invoker<T> invoker = this.doSelect(loadbalance, invocation, invokers, selected);
  4. ......
  5. }

doSelect:

  1. private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
  2. ......
  3. // 调用具体的LoadBalance逻辑获取一个服务
  4. Invoker<T> invoker = loadbalance.select(invokers, this.getUrl(), invocation);
  5. ......
  6. }

█ 负载均衡

(1)指定服务调用时的负载均衡算法:

服务调用者和服务提供者都指定时,以服务调用者配置的为准。

  1. <dubbo:service loadbalance="xxx" />
  2. <dubbo:reference loadbalance="xxx" />

(2)LoadBalance:接口,定义了select方法

  1. @SPI("random")
  2. public interface LoadBalance {
  3. @Adaptive({"loadbalance"})
  4. <T> Invoker<T> select(List<Invoker<T>> var1, URL var2, Invocation var3) throws RpcException;
  5. }

AbstractLoadBalance:抽象类,实现LoadBalance,定义了模板方法

  1. public abstract class AbstractLoadBalance implements LoadBalance
  2. public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  3. if (invokers != null && !invokers.isEmpty()) {
  4. // this.doSelect是抽象方法,由具体的子类实现
  5. return invokers.size() == 1 ? (Invoker)invokers.get(0) : this.doSelect(invokers, url, invocation);
  6. } else {
  7. return null;
  8. }
  9. }

AbstractLoadBalance有四个实现类,分别对应四种负载均衡策略:RoundRobinLoadBalance(轮询)、RandomLoadBalance(随机)、ConsistentHashLoadBalance(一致性哈希)、LeastActiveLoadBalance(最小活跃数)。四个实现类实现了AbstractLoadBalance中的抽象方法doSelect。

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl81MDUxODI3MQ_size_16_color_FFFFFF_t_70

(3)在Dubbo项目的META-INF\dubbo\internal\com.alibaba.dubbo.rpc.cluster.LoadBalance,提供默认扩展实现:

  1. random=com.alibaba.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
  2. roundrobin=com.alibaba.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
  3. leastactive=com.alibaba.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
  4. consistenthash=com.alibaba.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
  • RoundRobinLoadBalance

通过轮询进行负载均衡,比如服务列表中有A、B、C、D四个服务,四个服务按照ABCD顺序排列。如果第一次选中了A,则第二次就会选中B,再下一次选中C,再下一次选中D,然后再回到A。思想是这样的。Dubbo中的轮询负载均衡,还添加了一个权重来调整轮询的策略。看看RoundRobinLoadBalance中的代码实现:

  1. // invokers 从注册中心获取到的所有的接口服务列表
  2. // url 调用服务请求的URL信息
  3. // invocation 调用接口方法信息
  4. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  5. // 获取服务列表中的第一个服务,根据serviceKey与方法名拼接成key
  6. // 如果是对一个接口方法提供服务的话,此处invokers列表中的所有key都一样。
  7. String key = ((Invoker)invokers.get(0)).getUrl().getServiceKey() + "." + invocation.getMethodName();
  8. // 缓存处理,缓存服务方法key与WeightedRoundRobin
  9. // WeightedRoundRobin记录所有方法的权重信息,key是接口方法信息,value是一个map,map记录的是当前接口方法下,
  10. // 每一个服务提供者的权重信息
  11. ConcurrentMap<String, RoundRobinLoadBalance.WeightedRoundRobin> map = (ConcurrentMap)this.methodWeightMap.get(key);
  12. // 初始化
  13. if (map == null) {
  14. this.methodWeightMap.putIfAbsent(key, new ConcurrentHashMap());
  15. map = (ConcurrentMap)this.methodWeightMap.get(key);
  16. }
  17. // 所有服务的总的权重值
  18. int totalWeight = 0;
  19. long maxCurrent = -9223372036854775808L;
  20. long now = System.currentTimeMillis();
  21. // 最终经过算法被选中的服务提供者
  22. Invoker<T> selectedInvoker = null;
  23. // selectedInvoker对应的WeightedRoundRobin
  24. RoundRobinLoadBalance.WeightedRoundRobin selectedWRR = null;
  25. int weight;
  26. // 遍历服务提供者列表
  27. for(Iterator var13 = invokers.iterator(); var13.hasNext(); totalWeight += weight) {
  28. Invoker<T> invoker = (Invoker)var13.next();
  29. // 每一个服务提供者唯一标识
  30. String identifyString = invoker.getUrl().toIdentityString();
  31. // 获取当前服务提供者的权重
  32. RoundRobinLoadBalance.WeightedRoundRobin weightedRoundRobin = (RoundRobinLoadBalance.WeightedRoundRobin)map.get(identifyString);
  33. // 获取和计算当前服务提供者的权重
  34. weight = this.getWeight(invoker, invocation);
  35. if (weight < 0) {
  36. weight = 0;
  37. }
  38. // 对于第一次提供服务的,这边是null,要进行初始化
  39. if (weightedRoundRobin == null) {
  40. weightedRoundRobin = new RoundRobinLoadBalance.WeightedRoundRobin();
  41. // 将计算得到的权重值记录下来
  42. weightedRoundRobin.setWeight(weight);
  43. // 存放到map中
  44. map.putIfAbsent(identifyString, weightedRoundRobin);
  45. weightedRoundRobin = (RoundRobinLoadBalance.WeightedRoundRobin)map.get(identifyString);
  46. }
  47. // 这里是如果服务不是第一次提供,就不会进入上面的if(weightedRoundRobin == null)
  48. // 那么weightedRoundRobin.getWeight()得到的就是历史缓存的数据,weight是计算得到的最新的数据
  49. // 两种会存在不相等的可能
  50. if (weight != weightedRoundRobin.getWeight()) {
  51. // 更新成最新计算得到的权重
  52. weightedRoundRobin.setWeight(weight);
  53. }
  54. // this.current.addAndGet((long)this.weight);
  55. // 累加当前服务提供者的权重值
  56. long cur = weightedRoundRobin.increaseCurrent();
  57. weightedRoundRobin.setLastUpdate(now);
  58. // 如果累加之后的权重值大于最大的权重值-9223372036854775808L,则进入逻辑
  59. if (cur > maxCurrent) {
  60. // 更新最大权重值
  61. maxCurrent = cur;
  62. selectedInvoker = invoker;
  63. selectedWRR = weightedRoundRobin;
  64. }
  65. }
  66. // 服务提供者列表发生了变化,要刷新以前的缓存数据
  67. if (!this.updateLock.get() && invokers.size() != map.size() && this.updateLock.compareAndSet(false, true)) {
  68. try {
  69. ConcurrentMap<String, RoundRobinLoadBalance.WeightedRoundRobin> newMap = new ConcurrentHashMap();
  70. newMap.putAll(map);
  71. Iterator it = newMap.entrySet().iterator();
  72. while(it.hasNext()) {
  73. Entry<String, RoundRobinLoadBalance.WeightedRoundRobin> item = (Entry)it.next();
  74. if (now - ((RoundRobinLoadBalance.WeightedRoundRobin)item.getValue()).getLastUpdate() > (long)RECYCLE_PERIOD) {
  75. it.remove();
  76. }
  77. }
  78. this.methodWeightMap.put(key, newMap);
  79. } finally {
  80. this.updateLock.set(false);
  81. }
  82. }
  83. if (selectedInvoker != null) {
  84. // this.current.addAndGet((long)(-1 * total));
  85. // 将当前被选中的服务提供者对应的权重值减去总权重值
  86. selectedWRR.sel(totalWeight);
  87. return selectedInvoker;
  88. } else {
  89. return (Invoker)invokers.get(0);
  90. }
  91. }

关键代码是这几个有关计算值的地方:

  1. int totalWeight = 0;
  2. long maxCurrent = -9223372036854775808L;
  3. // totalWeight = totalWeight + weight
  4. for(Iterator var13 = invokers.iterator(); var13.hasNext(); totalWeight += weight) {
  5. weight = this.getWeight(invoker, invocation);
  6. // cur = cur + weight
  7. long cur = weightedRoundRobin.increaseCurrent();
  8. if (cur > maxCurrent) {
  9. // 更新最大权重值
  10. maxCurrent = cur;
  11. selectedInvoker = invoker;
  12. selectedWRR = weightedRoundRobin;
  13. }
  14. }
  15. // cur = cur - totalWeight
  16. selectedWRR.sel(totalWeight);

weightedRoundRobin.increaseCurrent():

  1. // 即cur = cur + weight
  2. public long increaseCurrent() {
  3. return this.current.addAndGet((long)this.weight);
  4. }

selectedWRR.sel(totalWeight);:

  1. // 即cur = cur - totalWeight
  2. // 这里减去的是所有的权重和,其实减去任意一个大于0的数效果一样
  3. public void sel(int total) {
  4. this.current.addAndGet((long)(-1 * total));
  5. }

(1)权重值相同:

服务提供者列表中一共有4个服务提供者,都使用默认权重100,则总权重值是400:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl81MDUxODI3MQ_size_16_color_FFFFFF_t_70 1

第一次调用doSelect,进入for循环,map中都没有数据:

invoker=A,weight=100,cur=cur+weigth=0+100=100,maxCurrent=-9223372036854775808L。因为cur>maxCurrent,所以maxCurrent=cur=100,selectedInvoker=A。继续:

invoker=B,weight=100,cur=cur+weigth=0+100=100,maxCurrent=100。因为cur不大于maxCurrent,进入下一个循环:

invoker=C,weight=100,cur=cur+weigth=0+100=100,maxCurrent=100。因为cur不大于maxCurrent,进入下一个循环:

invoker=D,weight=100,cur=cur+weigth=0+100=100,maxCurrent=100。因为cur不大于maxCurrent,循环结束。

循环结束,map中记录ABCD权重信息,totalWeight=400。

最终选中A,将A的cur = cur+(-1)*400 = 100-400 = -300。

第二次调用doSelect,进入for循环,map中都缓存了数据:

invoker = A,weight = 100,cur=cur+weigth=-300+100=-200,maxCurrent=-9223372036854775808L。因为cur>maxCurrent,所以maxCurrent=cur=-200,selectedInvoker=A。进入下一个循环:

invoker = B,weight = 100,cur=cur+weigth=100+100=200,maxCurrent=-200。因为cur>maxCurrent,所以maxCurrent=cur=200,selectedInvoker=B。进入下一个循环:

invoker = C,weight = 100,cur=cur+weigth=100+100=200,maxCurrent=200。因为cur不大于maxCurrent,进入下一个循环:

invoker = D,weight = 100,cur=cur+weigth=100+100=200,maxCurrent=200。因为cur不大于maxCurrent,循环结束。

最终选中B,将B的cur = cur+(-1)*400 = 200-400=-200。

第三次调用doSelect,进入for循环:

invoker = A,weight = 100,cur=cur+weigth=-200+100=-100,maxCurrent=-9223372036854775808L。因为cur>maxCurrent,所以maxCurrent=cur=-100,selectedInvoker=A。进入下一个循环:

invoker = B,weight = 100,cur=cur+weigth=-200+100=-100,maxCurrent=-100。因为cur不大于maxCurrent,进入下一个循环:

invoker = C,weight = 100,cur=cur+weigth=200+100=300,maxCurrent=-100。因为cur>maxCurrent,所以maxCurrent=cur=300,selectedInvoker=C。进入下一个循环:

invoker = D,weight = 100,cur=cur+weigth=200+100=300,maxCurrent=300。因为cur不大于maxCurrent,循环结束。

最终选中C,将C的cur = cur+(-1)*400 = -100。

第四次调用doSelect,进入for循环:

invoker = A,weight = 100,cur=cur+weigth=-100+100=0,maxCurrent=-9223372036854775808L。因为cur>maxCurrent,所以maxCurrent=cur=0,selectedInvoker=A。进入下一个循环:

invoker = B,weight = 100,cur=cur+weigth=-100+100=0,maxCurrent=0。因为cur不大于maxCurrent,进入下一个循环:

invoker = C,weight = 100,cur=cur+weigth=-100+100=0,maxCurrent=0。因为cur不大于maxCurrent,进入下一个循环:

invoker = D,weight = 100,cur = 400,maxCurrent=0。因为cur>maxCurrent,所以maxCurrent=cur=0,selectedInvoker=D。循环结束。

最终选中D,将D的cur = cur+(-1)*400 = 0。

此时ABCD四个的cur的值又都是一样的了,和初始值都是100一样。ABCD轮询完成一圈,再进行下一圈的轮询。

思想:

初始筹码相同(即权重),依次遍历列表,找到的第一个筹码最多的将被选出。被选出的要减少一定数量的筹码(总权重值),没被选出的就增加相同的一定数量的筹码(各自的权重)。此时因为初始筹码都相同,该增加的筹码也相同,该减少的筹码也是相同的。所以此时每个元素都有相同的机会被轮询到。

为了让增加一定数量的筹码(各自的权重)实际上会产生增加数量,代码中增加了让权重不能为负数的逻辑,因为和一个负数相加,就做了减法了:

  1. if (weight < 0) {
  2. weight = 0;
  3. }

(2)权重值不同:

服务提供者列表中一共有4个服务提供者,权重值不同。总权重值是510。

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl81MDUxODI3MQ_size_16_color_FFFFFF_t_70 2

思想:

初始筹码不同(即权重),依次遍历列表,找到的第一个筹码最多的将被选出。被选出的要减少一定数量的筹码(总权重值),没被选出的就增加一定数量的筹码(各自的权重)。

此时情况就不一样了,被选中的是减去总权重值,所以被减数都是相同的,但是减数是不一样的,减数越大,最后剩下的就会越多。没被选中的要增加各自的权重,这个权重越大,增加的数量就会越多。所以,权重值越大的,被选中的机会就越大了。

  • RandomLoadBalance

随机的负载均衡算法,即从列表中随机选择一个。Dubbo也添加了一个权重值来调节随机算法:

  1. protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
  2. int length = invokers.size();
  3. int totalWeight = 0;
  4. // 服务提供者的权重值是否全部相同
  5. boolean sameWeight = true;
  6. int offset;
  7. int i;
  8. // 遍历服务提供者列表
  9. for(offset = 0; offset < length; ++offset) {
  10. // 依次计算权重值
  11. i = this.getWeight((Invoker)invokers.get(offset), invocation);
  12. totalWeight += i;
  13. // 从第二个开始,拿当前的服务提供者的权重值与前面一个作比较。如果出现一次不相等的,就不是相同的权重值
  14. if (sameWeight && offset > 0 && i != this.getWeight((Invoker)invokers.get(offset - 1), invocation)) {
  15. sameWeight = false;
  16. }
  17. }
  18. // 总权重值大于0,并且权重值不相同
  19. if (totalWeight > 0 && !sameWeight) {
  20. // 根据总权重值产生一个0到totalWeight范围内的随机数
  21. offset = this.random.nextInt(totalWeight);
  22. // 循环,length是服务提供者列表的长度
  23. for(i = 0; i < length; ++i) {
  24. // offset = offect - weight
  25. offset -= this.getWeight((Invoker)invokers.get(i), invocation);
  26. if (offset < 0) {
  27. return (Invoker)invokers.get(i);
  28. }
  29. }
  30. }
  31. // 总权重值小于等于0,或者权重值都一样,则在列表长度范围内随机产生一个随机数
  32. // 获取下标是随机数的值返回
  33. return (Invoker)invokers.get(this.random.nextInt(length));
  34. }

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl81MDUxODI3MQ_size_16_color_FFFFFF_t_70 3

若随机数是50,用50依次减去100,60,200,150。差小于0则选出对应下标的元素。index=0

若随机数是100,用100依次减去100,60,200,150。差小于0则选出对应下标的元素。index=1

……

思想:

增加权重值来加大某个元素被选中的概率。权重值越大,被选中的概率就越大。如上图所示,得到的权重值集合可以看成一个长方形区域,按照四个元素的权重值来划分长方形的面积,权重值大的得到的面积就大。此时,产生随机数可以看成是向该长方形区域投掷飞镖,当然面积越大的被飞镖落中的概率就大了。

Dubbo官方的描述:

RandomLoadBalance 是加权随机算法的具体实现,它的算法思想很简单。假设我们有一组服务器 servers = [A, B, C],他们对应的权重为 weights = [5, 3, 2], 权重总和为10。现在把这些权重值平铺在一维坐标值上,[0, 5) 区间属于服务器 A,[5, 8) 区间属于服务器 B,[8, 10) 区间属于服务器 C。 接下来通过随机数生成器生成一个范围在 [0, 10) 之间的随机数,然后计算这个随机数会落到哪个区间上。比如数字3会落到服务器 A 对应的区间上,此时返回服务器 A 即可。 权重越大的机器,在坐标轴上对应的区间范围就越大,因此随机数生成器生成的数字就会有更大的概率落到此区间内。 只要随机数生成器产生的随机数分布性很好,在经过多次选择后,每个服务器被选中的次数比例接近其权重比例。比如,经过一万次选择后,服务器 A 被选中的次数大约为5000次, 服务器 B 被选中的次数约为3000次,服务器 C 被选中的次数约为2000次。

剩下的负载均衡介绍,请戳Dubbo官网《http://dubbo.apache.org/zh/docs/v2.7/dev/source/loadbalance/》

发表评论

表情:
评论列表 (有 0 条评论,217人围观)

还没有评论,来说两句吧...

相关阅读

    相关 dubbo负载均衡

    负载均衡算法 在集群负载均衡时,Dubbo提供了多种均衡策略,缺省为random随机调用。 在集群负载均衡时,Dubbo提供了4种均衡策略,如: Random Load

    相关 Dubbo-负载均衡

    切勿将负载均衡策略写死在代码里,将来我们可以用控制台来进行控制。 负载均衡 在集群负载均衡时,Dubbo 提供了多种均衡策略,缺省为 `random` 随机调

    相关 Dubbo-负载均衡

    一、什么是负载均衡 LoadBalance 即负载均衡,它的职责是将网络请求,或者其他形式的负载“均摊”到不同的机器上。避免集群中部分服务器压力过大,而另一些服务器比较空