深入理解 YarnFairSchedule 中的饥饿抢占

阳光穿透心脏的1/2处 2022-12-22 15:23 98阅读 0赞

文章目录

    • 饥饿
      • 饥饿的两种类型
        • FairShare 饥饿
        • MinShare 饥饿
    • 抢占
      • 要抢占的Container
    • Code

饥饿

由于 FairSchedule 具有弹性功能,因此对列中运行的应用程序可能使得其他应用程序(在同一队列或不同队列)处于饥饿状态;

在如下例子中,假设仅 tenant1 和 tenant2 队列处于active状态,分别使用了33.3%和66.6%的资源。随后tenant3也变为active状态,则
队列的 Instantaneous FairShare 将分别变为 25%、50%、25%。但此时,tenant3 队列中的应用程序必须等待 tenant1 或 tenant2 中的应用程序释放资源。在此之前,tenant3 将以未满足资源需求或饥饿的形式等待container。

4e61120a4855d72923df811a38bfacee.png

通过抢占,可以以可预测的方式调整这种不平衡,它允许从超过其FairShare的队列中回收资源,而不用等待队列释放资源。

饥饿的两种类型

  • FairShare 饥饿
  • MinShare 饥饿

FairShare 饥饿

当满足以下所有条件时,一个作业可能将遭受 FairShare 饥饿

  1. 该作业有未满足的资源需求
  2. 该作业资源使用量低于其 Instantaneous Fair Share
  3. 该作业的资源使用量低于其 Instantaneous Fair Share 的 公平共享抢占阈值(FairShare Preemption Threshold) 持续时间超过公平共享超时(FairShare Preemption Timeout).

公平共享抢占阈值(FairShare Preemption Threshold) 默认0.5,公平共享超时(FairShare Preemption Timeout)默认情况下未设置;需要明确设置它,以使应用程序被视为饥饿。这两个参数可以在全局或队列级别设置。

例如:假设队列的 FairShare 抢占超时为5秒,FairShare 请战阈值为0.8,如果启用了抢占并且满足上述条件的1和2,当作业在5秒内未获得其Instantaneous FairShare 的80%,FairScheduler将认为该作业处于饥饿状态。

注意:开启抢占并不能保证一个作业或队列能获得其完整的Instantaneous FairShare,只保证一个作业或队列能够获得足够的资源,不再别视为饥饿。此外,此保证还取决于Resource Manager找到要抢占的容器

对于高优先级队列,可以通过将抢占的阈值设置为较高的值,抢占的超时时间设置较小的时间,并将其标记为不可抢占,来积极配置饥饿。

MinShare 饥饿

当一个队列满足以下所以条件时,将产生 MinShare 饥饿

  1. 队列中的一个或多个应用程序有未满足的资源需求
  2. 队列的资源使用量低于其MinShare
  3. 队列的资源使用量低于其 MinShare 的时间超过 MinShare 抢占超时时间

默认情况下未设置MinShare抢占超时时间,需要明确的设置它,以使队列被认为是饥饿的。该参数可以全局设置或在队列级别设置。

MinShare 只能指定在队列级别,不像FairShare,因为作业不会产生因无法满足MinShare而产生饥饿

当队列因饥饿而获取资源后,按需求对队列中的应用程序进行排序。因为即使把队列获得的所有资源都分配给队列中的作业,也可能导致某些作业还是处于饥饿的状态。

例如:假设配置的队列的最小内存,并且该内存比其最小内存低6G,

抢占

抢占的目的:

抢占的弊端:会降低集群的执行效率,因为抢占终止的container会被重新执行

同时满足以下两个条件时,将启用抢占功能:

  1. yarn.scheduler.fair.preemption = true。默认为false.
  2. 整个集群的资源利用率超过指定的阀值,利用率=已使用资源/集群总资源(yarn.scheduler.fair.preemption.cluster-utilization-threshold),默认0.8f

集群利用率是抢占的先决条件,在利用率低的集群上抢占容器会引起不必要的容器搅动,从而影响性能。集群利用率由 内存或 vCore 利用率的较大者表示。

注意:如果有必要,可以将队列标记为不可抢占,例如,因为它是最高优先级队列,因此永远不能抢占队列中的资源。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MQWd9l1q-1605867049112)(https://i.loli.net/2020/11/18/GWwZ132Jx9jBsAd.png)\]

要抢占的Container

通过抢占一个或多个节点上运行的container,可以满足应用程序的资源需求。当多个节点上的容器可以满足此资源请求时,会优先考虑AM最少的节点,此外,只有满足以下两个条件时,才能抢占容器:

  • 容器的应用程序被置为可抢占
  • 杀死容器不会使作业资源低于FairShare。(换句话说,抢占后最终使用的资源少于其FairShare的作业不能被抢占)

在决定要抢占哪个容器时不使用MinShare 。MinShare仅用于配置饥饿。这是非常重要的,因为不管是否配置了FairShare或MinShare饥饿,都只能抢占超过FairShare的作业,如下表所示:

Screen-Shot-2018-06-05-at-12.12.17-PM.png

Code

每隔updateInterval调用

updateInterval 可通过参数 yarn.scheduler.fair.update-interval-ms 指定,默认值为500毫秒

  1. private class UpdateThread extends Thread {
  2. @Override
  3. public void run() {
  4. while (!Thread.currentThread().isInterrupted()) {
  5. try {
  6. Thread.sleep(updateInterval);
  7. long start = getClock().getTime();
  8. update();
  9. preemptTasksIfNecessary();
  10. long duration = getClock().getTime() - start;
  11. fsOpDurations.addUpdateThreadRunDuration(duration);
  12. } catch (InterruptedException ie) {
  13. LOG.warn("Update thread interrupted. Exiting.");
  14. return;
  15. } catch (Exception e) {
  16. LOG.error("Exception in fair scheduler UpdateThread", e);
  17. }
  18. }
  19. }
  20. }

在 preemptTasksIfNecessary 方法中有抢占相关的逻辑,首先会判断当前,抢占是否可用,

  1. protected synchronized void preemptTasksIfNecessary() {
  2. if (!shouldAttemptPreemption()) {
  3. return;
  4. }
  5. long curTime = getClock().getTime();
  6. // 判断当前抢占时机是否已到
  7. if (curTime - lastPreemptCheckTime < preemptionInterval) {
  8. return;
  9. }
  10. lastPreemptCheckTime = curTime;
  11. // 初始抢占的资源,默认值为none
  12. Resource resToPreempt = Resources.clone(Resources.none());
  13. for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
  14. // 计算请战资源的总和
  15. Resources.addTo(resToPreempt, resToPreempt(sched, curTime));
  16. }
  17. if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt,
  18. Resources.none())) {
  19. // 抢占动作
  20. preemptResources(resToPreempt);
  21. }
  22. }

在preemptTasksIfNecessary() 方法中,会先判断请战是否可用,

  1. private boolean shouldAttemptPreemption() {
  2. if (preemptionEnabled) { // yarn.scheduler.fair.preemption是否为true
  3. //集群利用率(内存、CPU二者利用率的最大值,fairSchedule是 single-resource,这里为什么又考虑到Vcore有点不理解)是否超过给定的阈值,
  4. //由 yarn.scheduler.fair.preemption.cluster-utilization-threshold 设置
  5. return (preemptionUtilizationThreshold < Math.max(
  6. (float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(),
  7. (float) rootMetrics.getAllocatedVirtualCores() /
  8. clusterResource.getVirtualCores()));
  9. }
  10. return false;
  11. }

当抢占开启并可以进行抢占,切抢占时机已到时,首先会计算要抢占的资源,通过resToPreempt()方法会计算每个Schedule要抢占的资源,然后添加到总的抢占资源里。

在resToPreempt() 方法中,会计算当前队列允许抢占其他队列的资源大小,然后返回该资源

  • 如果这个队列使用的资源低于其 min Share 的时间超过minSharePreemptionTimeout,则应该抢占的资源量在它当前fair share和它的 min share之间的差额。
  • 如果队列使用的资源低于它的fair share的时间超过了fairSharePreemptionTimeout,则它应该进行抢占的资源是满足其fair share的资源量,
  • 如果两者都发生,则抢占的值为以上二者的最大值

minSharePreemptionTimeout: 表示如果超过该指定时间,Scheduler还没有获得minShare的资源,则进行抢占

fairSharePreemptionTimeout: 表示如果超过该指定时间,Scheduler还没有获得fairShare的资源,则进行抢占

对代码中一个命令的解释 resToPreempt —> resourceToPreempt,resDueToFairShare,resDueToFairShare同理

  1. protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
  2. long minShareTimeout = sched.getMinSharePreemptionTimeout();
  3. long fairShareTimeout = sched.getFairSharePreemptionTimeout();
  4. Resource resDueToMinShare = Resources.none();
  5. Resource resDueToFairShare = Resources.none();
  6. // minSharePreemptionTimeout
  7. if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
  8. // minShare,demand 之间的最小值作为target
  9. Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
  10. sched.getMinShare(), sched.getDemand());
  11. // target 与 ResourceUsage 之间的最大值即为resDueToMinShare (由于minShare 超时需要获取的资源)
  12. resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
  13. Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
  14. }
  15. // fairSharePreemptionTimeout
  16. if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) {
  17. Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
  18. sched.getFairShare(), sched.getDemand());
  19. resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
  20. Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
  21. }
  22. Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource,
  23. resDueToMinShare, resDueToFairShare);
  24. if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
  25. resToPreempt, Resources.none())) {
  26. String message = "Should preempt " + resToPreempt + " res for queue "
  27. + sched.getName() + ": resDueToMinShare = " + resDueToMinShare
  28. + ", resDueToFairShare = " + resDueToFairShare;
  29. LOG.info(message);
  30. }
  31. return resToPreempt;
  32. }

再次回到在preemptTasksIfNecessary() 方法中,拿到了要抢占的资源后

  1. if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt,
  2. Resources.none())) {
  3. preemptResources(resToPreempt);
  4. }

如果要抢占的资源不为none,则进行资源抢占资源

  1. protected void preemptResources(Resource toPreempt) {
  2. long start = getClock().getTime();
  3. if (Resources.equals(toPreempt, Resources.none())) {
  4. return;
  5. }
  6. // Scan down the list of containers we've already warned and kill them
  7. // if we need to. Remove any containers from the list that we don't need
  8. // or that are no longer running.
  9. Iterator<RMContainer> warnedIter = warnedContainers.iterator();
  10. while (warnedIter.hasNext()) {
  11. RMContainer container = warnedIter.next();
  12. if ((container.getState() == RMContainerState.RUNNING ||
  13. container.getState() == RMContainerState.ALLOCATED) &&
  14. Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
  15. toPreempt, Resources.none())) {
  16. warnOrKillContainer(container);
  17. Resources.subtractFrom(toPreempt, container.getContainer().getResource());
  18. } else {
  19. warnedIter.remove();
  20. }
  21. }
  22. try {
  23. // Reset preemptedResource for each app
  24. for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
  25. queue.resetPreemptedResources();
  26. }
  27. while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
  28. toPreempt, Resources.none())) {
  29. RMContainer container =
  30. getQueueManager().getRootQueue().preemptContainer();
  31. if (container == null) {
  32. break;
  33. } else {
  34. warnOrKillContainer(container);
  35. warnedContainers.add(container);
  36. Resources.subtractFrom(
  37. toPreempt, container.getContainer().getResource());
  38. }
  39. }
  40. } finally {
  41. // Clear preemptedResources for each app
  42. for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
  43. queue.clearPreemptedResources();
  44. }
  45. }
  46. long duration = getClock().getTime() - start;
  47. fsOpDurations.addPreemptCallDuration(duration);
  48. }

发表评论

表情:
评论列表 (有 0 条评论,98人围观)

还没有评论,来说两句吧...

相关阅读