【k8s】——k8s scheduler调度器原理及代码分析

超、凢脫俗 2022-10-15 04:53 185阅读 0赞

一、前言

  1. 我们都知道,k8s其实说的直白点是一个容器的管家,它管容器的创建、运行和销毁。在k8s中,万物都可以叫做资源,其实k8s的设计理念很像编程语言的面向对象,把一切资源化之后,每一种资源就是一种对象。在针对各种资源设计controller(控制器)和 scheduler(调度器),有了这两个之后还需要通信机制,这个时候apiserver就来了,然后还需要数据存储(etcd)。你看这样一算下来,k8s的这点结构其实就说清楚了。
  2. 我今天重点聊一下k8s 的调度器。kube-schedulerKubernetes中的关键模块,扮演管家的角色遵从一套机制——为Pod提供调度服务,例如基于资源的公平调度、调度Pod到指定节点、或者通信频繁的Pod调度到同一节点等。容器调度本身是一件比较复杂的事,因为要确保以下几个目标:
  • 公平性:在调度Pod时需要公平的进行决策,每个节点都有被分配资源的机会,调度器需要对不同节点的使用作出平衡决策。
  • 资源高效利用:最大化群集所有资源的利用率,使有限的CPU、内存等资源服务尽可能更多的Pod。
  • 效率问题:能快速的完成对大批量Pod的调度工作,在集群规模扩增的情况下,依然保证调度过程的性能。
  • 灵活性:在实际运作中,用户往往希望Pod的调度策略是可控的,从而处理大量复杂的实际问题。因此平台要允许多个调度器并行工作,同时支持自定义调度器。

为达到上述目标,kube-scheduler通过结合Node资源、负载情况、数据位置等各种因素进行调度判断,确保在满足场景需求的同时将Pod分配到最优节点。显然,kube-scheduler影响着Kubernetes集群的可用性与性能,Pod数量越多集群的调度能力越重要,尤其达到了数千级节点数时,优秀的调度能力将显著提升容器平台性能。

二、kube-scheduler组件构成

下图是 kube-scheduler 的主要几大组件:

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MDQ0OTMwMA_size_16_color_FFFFFF_t_70

1. Policy

Scheduler 的调度策略启动配置目前支持三种方式,配置文件 / 命令行参数 / ConfigMap。调度策略可以配置指定调度主流程中要用哪些过滤器 (Predicates)、打分器 (Priorities) 、外部扩展的调度器 (Extenders),以及最新支持的 SchedulerFramwork 的自定义扩展点 (Plugins)。

2. Informer

Scheduler 在启动的时候通过 K8s 的 informer 机制以 List+Watch 从 kube-apiserver 获取调度需要的数据例如:Pods、Nodes、Persistant Volume(PV), Persistant Volume Claim(PVC) 等等,并将这些数据做一定的预处理作为调度器的的 Cache。

3.调度流水线

通过 Informer 将需要调度的 Pod 插入 Queue 中,Pipeline 会循环从 Queue Pop 等待调度的 Pod 放入 Pipeline 执行。

调度流水线 (Schedule Pipeline) 主要有三个阶段:Scheduler Thread,Wait Thread,Bind Thread。

  • Scheduler Thread 阶段: 从如上的架构图可以看到 Schduler Thread 会经历 Pre Filter -> Filter -> Post Filter-> Score -> Reserve,可以简单理解为 Filter -> Score -> Reserve。

Filter 阶段用于选择符合 Pod Spec 描述的 Nodes;Score 阶段用于从 Filter 过后的 Nodes 进行打分和排序;Reserve 阶段将 Pod 跟排序后的最优 Node 的 NodeCache 中,表示这个 Pod 已经分配到这个 Node 上, 让下一个等待调度的 Pod 对这个 Node 进行 Filter 和 Score 的时候能看到刚才分配的 Pod。

  • Wait Thread 阶段:这个阶段可以用来等待 Pod 关联的资源的 Ready 等待,例如等待 PVC 的 PV 创建成功,或者 Gang 调度中等待关联的 Pod 调度成功等等;
  • Bind Thread 阶段:用于将 Pod 和 Node 的关联持久化 Kube APIServer。

整个调度流水线只有在 Scheduler Thread 阶段是串行的一个 Pod 一个 Pod 的进行调度,在 Wait 和 Bind 阶段 Pod 都是异步并行执行。

三、调度算法

kube-scheduler的根本工作任务是根据各种调度算法将Pod绑定(bind)到最合适的工作节点,整个调度流程分为两个阶段:预选策略(Predicates)和优选策略(Priorities)。

  1. 预选(Predicates):输入是所有节点,输出是满足预选条件的节点。kube-scheduler根据预选策略过滤掉不满足策略的Nodes。例如,如果某节点的资源不足或者不满足预选策略的条件如“Node的label必须与Pod的Selector一致”时则无法通过预选。
  2. 优选(Priorities):输入是预选阶段筛选出的节点,优选会根据优先策略为通过预选的Nodes进行打分排名,选择得分最高的Node。例如,资源越富裕、负载越小的Node可能具有越高的排名。

通俗点说,调度的过程就是在回答两个问题:1. 候选有哪些?2. 其中最适合的是哪个?

值得一提的是,如果在预选阶段没有节点满足条件,Pod会一直处在Pending状态直到出现满足的节点,在此期间调度器会不断的进行重试。

1.预选策略(Predicates)

过滤条件包含如下:

  • PodFitsHostPorts:检查Pod容器所需的HostPort是否已被节点上其它容器或服务占用。如果已被占用,则禁止Pod调度到该节点。
  • PodFitsHost:检查Pod指定的NodeName是否匹配当前节点。
  • PodFitsResources:检查节点是否有足够空闲资源(例如CPU和内存)来满足Pod的要求。
  • PodMatchNodeSelector:检查Pod的节点选择器(nodeSelector)是否与节点(Node)的标签匹配
  • NoVolumeZoneConflict:对于给定的某块区域,判断如果在此区域的节点上部署Pod是否存在卷冲突。
  • NoDiskConflict:根据节点请求的卷和已经挂载的卷,评估Pod是否适合该节点。
  • MaxCSIVolumeCount:决定应该附加多少CSI卷,以及该卷是否超过配置的限制。
  • CheckNodeMemoryPressure:如果节点报告内存压力,并且没有配置异常,那么将不会往那里调度Pod。
  • CheckNodePIDPressure:如果节点报告进程id稀缺,并且没有配置异常,那么将不会往那里调度Pod。
  • CheckNodeDiskPressure:如果节点报告存储压力(文件系统已满或接近满),并且没有配置异常,那么将不会往那里调度Pod。
  • CheckNodeCondition:节点可以报告它们有一个完全完整的文件系统,然而网络不可用,或者kubelet没有准备好运行Pods。如果为节点设置了这样的条件,并且没有配置异常,那么将不会往那里调度Pod。
  • PodToleratesNodeTaints:检查Pod的容忍度是否能容忍节点的污点。
  • CheckVolumeBinding:评估Pod是否适合它所请求的容量。这适用于约束和非约束PVC。

如果在predicates(预选)过程中没有合适的节点,那么Pod会一直在pending状态,不断重试调度,直到有节点满足条件。

经过这个步骤,如果有多个节点满足条件,就继续priorities过程,最后按照优先级大小对节点排序。

2.优选策略(Priorities)

包含如下优选评分条件:

  • SelectorSpreadPriority:对于属于同一服务、有状态集或副本集(Service,StatefulSet or ReplicaSet)的Pods,会将Pods尽量分散到不同主机上。
  • InterPodAffinityPriority:策略有podAffinity和podAntiAffinity两种配置方式。简单来说,就说根据Node上运行的Pod的Label来进行调度匹配的规则,匹配的表达式有:In, NotIn, Exists, DoesNotExist,通过该策略,可以更灵活地对Pod进行调度。
  • LeastRequestedPriority:偏向使用较少请求资源的节点。换句话说,放置在节点上的Pod越多,这些Pod使用的资源越多,此策略给出的排名就越低。
  • MostRequestedPriority:偏向具有最多请求资源的节点。这个策略将把计划的Pods放到整个工作负载集所需的最小节点上运行。
  • RequestedToCapacityRatioPriority:使用默认的资源评分函数模型创建基于ResourceAllocationPriority的requestedToCapacity。
  • BalancedResourceAllocation:偏向具有平衡资源使用的节点。
  • NodePreferAvoidPodsPriority:根据节点注释scheduler.alpha.kubernet .io/preferAvoidPods为节点划分优先级。可以使用它来示意两个不同的Pod不应在同一Node上运行。
  • NodeAffinityPriority:根据preferredduringschedulingignoredingexecution中所示的节点关联调度偏好来对节点排序。
  • TaintTolerationPriority:根据节点上无法忍受的污点数量,为所有节点准备优先级列表。此策略将考虑该列表调整节点的排名。
  • ImageLocalityPriority:偏向已经拥有本地缓存Pod容器镜像的节点。
  • ServiceSpreadingPriority:对于给定的服务,此策略旨在确保Service的Pods运行在不同的节点上。总的结果是,Service对单个节点故障变得更有弹性。
  • EqualPriority:赋予所有节点相同的权值1。
  • EvenPodsSpreadPriority:实现择优 pod的拓扑扩展约束

四、代码分析

kubernetes 版本: v1.16,scheduler的pkg代码目录结构如下,有时间还是阅读一下源码比较好:

  1. scheduler
  2. ├── algorithm # 主要包含调度的算法
  3. ├── predicates # 预选的策略
  4. ├── priorities # 优选的策略
  5. ├── scheduler_interface.go # ScheduleAlgorithm、SchedulerExtender接口定义
  6. ├── types.go # 使用到的type的定义
  7. ├── algorithmprovider
  8. ├── defaults
  9. ├── defaults.go # 默认算法的初始化操作,包括预选和优选策略
  10. ├── cache # scheduler调度使用到的cache
  11. ├── cache.go # schedulerCache
  12. ├── interface.go
  13. ├── node_info.go
  14. ├── node_tree.go
  15. ├── core # 调度逻辑的核心代码
  16. ├── equivalence
  17. ├── eqivalence.go # 存储相同pod的调度结果缓存,主要给预选策略使用
  18. ├── extender.go
  19. ├── generic_scheduler.go # genericScheduler,主要包含默认调度器的调度逻辑
  20. ├── scheduling_queue.go # 调度使用到的队列,主要用来存储需要被调度的pod
  21. ├── factory
  22. ├── factory.go # 主要包括NewConfigFactory、NewPodInformer,监听pod事件来更新调度队列
  23. ├── metrics
  24. └── metrics.go # 主要给prometheus使用
  25. ├── scheduler.go # pkg部分的Run入口(核心代码),主要包含Run、scheduleOne、schedule、preempt等函数
  26. └── volumebinder
  27. └── volume_binder.go # volume bind

kubernetes 中所有组件的启动流程都是类似的,首先会解析命令行参数、添加默认值,kube-scheduler 的默认参数在 k8s.io/kubernetes/pkg/scheduler/apis/config/v1alpha1/defaults.go 中定义的。然后会执行 run 方法启动主逻辑,下面直接看 kube-scheduler 的主逻辑 run 方法执行过程。

Run() 方法主要做了以下工作:

  • 初始化 scheduler 对象
  • 启动 kube-scheduler server,kube-scheduler 监听 10251 和 10259 端口,10251 端口不需要认证,可以获取 healthz metrics 等信息,10259 为安全端口,需要认证
  • 启动所有的 informer
  • 执行 sched.Run() 方法,执行主调度逻辑

k8s.io/kubernetes/cmd/kube-scheduler/app/server.go:160

  1. func Run(cc schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}, registryOptions ...Option) error {
  2. ......
  3. // 1、初始化 scheduler 对象
  4. sched, err := scheduler.New(......)
  5. if err != nil {
  6. return err
  7. }
  8. // 2、启动事件广播
  9. if cc.Broadcaster != nil && cc.EventClient != nil {
  10. cc.Broadcaster.StartRecordingToSink(stopCh)
  11. }
  12. if cc.LeaderElectionBroadcaster != nil && cc.CoreEventClient != nil {
  13. cc.LeaderElectionBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")})
  14. }
  15. ......
  16. // 3、启动 http server
  17. if cc.InsecureServing != nil {
  18. separateMetrics := cc.InsecureMetricsServing != nil
  19. handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
  20. if err := cc.InsecureServing.Serve(handler, 0, stopCh); err != nil {
  21. return fmt.Errorf("failed to start healthz server: %v", err)
  22. }
  23. }
  24. ......
  25. // 4、启动所有 informer
  26. go cc.PodInformer.Informer().Run(stopCh)
  27. cc.InformerFactory.Start(stopCh)
  28. cc.InformerFactory.WaitForCacheSync(stopCh)
  29. run := func(ctx context.Context) {
  30. sched.Run()
  31. <-ctx.Done()
  32. }
  33. ctx, cancel := context.WithCancel(context.TODO()) // TODO once Run() accepts a context, it should be used here
  34. defer cancel()
  35. go func() {
  36. select {
  37. case <-stopCh:
  38. cancel()
  39. case <-ctx.Done():
  40. }
  41. }()
  42. // 5、选举 leader
  43. if cc.LeaderElection != nil {
  44. ......
  45. }
  46. // 6、执行 sched.Run() 方法
  47. run(ctx)
  48. return fmt.Errorf("finished without leader elect")

下面看一下 scheduler.New() 方法是如何初始化 scheduler 结构体的,该方法主要的功能是初始化默认的调度算法以及默认的调度器 GenericScheduler。

  • 创建 scheduler 配置文件
  • 根据默认的 DefaultProvider 初始化 schedulerAlgorithmSource 然后加载默认的预选及优选算法,然后初始化 GenericScheduler
  • 若启动参数提供了 policy config 则使用其覆盖默认的预选及优选算法并初始化 GenericScheduler,不过该参数现已被弃用

k8s.io/kubernetes/pkg/scheduler/scheduler.go:166

  1. func New(......) (*Scheduler, error) {
  2. ......
  3. // 1、创建 scheduler 的配置文件
  4. configurator := factory.NewConfigFactory(&factory.ConfigFactoryArgs{
  5. ......
  6. })
  7. var config *factory.Config
  8. source := schedulerAlgorithmSource
  9. // 2、加载默认的调度算法
  10. switch {
  11. case source.Provider != nil:
  12. // 使用默认的 ”DefaultProvider“ 初始化 config
  13. sc, err := configurator.CreateFromProvider(*source.Provider)
  14. if err != nil {
  15. return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
  16. }
  17. config = sc
  18. case source.Policy != nil:
  19. // 通过启动时指定的 policy source 加载 config
  20. ......
  21. config = sc
  22. default:
  23. return nil, fmt.Errorf("unsupported algorithm source: %v", source)
  24. }
  25. // Additional tweaks to the config produced by the configurator.
  26. config.Recorder = recorder
  27. config.DisablePreemption = options.disablePreemption
  28. config.StopEverything = stopCh
  29. // 3.创建 scheduler 对象
  30. sched := NewFromConfig(config)
  31. ......
  32. return sched, nil
  33. }

下面是 pod informer 的启动逻辑,只监听 status.phase 不为 succeeded 以及 failed 状态的 pod,即非 terminating 的 pod。

k8s.io/kubernetes/pkg/scheduler/factory/factory.go:527

  1. func NewPodInformer(client clientset.Interface, resyncPeriod time.Duration) coreinformers.PodInformer {
  2. selector := fields.ParseSelectorOrDie(
  3. "status.phase!=" + string(v1.PodSucceeded) +
  4. ",status.phase!=" + string(v1.PodFailed))
  5. lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), string(v1.ResourcePods), metav1.NamespaceAll, selector)
  6. return &podInformer{
  7. informer: cache.NewSharedIndexInformer(lw, &v1.Pod{}, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}),
  8. }
  9. }

然后继续看 Run() 方法中最后执行的 sched.Run() 调度循环逻辑,若 informer 中的 cache 同步完成后会启动一个循环逻辑执行 sched.scheduleOne 方法。

k8s.io/kubernetes/pkg/scheduler/scheduler.go:313

  1. func (sched *Scheduler) Run() {
  2. if !sched.config.WaitForCacheSync() {
  3. return
  4. }
  5. go wait.Until(sched.scheduleOne, 0, sched.config.StopEverything)
  6. }

scheduleOne() 每次对一个 pod 进行调度,主要有以下步骤:

  • 从 scheduler 调度队列中取出一个 pod,如果该 pod 处于删除状态则跳过
  • 执行调度逻辑 sched.schedule() 返回通过预算及优选算法过滤后选出的最佳 node
  • 如果过滤算法没有选出合适的 node,则返回 core.FitError
  • 若没有合适的 node 会判断是否启用了抢占策略,若启用了则执行抢占机制
  • 判断是否需要 VolumeScheduling 特性
  • 执行 reserve plugin
  • pod 对应的 spec.NodeName 写上 scheduler 最终选择的 node,更新 scheduler cache
  • 请求 apiserver 异步处理最终的绑定操作,写入到 etcd
  • 执行 permit plugin
  • 执行 prebind plugin
  • 执行 postbind plugin

k8s.io/kubernetes/pkg/scheduler/scheduler.go:515

  1. func (sched *Scheduler) scheduleOne() {
  2. fwk := sched.Framework
  3. pod := sched.NextPod()
  4. if pod == nil {
  5. return
  6. }
  7. // 1.判断 pod 是否处于删除状态
  8. if pod.DeletionTimestamp != nil {
  9. ......
  10. }
  11. // 2.执行调度策略选择 node
  12. start := time.Now()
  13. pluginContext := framework.NewPluginContext()
  14. scheduleResult, err := sched.schedule(pod, pluginContext)
  15. if err != nil {
  16. if fitError, ok := err.(*core.FitError); ok {
  17. // 3.若启用抢占机制则执行
  18. if sched.DisablePreemption {
  19. ......
  20. } else {
  21. preemptionStartTime := time.Now()
  22. sched.preempt(pluginContext, fwk, pod, fitError)
  23. ......
  24. }
  25. ......
  26. metrics.PodScheduleFailures.Inc()
  27. } else {
  28. klog.Errorf("error selecting node for pod: %v", err)
  29. metrics.PodScheduleErrors.Inc()
  30. }
  31. return
  32. }
  33. ......
  34. assumedPod := pod.DeepCopy()
  35. // 4.判断是否需要 VolumeScheduling 特性
  36. allBound, err := sched.assumeVolumes(assumedPod, scheduleResult.SuggestedHost)
  37. if err != nil {
  38. klog.Errorf("error assuming volumes: %v", err)
  39. metrics.PodScheduleErrors.Inc()
  40. return
  41. }
  42. // 5.执行 "reserve" plugins
  43. if sts := fwk.RunReservePlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
  44. .....
  45. }
  46. // 6.为 pod 设置 NodeName 字段,更新 scheduler 缓存
  47. err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
  48. if err != nil {
  49. ......
  50. }
  51. // 7.异步请求 apiserver
  52. go func() {
  53. // Bind volumes first before Pod
  54. if !allBound {
  55. err := sched.bindVolumes(assumedPod)
  56. if err != nil {
  57. ......
  58. return
  59. }
  60. }
  61. // 8.执行 "permit" plugins
  62. permitStatus := fwk.RunPermitPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
  63. if !permitStatus.IsSuccess() {
  64. ......
  65. }
  66. // 9.执行 "prebind" plugins
  67. preBindStatus := fwk.RunPreBindPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
  68. if !preBindStatus.IsSuccess() {
  69. ......
  70. }
  71. err := sched.bind(assumedPod, scheduleResult.SuggestedHost, pluginContext)
  72. ......
  73. if err != nil {
  74. ......
  75. } else {
  76. ......
  77. // 10.执行 "postbind" plugins
  78. fwk.RunPostBindPlugins(pluginContext, assumedPod, scheduleResult.SuggestedHost)
  79. }
  80. }()
  81. }

scheduleOne() 中通过调用 sched.schedule() 来执行预选与优选算法处理:

k8s.io/kubernetes/pkg/scheduler/scheduler.go:337

  1. func (sched *Scheduler) schedule(pod *v1.Pod, pluginContext *framework.PluginContext) (core.ScheduleResult, error) {
  2. result, err := sched.Algorithm.Schedule(pod, pluginContext)
  3. if err != nil {
  4. ......
  5. }
  6. return result, err
  7. }

sched.Algorithm 是一个 interface,主要包含四个方法,GenericScheduler 是其具体的实现:

k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:131

  1. type ScheduleAlgorithm interface {
  2. Schedule(*v1.Pod, *framework.PluginContext) (scheduleResult ScheduleResult, err error)
  3. Preempt(*framework.PluginContext, *v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
  4. Predicates() map[string]predicates.FitPredicate
  5. Prioritizers() []priorities.PriorityConfig
  6. }
  • Schedule():正常调度逻辑,包含预算与优选算法的执行
  • Preempt():抢占策略,在 pod 调度发生失败的时候尝试抢占低优先级的 pod,函数返回发生抢占的 node,被 抢占的 pods 列表,nominated node name 需要被移除的 pods 列表以及 error
  • Predicates():predicates 算法列表
  • Prioritizers():prioritizers 算法列表

kube-scheduler 提供的默认调度为 DefaultProvider,DefaultProvider 配置的 predicates 和 priorities policies 在 k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/defaults.go 中定义,算法具体实现是在 k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/ 中,默认的算法如下所示:

pkg/scheduler/algorithmprovider/defaults/defaults.go

  1. func defaultPredicates() sets.String {
  2. return sets.NewString(
  3. predicates.NoVolumeZoneConflictPred,
  4. predicates.MaxEBSVolumeCountPred,
  5. predicates.MaxGCEPDVolumeCountPred,
  6. predicates.MaxAzureDiskVolumeCountPred,
  7. predicates.MaxCSIVolumeCountPred,
  8. predicates.MatchInterPodAffinityPred,
  9. predicates.NoDiskConflictPred,
  10. predicates.GeneralPred,
  11. predicates.CheckNodeMemoryPressurePred,
  12. predicates.CheckNodeDiskPressurePred,
  13. predicates.CheckNodePIDPressurePred,
  14. predicates.CheckNodeConditionPred,
  15. predicates.PodToleratesNodeTaintsPred,
  16. predicates.CheckVolumeBindingPred,
  17. )
  18. }
  19. func defaultPriorities() sets.String {
  20. return sets.NewString(
  21. priorities.SelectorSpreadPriority,
  22. priorities.InterPodAffinityPriority,
  23. priorities.LeastRequestedPriority,
  24. priorities.BalancedResourceAllocation,
  25. priorities.NodePreferAvoidPodsPriority,
  26. priorities.NodeAffinityPriority,
  27. priorities.TaintTolerationPriority,
  28. priorities.ImageLocalityPriority,
  29. )
  30. }

下面继续看 sched.Algorithm.Schedule() 调用具体调度算法的过程:

  • 检查 pod pvc 信息
  • 执行 prefilter plugins
  • 获取 scheduler cache 的快照,每次调度 pod 时都会获取一次快照
  • 执行 g.findNodesThatFit() 预选算法
  • 执行 postfilter plugin
  • 若 node 为 0 直接返回失败的 error,若 node 数为1 直接返回该 node
  • 执行 g.priorityMetaProducer() 获取 metaPrioritiesInterface,计算 pod 的metadata,检查该 node 上是否有相同 meta 的 pod
  • 执行 PrioritizeNodes() 算法
  • 执行 g.selectHost() 通过得分选择一个最佳的 node

k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go:186

  1. func (g *genericScheduler) Schedule(pod *v1.Pod, pluginContext *framework.PluginContext) (result ScheduleResult, err error) {
  2. ......
  3. // 1.检查 pod pvc
  4. if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
  5. return result, err
  6. }
  7. // 2.执行 "prefilter" plugins
  8. preFilterStatus := g.framework.RunPreFilterPlugins(pluginContext, pod)
  9. if !preFilterStatus.IsSuccess() {
  10. return result, preFilterStatus.AsError()
  11. }
  12. // 3.获取 node 数量
  13. numNodes := g.cache.NodeTree().NumNodes()
  14. if numNodes == 0 {
  15. return result, ErrNoNodesAvailable
  16. }
  17. // 4.快照 node 信息
  18. if err := g.snapshot(); err != nil {
  19. return result, err
  20. }
  21. // 5.执行预选算法
  22. startPredicateEvalTime := time.Now()
  23. filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(pluginContext, pod)
  24. if err != nil {
  25. return result, err
  26. }
  27. // 6.执行 "postfilter" plugins
  28. postfilterStatus := g.framework.RunPostFilterPlugins(pluginContext, pod, filteredNodes, filteredNodesStatuses)
  29. if !postfilterStatus.IsSuccess() {
  30. return result, postfilterStatus.AsError()
  31. }
  32. // 7.预选后没有合适的 node 直接返回
  33. if len(filteredNodes) == 0 {
  34. ......
  35. }
  36. startPriorityEvalTime := time.Now()
  37. // 8.若只有一个 node 则直接返回该 node
  38. if len(filteredNodes) == 1 {
  39. return ScheduleResult{
  40. SuggestedHost: filteredNodes[0].Name,
  41. EvaluatedNodes: 1 + len(failedPredicateMap),
  42. FeasibleNodes: 1,
  43. }, nil
  44. }
  45. // 9.获取 pod meta 信息,执行优选算法
  46. metaPrioritiesInterface := g.priorityMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)
  47. priorityList, err := PrioritizeNodes(pod, g.nodeInfoSnapshot.NodeInfoMap, metaPrioritiesInterface, g.prioritizers, filteredNodes, g.extenders, g.framework, pluginContext)
  48. if err != nil {
  49. return result, err
  50. }
  51. // 10.根据打分选择最佳的 node
  52. host, err := g.selectHost(priorityList)
  53. trace.Step("Selecting host done")
  54. return ScheduleResult{
  55. SuggestedHost: host,
  56. EvaluatedNodes: len(filteredNodes) + len(failedPredicateMap),
  57. FeasibleNodes: len(filteredNodes),
  58. }, err
  59. }

五、DIY调度器

Kubernetes 自带了一个默认调度器kube-scheduler,其内置了很多节点预选和优选的调度算法,一般调度场景下可以满足要求。但是在一些特殊场景下,默认调度器不能满足我们复杂的调度需求。我们就需要对调度器进行扩展,以达到调度适合业务场景的目的。中间件redis容器化后,需要两主不能在同一个节点上,一对主从不能在同一节点上;elasticsearch容器化后,两个data实例不能在同一节点上。在这类场景下,默认调度器内置的预选、优选算法不能满足需求,我们有以下三种选择:

  1. 将新的调度算法添加到默认调度程序中,并重新编译镜像,最终该镜像运行的实例作为kubernetes集群调度器;
  2. 参考kube-scheduler实现满足自己业务场景的调度程序,并编译镜像,将该程序作为独立的调度器运行到kubernetes集群内,需要用该调度器调度的pod实例,在spec.schedulerName里指定该调度器;
  3. 实现“调度扩展程序“:默认调度器kube-scheduler在进行预选时会调用该扩展程序进行过滤节点;在优选时会调用该扩展程序进行给节点打分,或者在bind操作时,调用该扩展器进行bind操作。

对上述三种方式进行评估:

  • 第一种:将自己的调度算法添加到默认调度器kube-scheduler中,对原生代码侵入性较高,而且随着kubernetes版本升级,维护成本也较高;
  • 第二种:默认调度器里内置了很多优秀调度算法,如:检查节点资源是否充足;端口是否占用;volume是否被其他pod挂载;亲和性;均衡节点资源利用等,如果完全使用自己开发的调度器程序,可能在达到了实际场景调度需求同时,失去更佳的调度方案,除非集成默认调度器中的算法到自己独立调度程序中,但这无疑是不现实的;
  • 第三种:通过启动参数的policy配置,选用某些默认调度器中的预选、优选调度算法的同时,也可以调用外部扩展调度程序的算法,计算得到最优的调度节点,无需修改kube-scheduler代码,只需要在启动参数中增加配置文件即可将默认调度程序和扩展调度程序相互关联。

    第三种方法对原生代码侵入性小,也不会破坏原有的策略,是我们的最优解。如何实现可参考文档:https://github.com/kubernetes/community/blob/master/contributors/design-proposals/scheduling/scheduler_extender.md

六、参考资料

  1. https://kubernetes.io/docs/concepts/scheduling-eviction/scheduling-framework/

  2. https://blog.tianfeiyu.com/2019/10/21/kube_scheduler_process/

  3. https://blog.csdn.net/ll837448792/article/details/93619549

  4. https://zhuanlan.zhihu.com/p/56088355

  5. https://www.cnblogs.com/qinghe123/p/12204779.html

发表评论

表情:
评论列表 (有 0 条评论,185人围观)

还没有评论,来说两句吧...

相关阅读