深入理解 YarnFairSchedule 中的饥饿抢占
文章目录
- 饥饿
- 饥饿的两种类型
- FairShare 饥饿
- MinShare 饥饿
- 抢占
- 要抢占的Container
- Code
饥饿
由于 FairSchedule 具有弹性功能,因此对列中运行的应用程序可能使得其他应用程序(在同一队列或不同队列)处于饥饿状态;
在如下例子中,假设仅 tenant1 和 tenant2 队列处于active状态,分别使用了33.3%和66.6%的资源。随后tenant3也变为active状态,则
队列的 Instantaneous FairShare 将分别变为 25%、50%、25%。但此时,tenant3 队列中的应用程序必须等待 tenant1 或 tenant2 中的应用程序释放资源。在此之前,tenant3 将以未满足资源需求或饥饿的形式等待container。
通过抢占,可以以可预测的方式调整这种不平衡,它允许从超过其FairShare的队列中回收资源,而不用等待队列释放资源。
饥饿的两种类型
- FairShare 饥饿
- MinShare 饥饿
FairShare 饥饿
当满足以下所有条件时,一个作业可能将遭受 FairShare 饥饿
- 该作业有未满足的资源需求
- 该作业资源使用量低于其 Instantaneous Fair Share
- 该作业的资源使用量低于其 Instantaneous Fair Share 的 公平共享抢占阈值(FairShare Preemption Threshold) 持续时间超过公平共享超时(FairShare Preemption Timeout).
公平共享抢占阈值(FairShare Preemption Threshold) 默认0.5,公平共享超时(FairShare Preemption Timeout)默认情况下未设置;需要明确设置它,以使应用程序被视为饥饿。这两个参数可以在全局或队列级别设置。
例如:假设队列的 FairShare 抢占超时为5秒,FairShare 请战阈值为0.8,如果启用了抢占并且满足上述条件的1和2,当作业在5秒内未获得其Instantaneous FairShare 的80%,FairScheduler将认为该作业处于饥饿状态。
注意:开启抢占并不能保证一个作业或队列能获得其完整的Instantaneous FairShare,只保证一个作业或队列能够获得足够的资源,不再别视为饥饿。此外,此保证还取决于Resource Manager找到要抢占的容器
对于高优先级队列,可以通过将抢占的阈值设置为较高的值,抢占的超时时间设置较小的时间,并将其标记为不可抢占,来积极配置饥饿。
MinShare 饥饿
当一个队列满足以下所以条件时,将产生 MinShare 饥饿
- 队列中的一个或多个应用程序有未满足的资源需求
- 队列的资源使用量低于其MinShare
- 队列的资源使用量低于其 MinShare 的时间超过 MinShare 抢占超时时间
默认情况下未设置MinShare抢占超时时间,需要明确的设置它,以使队列被认为是饥饿的。该参数可以全局设置或在队列级别设置。
MinShare 只能指定在队列级别,不像FairShare,因为作业不会产生因无法满足MinShare而产生饥饿
当队列因饥饿而获取资源后,按需求对队列中的应用程序进行排序。因为即使把队列获得的所有资源都分配给队列中的作业,也可能导致某些作业还是处于饥饿的状态。
例如:假设配置的队列的最小内存,并且该内存比其最小内存低6G,
抢占
抢占的目的:
抢占的弊端:会降低集群的执行效率,因为抢占终止的container会被重新执行
同时满足以下两个条件时,将启用抢占功能:
- yarn.scheduler.fair.preemption = true。默认为false.
- 整个集群的资源利用率超过指定的阀值,利用率=已使用资源/集群总资源(yarn.scheduler.fair.preemption.cluster-utilization-threshold),默认0.8f
集群利用率是抢占的先决条件,在利用率低的集群上抢占容器会引起不必要的容器搅动,从而影响性能。集群利用率由 内存或 vCore 利用率的较大者表示。
注意:如果有必要,可以将队列标记为不可抢占,例如,因为它是最高优先级队列,因此永远不能抢占队列中的资源。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MQWd9l1q-1605867049112)(https://i.loli.net/2020/11/18/GWwZ132Jx9jBsAd.png)\]
要抢占的Container
通过抢占一个或多个节点上运行的container,可以满足应用程序的资源需求。当多个节点上的容器可以满足此资源请求时,会优先考虑AM最少的节点,此外,只有满足以下两个条件时,才能抢占容器:
- 容器的应用程序被置为可抢占
- 杀死容器不会使作业资源低于FairShare。(换句话说,抢占后最终使用的资源少于其FairShare的作业不能被抢占)
在决定要抢占哪个容器时不使用MinShare 。MinShare仅用于配置饥饿。这是非常重要的,因为不管是否配置了FairShare或MinShare饥饿,都只能抢占超过FairShare的作业,如下表所示:
Code
每隔updateInterval调用
updateInterval 可通过参数
yarn.scheduler.fair.update-interval-ms
指定,默认值为500毫秒
private class UpdateThread extends Thread {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(updateInterval);
long start = getClock().getTime();
update();
preemptTasksIfNecessary();
long duration = getClock().getTime() - start;
fsOpDurations.addUpdateThreadRunDuration(duration);
} catch (InterruptedException ie) {
LOG.warn("Update thread interrupted. Exiting.");
return;
} catch (Exception e) {
LOG.error("Exception in fair scheduler UpdateThread", e);
}
}
}
}
在 preemptTasksIfNecessary 方法中有抢占相关的逻辑,首先会判断当前,抢占是否可用,
protected synchronized void preemptTasksIfNecessary() {
if (!shouldAttemptPreemption()) {
return;
}
long curTime = getClock().getTime();
// 判断当前抢占时机是否已到
if (curTime - lastPreemptCheckTime < preemptionInterval) {
return;
}
lastPreemptCheckTime = curTime;
// 初始抢占的资源,默认值为none
Resource resToPreempt = Resources.clone(Resources.none());
for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
// 计算请战资源的总和
Resources.addTo(resToPreempt, resToPreempt(sched, curTime));
}
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt,
Resources.none())) {
// 抢占动作
preemptResources(resToPreempt);
}
}
在preemptTasksIfNecessary() 方法中,会先判断请战是否可用,
private boolean shouldAttemptPreemption() {
if (preemptionEnabled) { // yarn.scheduler.fair.preemption是否为true
//集群利用率(内存、CPU二者利用率的最大值,fairSchedule是 single-resource,这里为什么又考虑到Vcore有点不理解)是否超过给定的阈值,
//由 yarn.scheduler.fair.preemption.cluster-utilization-threshold 设置
return (preemptionUtilizationThreshold < Math.max(
(float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(),
(float) rootMetrics.getAllocatedVirtualCores() /
clusterResource.getVirtualCores()));
}
return false;
}
当抢占开启并可以进行抢占,切抢占时机已到时,首先会计算要抢占的资源,通过resToPreempt()方法会计算每个Schedule要抢占的资源,然后添加到总的抢占资源里。
在resToPreempt() 方法中,会计算当前队列允许抢占其他队列的资源大小,然后返回该资源
- 如果这个队列使用的资源低于其 min Share 的时间超过minSharePreemptionTimeout,则应该抢占的资源量在它当前fair share和它的 min share之间的差额。
- 如果队列使用的资源低于它的fair share的时间超过了fairSharePreemptionTimeout,则它应该进行抢占的资源是满足其fair share的资源量,
- 如果两者都发生,则抢占的值为以上二者的最大值
minSharePreemptionTimeout: 表示如果超过该指定时间,Scheduler还没有获得minShare的资源,则进行抢占
fairSharePreemptionTimeout: 表示如果超过该指定时间,Scheduler还没有获得fairShare的资源,则进行抢占
对代码中一个命令的解释 resToPreempt —> resourceToPreempt,resDueToFairShare,resDueToFairShare同理
protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
long minShareTimeout = sched.getMinSharePreemptionTimeout();
long fairShareTimeout = sched.getFairSharePreemptionTimeout();
Resource resDueToMinShare = Resources.none();
Resource resDueToFairShare = Resources.none();
// minSharePreemptionTimeout
if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
// minShare,demand 之间的最小值作为target
Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
sched.getMinShare(), sched.getDemand());
// target 与 ResourceUsage 之间的最大值即为resDueToMinShare (由于minShare 超时需要获取的资源)
resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
}
// fairSharePreemptionTimeout
if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) {
Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
sched.getFairShare(), sched.getDemand());
resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
}
Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource,
resDueToMinShare, resDueToFairShare);
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
resToPreempt, Resources.none())) {
String message = "Should preempt " + resToPreempt + " res for queue "
+ sched.getName() + ": resDueToMinShare = " + resDueToMinShare
+ ", resDueToFairShare = " + resDueToFairShare;
LOG.info(message);
}
return resToPreempt;
}
再次回到在preemptTasksIfNecessary() 方法中,拿到了要抢占的资源后
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt,
Resources.none())) {
preemptResources(resToPreempt);
}
如果要抢占的资源不为none,则进行资源抢占资源
protected void preemptResources(Resource toPreempt) {
long start = getClock().getTime();
if (Resources.equals(toPreempt, Resources.none())) {
return;
}
// Scan down the list of containers we've already warned and kill them
// if we need to. Remove any containers from the list that we don't need
// or that are no longer running.
Iterator<RMContainer> warnedIter = warnedContainers.iterator();
while (warnedIter.hasNext()) {
RMContainer container = warnedIter.next();
if ((container.getState() == RMContainerState.RUNNING ||
container.getState() == RMContainerState.ALLOCATED) &&
Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
toPreempt, Resources.none())) {
warnOrKillContainer(container);
Resources.subtractFrom(toPreempt, container.getContainer().getResource());
} else {
warnedIter.remove();
}
}
try {
// Reset preemptedResource for each app
for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
queue.resetPreemptedResources();
}
while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
toPreempt, Resources.none())) {
RMContainer container =
getQueueManager().getRootQueue().preemptContainer();
if (container == null) {
break;
} else {
warnOrKillContainer(container);
warnedContainers.add(container);
Resources.subtractFrom(
toPreempt, container.getContainer().getResource());
}
}
} finally {
// Clear preemptedResources for each app
for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
queue.clearPreemptedResources();
}
}
long duration = getClock().getTime() - start;
fsOpDurations.addPreemptCallDuration(duration);
}
还没有评论,来说两句吧...