05. kube-scheduler资源调度原理深入解析

概述

我们在学习kubernetes除了要了解其相关概念外,我们还需要深入了解整个kubernetes的实现机制,如果还能了解其源码,那我们才算是对kubernetes比较熟悉。我将用kubernetes是如何生成一个deployment的资源,并且附带源码解读的方式讲解kubernetes的整个实现机制。源码版本是1.22

之前的文章

前面的文章提到过,一个depoyment资源的创建,由kubectl发送请求到apiserver,apiserver负责将请求的资源信息存入到etcd中。而实际对这个资源进行控制的是controller-manager。deployment-controller-manager和replicaset-controller-manager配合最终生成pod资源。pod资源存储在etcd后,pod将调度到哪个节点呢?这是k8s要考虑的一个问题。这篇文章将会剖析下k8s是如何解决这个问题的。

原理分析

此次原理分析,我打算换个方式来分析从解答问题的角度来去剖析执行kubectl apply -f deploy.yaml文件后,kube-scheduler是如何搞事情的。

关键问题

我们已经弄清楚kube-scheduler在做调度这件事。那问题来了,

  1. 为什么要做调度这件事呢?
  2. Kube-scheduler是怎么解决调度这个问题的?
  3. kube-scheduler又是如何通过代码去实现的呢?

为什么要做调度

如果只是单个节点,我认为操作系统自身的调度机制能够帮助我们解决了资源分配的问题。但是大的环境下我们采用了分布式服务,将进程部署在多台节点上组成一个集群提供服务。这时候资源如何进行分配就成了一个问题。这是一个大的背景。那我们从技术角度上去看,你无非就是要将进程部署到不同节点嘛,随机分配下不就好了吗,还需要用个调度吗?我们知道不同的业务功能在微服务的情况下可能对应不同的进程,不同的业务逻辑对于进程的资源要求也会不一样。如何我们随机分配进程到节点上,很容易就出现各个节点资源不平均,最终导致整个系统的不稳定。

如何进行调度

一般情况下,我们可能会考虑,根据资源的要求去判断节点资源是否满足,如果满足则指定为该节点。咋一看好像问题不大,那实际上pod的调度却要考虑到方方面面

  1. pod的资源依赖类型过多,CPU,内存,磁盘,网络带宽等,这些都需要我们考虑进去
  2. 如果只是简单的判断,也同样容易造成某个节点出现资源过载的情况,是否能够各个节点资源使用保持一个相对平衡的状态
  3. 当需要调度的pod数量和资源节点数过多的时候,我们是否能够进行一个高效正确的调度决策?

以上问题是我对调度的一个理解,那我们直接看看k8s是怎样去处理这个调度的。

调度流程

我们还是结合之前的文章,来看这个调度过程。controller-manager创建了deployment对应的pod资源,而这个pod实际上却不知道应该去哪个节点上。那kube-scheduler实际要做的可以看成三个步骤。

  1. 获取未调度podList
  2. 通过调度算法选择一个合适的node
  3. 选择完成后,通过apiserver更新数据

重点是第二个步骤,调度算法是如何选择一个合适的node呢?是否要将我们前面提到的方方面面考虑进去吗?

查询相关资料后,以下就是整个调度的一个过程:

  1. 预选阶段,过滤节点,调度器用一组规则过滤掉不符合要求的 Node 节点,比如 Pod 设置了资源的 request,那么可用资源比 Pod 需要的资源少的主机显然就会被过滤掉
  2. 优选阶段,为节点的优先级打分,将上一阶段过滤出来的 Node 列表进行打分,调度器会考虑一些整体的优化策略,比如把 Deployment 控制的多个 Pod 副本分布到不同的主机上,使用最低负载的主机等等策略
  3. 没有合适的node判断是否启动抢占机制
  4. 更新本地缓存,指定pod的nodename为调度计算出来的节点
  5. 执行reserve相关plugin
  6. 执行Permit相关插件
  7. 最后通过preBind,bind,以及postBind,完成一个bind步骤。

预选阶段,优选阶段和抢占机制是整个调度的重要实现逻辑,我们可以具体看看这几个机制具体是怎样的。

预选阶段

在预选阶段,我们将有一系列的条件来筛选出合适的调度节点。

GeneralPredicates

第一种类型,叫作 GeneralPredicates。顾名思义,这一组过滤规则,负责的是最基础的调度策略。比如,PodFitsResources 计算的就是宿主机的 CPU 和内存资源等是否够用

  • podFistResources
  • PodSelectorMatches
  • PodFitsHost
  • PodFitsPorts
  • PodFitsHostPorts

Volume相关

第二种类型,是与 Volume 相关的过滤规则。这一组过滤规则,负责的是跟容器持久化 Volume 相关的调度策略

  • NoDiskconflict,检查磁盘冲突
  • MaxPDVolumeCountPredicate,检查一个节点上某种类型的持久化Volume是否超过一定数目
  • VolumeZonePredicate,检查持久化Volume的Zone标签,是否与待考察节点的Zone标签想匹配。

宿主机相关

第三种类型,是宿主机相关的过滤规则。这一组规则,主要考察待调度 Pod 是否满足 Node 本身的某些条件。比如,PodToleratesNodeTaints,负责检查的就是我们前面经常用到的 Node 的“污点”机制。只有当 Pod 的 Toleration 字段与 Node 的 Taint 字段能够匹配的时候,这个 Pod 才能被调度到该节点上

其他

Predicates过滤有一系列的算法可以使用,上面就是简单的列举几个,还有很多,更多更详细的我们可以查看源码文件:k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go。

虽然Predicates是串行的,但是当开始调度一个 Pod 时,Kubernetes 调度器会同时启动 16 个 Goroutine,来并发地为集群里的所有 Node 计算 Predicates,最后返回可以运行这个 Pod 的宿主机列表。

优选阶段

在 Predicates 阶段完成了节点的“过滤”之后,Priorities 阶段的工作就是为这些节点打分。这里打分的范围是 0-10 分,得分最高的节点就是最后被 Pod 绑定的最佳节点。Priorities 里最常用到的一个打分规则,是 LeastRequestedPriority。

  • leastRequestedPriority,在选择空闲资源(CPU 和 Memory)最多的宿主机
  • CalculateNodeLabelPriority,配置node标签确定优先级,从而计算出得分数。
  • BalancedResourceAllocation,使得节点里各种资源分配最均衡。
  • SelectorSpreadPriority,为了更好的高可用,对同属于一个 Deployment 或者 RC 下面的多个 Pod 副本,尽量调度到多个不同的节点上,当一个 Pod 被调度的时候,会先去查找该 Pod 对应的 controller,然后查看该 controller 下面的已存在的 Pod,运行 Pod 越少的节点权重越高
  • ImageLocalityPriority,就是如果在某个节点上已经有要使用的镜像节点了,镜像总大小值越大,权重就越高
  • NodeAffinityPriority,这个就是根据节点的亲和性来计算一个权重值,后面我们会详细讲解亲和性的使用方法

抢占机制

当一个高优先级的 Pod 调度失败的时候,调度器的抢占能力就会被触发。这时,调度器就会试图从当前集群里寻找一个节点,使得当这个节点上的一个或者多个低优先级 Pod 被删除后,待调度的高优先级 Pod 就可以被调度到这个节点上。

最后一张图来看下调度的执行过程。

cff153e62aff83efcbfcc9e2e1a99c00.png

源码分析

前面讲述了一个调度的大体机制是怎样的,那真正的代码实现又是怎样的呢?我们可以先来看几张图,对整个调度有个大体的认识,方便我们后续进行源码分析。

图解

第一张图policy是调度策略的相关配置,infomer是获取各个资源的信息,并将其存入schedule Cache中,algorithm基于policy的配置,对于监听到的资源进行调度处理。

dd78dd561ccbe73914266070eacbc045.png

第二张图其实描述的更细节,里面提到了Enqueue,FIFO等队列名词,其实整个调度过程中,还实现了很多其他队列,我们可以看下图三,这里更多展现的是scheduler的一个数据流向图。大家可以细品。

scheduler.jpg

这里就体现了一个pod创建,在kube-scheduler中消息是如何流转的。

image-1606403958351.png

源码

informer在之前的controller-manager中已经分析过,这里不再分析。重点是分析下当infomer监听到数据后,scheduler的一系列实现是怎样的?

按照惯例,我们先看看kube-scheduler的启动过程。

// Kubernetes/cmd/kube-scheduler/app/server.go
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
	// To help debugging, immediately log version
	klog.V(1).InfoS("Starting Kubernetes Scheduler version", "version", version.Get())

	// Configz registration.
	if cz, err := configz.New("componentconfig"); err == nil {
		cz.Set(cc.ComponentConfig)
	} else {
		return fmt.Errorf("unable to register configz: %s", err)
	}

	// Prepare the event broadcaster.
	cc.EventBroadcaster.StartRecordingToSink(ctx.Done())

	// Setup healthz checks.
	var checks []healthz.HealthChecker
	if cc.ComponentConfig.LeaderElection.LeaderElect {
		checks = append(checks, cc.LeaderElection.WatchDog)
	}

	waitingForLeader := make(chan struct{})
	isLeader := func() bool {
		select {
		case _, ok := <-waitingForLeader:
			// if channel is closed, we are leading
			return !ok
		default:
			// channel is open, we are waiting for a leader
			return false
		}
	}

	// Start up the healthz server.
	if cc.InsecureServing != nil {
		separateMetrics := cc.InsecureMetricsServing != nil
		handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, separateMetrics, checks...), nil, nil)
		if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil {
			return fmt.Errorf("failed to start healthz server: %v", err)
		}
	}
	if cc.InsecureMetricsServing != nil {
		handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader), nil, nil)
		if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil {
			return fmt.Errorf("failed to start metrics server: %v", err)
		}
	}
	if cc.SecureServing != nil {
		handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
		// TODO: handle stoppedCh returned by c.SecureServing.Serve
		if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
			// fail early for secure handlers, removing the old error loop from above
			return fmt.Errorf("failed to start secure server: %v", err)
		}
	}

	// Start all informers.
  // 启动所有相关informers
	cc.InformerFactory.Start(ctx.Done())

	// Wait for all caches to sync before scheduling.
	cc.InformerFactory.WaitForCacheSync(ctx.Done())

	// If leader election is enabled, runCommand via LeaderElector until done and exit.
	if cc.LeaderElection != nil {
		cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				close(waitingForLeader)
				sched.Run(ctx)
			},
			OnStoppedLeading: func() {
				select {
				case <-ctx.Done():
					// We were asked to terminate. Exit 0.
					klog.Info("Requested to terminate. Exiting.")
					os.Exit(0)
				default:
					// We lost the lock.
					klog.Exitf("leaderelection lost")
				}
			},
		}
		leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
		if err != nil {
			return fmt.Errorf("couldn't create leader elector: %v", err)
		}

		leaderElector.Run(ctx)

		return fmt.Errorf("lost lease")
	}

	// Leader election is disabled, so runCommand inline until done.
	close(waitingForLeader)
  // 执行scheduler
	sched.Run(ctx)
	return fmt.Errorf("finished without leader elect")
}
复制代码

上面代码中有两个核心的方法,一个是cc.InformerFactory.Start(ctx.Done()),一个是sched.Run(ctx)。informer的启动是scheduler能够监听相关资源,sched则是初始化默认的调度算法以及默认的调度器 GenericScheduler。在server.go中有段代码对其进行了配置。

// Setup creates a completed config and a scheduler based on the command args and options
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
	if errs := opts.Validate(); len(errs) > 0 {
		return nil, nil, utilerrors.NewAggregate(errs)
	}

	c, err := opts.Config()
	if err != nil {
		return nil, nil, err
	}

	// Get the completed config
  // cc实际上config的实例
	cc := c.Complete()

	outOfTreeRegistry := make(runtime.Registry)
	for _, option := range outOfTreeRegistryOptions {
		if err := option(outOfTreeRegistry); err != nil {
			return nil, nil, err
		}
	}

	recorderFactory := getRecorderFactory(&cc)
	completedProfiles := make([]kubeschedulerconfig.KubeSchedulerProfile, 0)
	// Create the scheduler.
  // 创建scheduler,里面就包含了InformerFactory的生成。
	sched, err := scheduler.New(cc.Client,
		cc.InformerFactory,
		recorderFactory,
		ctx.Done(),
		scheduler.WithKubeConfig(cc.KubeConfig),
		scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
		scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
		scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
		scheduler.WithParallelism(cc.ComponentConfig.Parallelism),
		scheduler.WithBuildFrameworkCapturer(func(profile kubeschedulerconfig.KubeSchedulerProfile) {
			// Profiles are processed during Framework instantiation to set default plugins and configurations. Capturing them for logging
			completedProfiles = append(completedProfiles, profile)
		}),
	)
	if err != nil {
		return nil, nil, err
	}
	if err := options.LogOrWriteConfig(opts.WriteConfigTo, &cc.ComponentConfig, completedProfiles); err != nil {
		return nil, nil, err
	}

	return &cc, sched, nil
}

复制代码

这里是scheduler生成新实例的实际方法。

// kubernetes/pkg/scheduler/scheduler.go
// New returns a Scheduler
func New(client clientset.Interface,
	informerFactory informers.SharedInformerFactory,
	recorderFactory profile.RecorderFactory,
	stopCh <-chan struct{},
	opts ...Option) (*Scheduler, error) {

	stopEverything := stopCh
	if stopEverything == nil {
		stopEverything = wait.NeverStop
	}
	// 获取默认的调度器选项
  //1. 设定一些默认的组件参数
  //2. 给定默认的algorithmSourceProvider,后续的调度全部依赖这里提供的算法
	options := defaultSchedulerOptions
	for _, opt := range opts {
		opt(&options)
	}
	// 初始化调度缓存
	schedulerCache := internalcache.New(30*time.Second, stopEverything)
	// registry是一个字典,里面存放了插件名与插件的工厂方法
  // 默认接近有30个插件
	registry := frameworkplugins.NewInTreeRegistry()
	if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
		return nil, err
	}

	snapshot := internalcache.NewEmptySnapshot()
	clusterEventMap := make(map[framework.ClusterEvent]sets.String)

	configurator := &Configurator{
		...
	}

	metrics.Register()

	var sched *Scheduler
	source := options.schedulerAlgorithmSource
	switch {
	case source.Provider != nil:
		// Create the config from a named algorithm provider.
		sc, err := configurator.createFromProvider(*source.Provider)
		if err != nil {
			return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
		}
		sched = sc
	case source.Policy != nil:
		// Create the config from a user specified policy source.
		policy := &schedulerapi.Policy{}
		switch {
		case source.Policy.File != nil:
			if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
				return nil, err
			}
		case source.Policy.ConfigMap != nil:
			if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
				return nil, err
			}
		}
		// Set extenders on the configurator now that we've decoded the policy
		// In this case, c.extenders should be nil since we're using a policy (and therefore not componentconfig,
		// which would have set extenders in the above instantiation of Configurator from CC options)
		configurator.extenders = policy.Extenders
		sc, err := configurator.createFromConfig(*policy)
		if err != nil {
			return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
		}
		sched = sc
	default:
		return nil, fmt.Errorf("unsupported algorithm source: %v", source)
	}
  ...
  // 指定pod,node,svc,pv等事件回调处理,pod queue也是这里维护
	addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap))
	return sched, nil
}
复制代码

将option转化为Configurator,然后指定调度算法源(预选、优选的算法),通过provider和config的方式。config方式最终会调用CreateFromKeys,通过指定key选择指定的算法。

  • AddEventHandlers:指定pod,node, svc, pv等的事件回调处理,pod queue也是这里维护
  • Run: 会一直调用scheduleOne方法,逐一的对没有bind的pod进行调度
// kubernetes/pkg/scheduler/scheduler.go
// Run begins watching and scheduling. It starts scheduling and blocked until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
	sched.SchedulingQueue.Run()
  // 执行scheduler关键函数scheduleOne
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}
复制代码
// kubernetes/pkg/scheduler/scheduler.go
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	podInfo := sched.NextPod()
	// pod could be nil when schedulerQueue is closed
	if podInfo == nil || podInfo.Pod == nil {
		return
	}
	pod := podInfo.Pod
	fwk, err := sched.frameworkForPod(pod)
	if err != nil {
		// This shouldn't happen, because we only accept for scheduling the pods
		// which specify a scheduler name that matches one of the profiles.
		klog.ErrorS(err, "Error occurred")
		return
	}
	if sched.skipPodSchedule(fwk, pod) {
		return
	}
	// Synchronously attempt to find a fit for the pod.
	...
	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, fwk, state, pod)
  if err != nil {
		// Schedule() may have failed because the pod would not fit on any host, so we try to
		// preempt, with the expectation that the next time the pod is tried for scheduling it
		// will fit due to the preemption. It is also possible that a different pod will schedule
		// into the resources that were preempted, but this is harmless.
		nominatedNode := ""
		if fitError, ok := err.(*framework.FitError); ok {
			if !fwk.HasPostFilterPlugins() {
				klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
			} else {
				// Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.   
        // 开启抢占机制
				result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
			  ...
			}
      ...
		}
	...
	// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
	// This allows us to keep scheduling without waiting on binding to occur.
	assumedPodInfo := podInfo.DeepCopy()
	assumedPod := assumedPodInfo.Pod
	// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
	err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
	...
	// Run the Reserve method of reserve plugins.
	if sts := fwk.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
		...
	}

	// Run "permit" plugins.
	runPermitStatus := fwk.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
	...

	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
	go func() {
		...

		// Run "prebind" plugins.
		preBindStatus := fwk.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		...
		err := sched.bind(bindingCycleCtx, fwk, assumedPod, scheduleResult.SuggestedHost, state)
		...
			// Run "postbind" plugins.
			fwk.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		}
	}()

复制代码
  • sched.NextPod(): 从维护的internalqueue取出pod
  • sched.Algorithm.Schedule: 开始调度,选出合适的node,封装在generic_scheduler.go中
  • fwk.RunPermitPlugins:其中会有抢占机制的插件,如果调度失败(当前没有适合的node调度),所以判断是否需要抢占调度,也封装在generic_scheduler.go中,抢占调度成功只有,会将牺牲(被抢占)的pods进行移除
  • sched.assume: 对于调度成功的pod做假设,给该pod的NodeName添加了调度的SuggestHost,写入到cache中,后续才是真正的bind,因为bind比较耗时,后面异步去做
  • sched.bind:使用协程,异步绑定pod到node上, bind方法比较简单,调用api server方法进行bind: b.Client.CoreV1().Pods(binding.Namespace).Bind(binding)
// kubernetes/pkg/scheduler/core/generic_scheduler.go
// Schedule tries to schedule the given pod to one of the nodes in the node list.
// If it succeeds, it will return the name of the node.
// If it fails, it will return a FitError error with reasons.
func (g *genericScheduler) Schedule(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
	defer trace.LogIfLong(100 * time.Millisecond)

	if err := g.snapshot(); err != nil {
		return result, err
	}
	trace.Step("Snapshotting scheduler cache and node infos done")

	if g.nodeInfoSnapshot.NumNodes() == 0 {
		return result, ErrNoNodesAvailable
	}

	feasibleNodes, diagnosis, err := g.findNodesThatFitPod(ctx, fwk, state, pod)
	if err != nil {
		return result, err
	}
	trace.Step("Computing predicates done")

	if len(feasibleNodes) == 0 {
		return result, &framework.FitError{
			Pod:         pod,
			NumAllNodes: g.nodeInfoSnapshot.NumNodes(),
			Diagnosis:   diagnosis,
		}
	}

	// When only one node after predicate, just use it.
	if len(feasibleNodes) == 1 {
		return ScheduleResult{
			SuggestedHost:  feasibleNodes[0].Name,
			EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
			FeasibleNodes:  1,
		}, nil
	}

	priorityList, err := g.prioritizeNodes(ctx, fwk, state, pod, feasibleNodes)
	if err != nil {
		return result, err
	}

	host, err := g.selectHost(priorityList)
	trace.Step("Prioritizing done")

	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
		FeasibleNodes:  len(feasibleNodes),
	}, err
}
复制代码
  • g.findNodesThatFit: 预选算法,找到合适的nodes
  • g.prioritizeNodes: 如果预选算法只有一个node,则直接使用,立即return,如果有多个,则需要进行优选算法,优选算法会对每一个node进行打分
  • g.selectHost:从优选的结果中选出得分最高的,如果最高分有多个,则随机选取一个node
// kubernetes/pkg/scheduler/core/generic_scheduler.go
// Filters the nodes to find the ones that fit the pod based on the framework
// filter plugins and filter extenders.
func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
	diagnosis := framework.Diagnosis{
		NodeToStatusMap:      make(framework.NodeToStatusMap),
		UnschedulablePlugins: sets.NewString(),
	}

	// Run "prefilter" plugins.
	s := fwk.RunPreFilterPlugins(ctx, state, pod)
	allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
	if err != nil {
		return nil, diagnosis, err
	}
	if !s.IsSuccess() {
		if !s.IsUnschedulable() {
			return nil, diagnosis, s.AsError()
		}
		// All nodes will have the same status. Some non trivial refactoring is
		// needed to avoid this copy.
		for _, n := range allNodes {
			diagnosis.NodeToStatusMap[n.Node().Name] = s
		}
		// Status satisfying IsUnschedulable() gets injected into diagnosis.UnschedulablePlugins.
		diagnosis.UnschedulablePlugins.Insert(s.FailedPlugin())
		return nil, diagnosis, nil
	}

	// "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption.
	// This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes.
	if len(pod.Status.NominatedNodeName) > 0 && feature.DefaultFeatureGate.Enabled(features.PreferNominatedNode) {
		feasibleNodes, err := g.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)
		if err != nil {
			klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
		}
		// Nominated node passes all the filters, scheduler is good to assign this node to the pod.
		if len(feasibleNodes) != 0 {
			return feasibleNodes, diagnosis, nil
		}
	}
	feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, allNodes)
	if err != nil {
		return nil, diagnosis, err
	}

	feasibleNodes, err = g.findNodesThatPassExtenders(pod, feasibleNodes, diagnosis.NodeToStatusMap)
	if err != nil {
		return nil, diagnosis, err
	}
	return feasibleNodes, diagnosis, nil
}
复制代码
  • fwk.RunPreFilterPlugins(ctx, state, pod) 先根据nodeAffinity来计算出节点范围。
  • g.nodeInfoSnapshot.NodeInfos().List() 通过缓存获取所有节点信息
  • g.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis) 计算各个节点的信息,找到合适的节点列表
  • g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, allNodes) 通过过滤插件过滤出合适的node,内部存在parallelize.NewErrorChannel()来提供并发执行能力,默认是16个协程一起执行。RunFilterPluginsWithNominatedPods来对各个节点进行筛选。xx
  • g.findNodesThatPassExtenders(pod, feasibleNodes, diagnosis.NodeToStatusMap) 提供扩展性的过滤条件,如果适合的node个数大于0 且 有extender,则会调用extender的filter方法,如果filter之后个数为0,则break返回
// kubernetes/pkg/scheduler/core/generic_scheduler.go
// prioritizeNodes prioritizes the nodes by running the score plugins,
// which return a score for each node from the call to RunScorePlugins().
// The scores from each plugin are added together to make the score for that node, then
// any extenders are run as well.
// All scores are finally combined (added) to get the total weighted scores of all nodes
func (g *genericScheduler) prioritizeNodes(
	ctx context.Context,
	fwk framework.Framework,
	state *framework.CycleState,
	pod *v1.Pod,
	nodes []*v1.Node,
) (framework.NodeScoreList, error) {
	// If no priority configs are provided, then all nodes will have a score of one.
	// This is required to generate the priority list in the required format
	if len(g.extenders) == 0 && !fwk.HasScorePlugins() {
		result := make(framework.NodeScoreList, 0, len(nodes))
		for i := range nodes {
			result = append(result, framework.NodeScore{
				Name:  nodes[i].Name,
				Score: 1,
			})
		}
		return result, nil
	}

	// Run PreScore plugins.
	preScoreStatus := fwk.RunPreScorePlugins(ctx, state, pod, nodes)
	if !preScoreStatus.IsSuccess() {
		return nil, preScoreStatus.AsError()
	}

	// Run the Score plugins.
	scoresMap, scoreStatus := fwk.RunScorePlugins(ctx, state, pod, nodes)
	if !scoreStatus.IsSuccess() {
		return nil, scoreStatus.AsError()
	}

	if klog.V(10).Enabled() {
		for plugin, nodeScoreList := range scoresMap {
			for _, nodeScore := range nodeScoreList {
				klog.InfoS("Plugin scored node for pod", "pod", klog.KObj(pod), "plugin", plugin, "node", nodeScore.Name, "score", nodeScore.Score)
			}
		}
	}

	// Summarize all scores.
	result := make(framework.NodeScoreList, 0, len(nodes))

	for i := range nodes {
		result = append(result, framework.NodeScore{Name: nodes[i].Name, Score: 0})
		for j := range scoresMap {
			result[i].Score += scoresMap[j][i].Score
		}
	}

	if len(g.extenders) != 0 && nodes != nil {
		var mu sync.Mutex
		var wg sync.WaitGroup
		combinedScores := make(map[string]int64, len(nodes))
		for i := range g.extenders {
			if !g.extenders[i].IsInterested(pod) {
				continue
			}
			wg.Add(1)
			go func(extIndex int) {
				metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Inc()
				defer func() {
					metrics.SchedulerGoroutines.WithLabelValues(metrics.PrioritizingExtender).Dec()
					wg.Done()
				}()
				prioritizedList, weight, err := g.extenders[extIndex].Prioritize(pod, nodes)
				if err != nil {
					// Prioritization errors from extender can be ignored, let k8s/other extenders determine the priorities
					return
				}
				mu.Lock()
				for i := range *prioritizedList {
					host, score := (*prioritizedList)[i].Host, (*prioritizedList)[i].Score
					if klog.V(10).Enabled() {
						klog.InfoS("Extender scored node for pod", "pod", klog.KObj(pod), "extender", g.extenders[extIndex].Name(), "node", host, "score", score)
					}
					combinedScores[host] += score * weight
				}
				mu.Unlock()
			}(i)
		}
		// wait for all go routines to finish
		wg.Wait()
		for i := range result {
			// MaxExtenderPriority may diverge from the max priority used in the scheduler and defined by MaxNodeScore,
			// therefore we need to scale the score returned by extenders to the score range used by the scheduler.
			result[i].Score += combinedScores[result[i].Name] * (framework.MaxNodeScore / extenderv1.MaxExtenderPriority)
		}
	}

	if klog.V(10).Enabled() {
		for i := range result {
			klog.InfoS("Calculated node's final score for pod", "pod", klog.KObj(pod), "node", result[i].Name, "score", result[i].Score)
		}
	}
	return result, nil
}

// kubernetes/pkg/scheduler/core/generic_scheduler.go
// selectHost takes a prioritized list of nodes and then picks one
// in a reservoir sampling manner from the nodes that had the highest score.
func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (string, error) {
	if len(nodeScoreList) == 0 {
		return "", fmt.Errorf("empty priorityList")
	}
	maxScore := nodeScoreList[0].Score
	selected := nodeScoreList[0].Name
	cntOfMaxScore := 1
	for _, ns := range nodeScoreList[1:] {
		if ns.Score > maxScore {
			maxScore = ns.Score
			selected = ns.Name
			cntOfMaxScore = 1
		} else if ns.Score == maxScore {
			cntOfMaxScore++
			if rand.Intn(cntOfMaxScore) == 0 {
				// Replace the candidate with probability of 1/cntOfMaxScore
				selected = ns.Name
			}
		}
	}
	return selected, nil
}
复制代码

整个计算过程也会分为:

  1. 预备计分阶段
  2. 执行计分插件方法
  3. 聚合所有计分方式
// kubernetes/pkg/scheduler/framework/runtime/framework.go
func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl framework.PostFilterPlugin, state *framework.CycleState, pod *v1.Pod, filteredNodeStatusMap framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
	if !state.ShouldRecordPluginMetrics() {
		return pl.PostFilter(ctx, state, pod, filteredNodeStatusMap)
	}
	startTime := time.Now()
	r, s := pl.PostFilter(ctx, state, pod, filteredNodeStatusMap)
	f.metricsRecorder.observePluginDurationAsync(postFilter, pl.Name(), s, metrics.SinceInSeconds(startTime))
	return r, s
}

// kubernetes/pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go
// PostFilter invoked at the postFilter extension point.
func (pl *DefaultPreemption) PostFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
	defer func() {
		metrics.PreemptionAttempts.Inc()
	}()

	nnn, status := pl.preempt(ctx, state, pod, m)
	if !status.IsSuccess() {
		return nil, status
	}
	// This happens when the pod is not eligible for preemption or extenders filtered all candidates.
	if nnn == "" {
		return nil, framework.NewStatus(framework.Unschedulable)
	}
	return &framework.PostFilterResult{NominatedNodeName: nnn}, framework.NewStatus(framework.Success)
}

// preempt finds nodes with pods that can be preempted to make room for "pod" to
// schedule. It chooses one of the nodes and preempts the pods on the node and
// returns 1) the node name which is picked up for preemption, 2) any possible error.
// preempt does not update its snapshot. It uses the same snapshot used in the
// scheduling cycle. This is to avoid a scenario where preempt finds feasible
// nodes without preempting any pod. When there are many pending pods in the
// scheduling queue a nominated pod will go back to the queue and behind
// other pods with the same priority. The nominated pod prevents other pods from
// using the nominated resources and the nominated pod could take a long time
// before it is retried after many other pending pods.
func (pl *DefaultPreemption) preempt(ctx context.Context, state *framework.CycleState, pod *v1.Pod, m framework.NodeToStatusMap) (string, *framework.Status) {
	cs := pl.fh.ClientSet()
	nodeLister := pl.fh.SnapshotSharedLister().NodeInfos()

	// 0) Fetch the latest version of <pod>.
	// It's safe to directly fetch pod here. Because the informer cache has already been
	// initialized when creating the Scheduler obj, i.e., factory.go#MakeDefaultErrorFunc().
	// However, tests may need to manually initialize the shared pod informer.
	podNamespace, podName := pod.Namespace, pod.Name
	pod, err := pl.podLister.Pods(pod.Namespace).Get(pod.Name)
	if err != nil {
		klog.ErrorS(err, "getting the updated preemptor pod object", "pod", klog.KRef(podNamespace, podName))
		return "", framework.AsStatus(err)
	}

	// 1) Ensure the preemptor is eligible to preempt other pods.
	if !PodEligibleToPreemptOthers(pod, nodeLister, m[pod.Status.NominatedNodeName]) {
		klog.V(5).InfoS("Pod is not eligible for more preemption", "pod", klog.KObj(pod))
		return "", nil
	}

	// 2) Find all preemption candidates.
	candidates, nodeToStatusMap, status := pl.FindCandidates(ctx, state, pod, m)
	if !status.IsSuccess() {
		return "", status
	}

	// Return a FitError only when there are no candidates that fit the pod.
	if len(candidates) == 0 {
		fitError := &framework.FitError{
			Pod:         pod,
			NumAllNodes: len(nodeToStatusMap),
			Diagnosis: framework.Diagnosis{
				NodeToStatusMap: nodeToStatusMap,
				// Leave FailedPlugins as nil as it won't be used on moving Pods.
			},
		}
		return "", framework.NewStatus(framework.Unschedulable, fitError.Error())
	}

	// 3) Interact with registered Extenders to filter out some candidates if needed.
	candidates, status = CallExtenders(pl.fh.Extenders(), pod, nodeLister, candidates)
	if !status.IsSuccess() {
		return "", status
	}

	// 4) Find the best candidate.
	bestCandidate := SelectCandidate(candidates)
	if bestCandidate == nil || len(bestCandidate.Name()) == 0 {
		return "", nil
	}

	// 5) Perform preparation work before nominating the selected candidate.
	if status := PrepareCandidate(bestCandidate, pl.fh, cs, pod, pl.Name()); !status.IsSuccess() {
		return "", status
	}

	return bestCandidate.Name(), nil
}
复制代码

通过查看文档对比kubernetes的实际代码,发现源码的实现发生了很大改变。抢占机制使用了插件的模式来实现,抢占机制的实现就是其中一个插件。该插件主要实现的几个关键函数分析如下:

  1. PodEligibleToPreemptOthers,确保pod有资格抢占其他pod
  2. FindCandidates,找到所有抢占候选人
  3. CallExtenders,如果需要,与注册的扩展程序交互以过滤掉一些候选人
  4. SelectCandidate,找到一个最佳候选人
  5. PrepareCandidate,在提名选定的候选人之前进行准备工作

大致过程就是这样,考虑到抢占机制这块内容也比较多,以后单独来分析,在这里我就不再多去深入分析了。最后我们看看如果调度成功后,进行资源绑定的环节。

// kubernetes/pkg/scheduler/scheduler.go
func (sched *Scheduler) bind(ctx context.Context, fwk framework.Framework, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) {
	defer func() {
		sched.finishBinding(fwk, assumed, targetNode, err)
	}()

	bound, err := sched.extendersBinding(assumed, targetNode)
	if bound {
		return err
	}
  // 这里的插件就最终实现绑定,绑定方法在kubernetes/pkg/scheduler/framework/plugins/defaultbinder/default_binder.go中
	bindStatus := fwk.RunBindPlugins(ctx, state, assumed, targetNode)
	if bindStatus.IsSuccess() {
		return nil
	}
	if bindStatus.Code() == framework.Error {
		return bindStatus.AsError()
	}
	return fmt.Errorf("bind status: %s, %v", bindStatus.Code().String(), bindStatus.Message())
}

// kubernetes/pkg/scheduler/framework/plugins/defaultbinder/default_binder.go
// Bind binds pods to nodes using the k8s client.
func (b DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
	klog.V(3).InfoS("Attempting to bind pod to node", "pod", klog.KObj(p), "node", nodeName)
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},
		Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
	}
	err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
	if err != nil {
		return framework.AsStatus(err)
	}
	return nil
}
复制代码

这里的绑定我们可以看到会先执行一个extendersBinding,extendersBinding主要作用是通过配置里获取相关extender的配置,如果存在则进行一些回调处理,没有则跳过。另外extendersBinding这里的方法社区也将计划迁移到plugin中。这里的然后再利用绑定插件实现绑定逻辑,实际上是调用apiserver接口,对数据进行一个变更。

小结

基于开篇的三个问题,对kube-scheduler的原理做一个总结。

  1. kubernetes的调度解决了分布式系统中,资源合理分布的问题
  2. kube-scheduler的调度主要经历获取信息,调度,更新绑定信息,其中调度里面预选机制,优选机制,抢占机制尤为重要
  3. kube-scheduler的源码中发现如何几点
    1. 抢占机制改用了插件方式去实现,这样增加了调度功能的扩展性和可维护性
    2. 整个数据处理实现的过程中,多次使用到了queue来去解耦,是各个功能相对比较独立

结束语

kubernetes庖丁解牛系列中,kube-scheduler是kubernetes的核心模块,里面的一些调度算法我们还可以深入学习,另外其在数据处理上的实现方式也是值得我们深入学习的。文章中必然会有一些不严谨的地方,还希望大家包涵,大家吸取精华(如果有的话),去其糟粕。如果大家感兴趣可以关我的公众号:gungunxi。我的微信号:lcomedy2021

参考文章

kube-scheduler源码分析:www.bookstack.cn/read/source…

cloud.tencent.com/developer/a…

kubernetes官网:kubernetes.io/zh/docs/con…

Kube-scheduler: www.infoq.cn/article/or7…

调度:kingjcy.github.io/post/cloud/…

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享