02. kube-apiserver接受创建deploy资源请求的源码详解

概述

我们在学习kubernetes除了要了解其相关概念完,我们还需要深入了解整个kubernetes的实现机制是如何,如果还能了解其源码,那基本上我们才算是对kubernetes很熟悉吧。我将用kubernetes是如何生成一个deployment的资源,并且附带源码解读的方式讲解kubernetes的整个实现机制。

之前的文章

这篇文章将讲解一个kubectl执行完命令后,发送到apiserver。apiserver将如何进行处理呢。

apiserver组件说明

kube-apiserver作为整个Kubernetes集群操作etcd的唯一入口,负责Kubernetes各资源的认证&鉴权,校验以及CRUD等操作,提供RESTful APIs,供其它组件调用。调用拓扑如下:

apiserver02.png

接口大致组成,如下图:

apiserver.png
apiserver的一个deployment资源接口各字段代表的含义,如下图:

images.png

处理流程

我们弄清楚apiserver的调用过程之前,还是需要对apiserver的启动流程有个简单了解。

启动流程

  1. 完成参数配置
  2. 判断配置是否合法
  3. 执行最终的Run方法。本质上是配置路由,访问权限以及同数据库(etcd)的交互等

Run方法的主要逻辑为:

  1. 调用 CreateServerChain 构建服务调用链并判断是否启动非安全的 http server,http server 链中包含 apiserver 要启动的三个 server,以及为每个 server 注册对应资源的路由;

  2. 调用 server.PrepareRun 进行服务运行前的准备,该方法主要完成了健康检查、存活检查和OpenAPI路由的注册工作;

  3. 调用 prepared.Run 启动 https server;

处理请求

这里有一个请求完整流程图:

4208d668bc29366da6eb89e2952b192a-3249995.png

  1. 请求访问控制,认证,鉴权,准入控制等
  2. 路由分发
  3. 数据库操作

数据结构

  • Hanler,实际web处理请求的对象
  • Route,实际路由分发的逻辑对象
  • webservice,Route的集合
  • Container,webservice的集合
  • RESTStorage,将reset请求转换成存储方法的抽象

依赖模块

apiserver的很多方法要依赖k8s.io/apiserver/pkg/apis/apiserver模块。

核心web-server的模块需要依赖go-restfule模块。

GenericAPIServer

go-restful组件

Route

路由包含两种,一种是标准JSR311接口规范的实现RouterJSR311,一种是快速路由CurlyRouter。CurlyRouter支持正则表达式和动态参数,相比RouterJSR311更加轻量级,apiserver中使用的就是这种路由。
一条Route的设定包含:请求方法(Http Method),请求路径(URL Path),处理方法以及可选的接受内容类型(Content-Type),响应内容类型(Accept)等。

WebService

WebService逻辑上是Route的集合,功能上主要是为一组Route统一设置包括root path,请求响应的数据类型等一些通用的属性。需要注意的是,WebService必须加入到Container中才能生效。

Container

Container逻辑上是WebService的集合,功能上可以实现多终端的效果。例如,下面代码中创建了两个Container,分别在不同的port上提供服务。

Filter

Filter用于动态的拦截请求和响应,类似于放置在相应组件前的钩子,在相应组件功能运行前捕获请求或者响应,主要用于记录log,验证,重定向等功能。go-restful中有三种类型的Filter

主要有三种类型的Filter

  • Container Filter
  • WebService Filter
  • Route Filter

源码分析

先简单了解下apiserver的启动流程的源码分析。

启动流程

kubernetes.cmd.apiserver.apiserver.go 为主函数,整个调用链路如下:

app.NewAPIServerCommand –> Run –> CreateServerChain

这里关键是构造一个server。

func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
  ...
  // 初始化kubeapiserver的配置
  kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
	...
	// 负责CustomResourceDefinition(CRD)apiResources以及apiVersions的注册,同时处理CRD以及相应CustomResource(CR)的REST请求
	apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())
	if err != nil {
		return nil, err
	}
  // 核心接口服务的管理,负责对请求的一些通用处理
	kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)
	if err != nil {
		return nil, err
	}
	...
  // 负责处理 `apiregistration.k8s.io` 组下的APIService资源请求,同时将来自用户的请求拦截转发给aggregated server(AA)
	aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
	if err != nil {
		// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
		return nil, err
	}

	return aggregatorServer, nil
}

复制代码

kube-apiserver包含三种APIServer:

  • aggregatorServer:负责处理 apiregistration.k8s.io 组下的APIService资源请求,同时将来自用户的请求拦截转发给aggregated server(AA)。同时还将触发如下几个controlller服务:

    • apiserviceRegistrationController:负责 APIServices 中资源的注册与删除;
    • availableConditionController:维护 APIServices 的可用状态,包括其引用 Service 是否可用等;
    • autoRegistrationController:用于保持 API 中存在的一组特定的 APIServices;
    • crdRegistrationController:负责将 CRD GroupVersions 自动注册到 APIServices 中;
    • openAPIAggregationController:将 APIServices 资源的变化同步至提供的 OpenAPI 文档;
  • kubeAPIServer:负责对请求的一些通用处理,包括:认证、鉴权以及各个内建资源(pod, deployment,service and etc)的REST服务等

  • apiExtensionsServer:负责CustomResourceDefinition(CRD)apiResources以及apiVersions的注册,同时处理CRD以及相应CustomResource(CR)的REST请求(如果对应CR不能被处理的话则会返回404),也是apiserver Delegation的最后一环

    那当我们执行kubectl apply -f deploy-nginx.yaml时,请求的资源对象是deployment,这个请求会经过哪个Server进行处理呢?我们继续往下看。

请求处理

ApiServer的代码相对比较复杂,我将重点梳理请求处理流程的整个链路,整个链路按照访问控制路由分发数据处理三个阶段进行分析。一般情况下我们去分析web服务的时候,基本套路是container –> webservice –> route —> handler。每个请求处理核心逻辑都在handler上。基于这个思路,我们去看看源码是怎么实现的。

访问控制

使用apiserver经过处理之前,先会经过访问控制,进行一系列的过滤操作。

apiServer中与权限相关的主要有三种机制,即常用的认证、鉴权和准入控制。对apiserver来说,主要提供的就是rest风格的接口,所以各种权限最终还是集中到对接口的权限判断上。
以最核心的kubeAPIServerConfig举例,在CreateServerChain方法中,调用了CreateKubeAPIServerConfig的方法,该方法主要的作用是创建kubeAPIServer的配置。进入该方法,调用了buildGenericConfig创建一些通用的配置,在NewConfig下,返回了DefaultBuildHandlerChain,该方法主要就是用来对apiserver rest接口的链式判断,即俗称的filter操作。

整个调用路径为createAggregatorConfig–>genericapiserver.BuildHandlerChainWithStorageVersionPrecondition–>DefaultBuildHandlerChain

// 这个是请求访问控制的核心方法
func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
	handler := filterlatency.TrackCompleted(apiHandler)
	handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer)
	handler = filterlatency.TrackStarted(handler, "authorization")

	if c.FlowControl != nil {
		handler = filterlatency.TrackCompleted(handler)
		handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl)
		handler = filterlatency.TrackStarted(handler, "priorityandfairness")
	} else {
		handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
	}

	handler = filterlatency.TrackCompleted(handler)
	handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
	handler = filterlatency.TrackStarted(handler, "impersonation")

	handler = filterlatency.TrackCompleted(handler)
	handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
	handler = filterlatency.TrackStarted(handler, "audit")

	failedHandler := genericapifilters.Unauthorized(c.Serializer)
	failedHandler = genericapifilters.WithFailedAuthenticationAudit(failedHandler, c.AuditBackend, c.AuditPolicyChecker)

	failedHandler = filterlatency.TrackCompleted(failedHandler)
	handler = filterlatency.TrackCompleted(handler)
	handler = genericapifilters.WithAuthentication(handler, c.Authentication.Authenticator, failedHandler, c.Authentication.APIAudiences)
	handler = filterlatency.TrackStarted(handler, "authentication")

	handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")

	// WithTimeoutForNonLongRunningRequests will call the rest of the request handling in a go-routine with the
	// context with deadline. The go-routine can keep running, while the timeout logic will return a timeout to the client.
	handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)

	handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyChecker,
		c.LongRunningFunc, c.Serializer, c.RequestTimeout)
	handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
	handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
	if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
		handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
	}
	handler = genericapifilters.WithAuditAnnotations(handler, c.AuditBackend, c.AuditPolicyChecker)
	handler = genericapifilters.WithWarningRecorder(handler)
	handler = genericapifilters.WithCacheControl(handler)
	handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
	handler = genericfilters.WithHTTPLogging(handler)
	handler = genericapifilters.WithRequestReceivedTimestamp(handler)
	handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver)
	handler = genericapifilters.WithAuditID(handler)
	return handler
}
复制代码
RBAC

Kubernetes中比较重要的用的比较多的可能就是RBAC了。在DefaultBuildHandlerChain方法内,通过调用genericapifilters.WithAuthorization方法,实现对每个接口的权限的filter操作。WithAuthorization方法如下

// WithAuthorizationCheck passes all authorized requests on to handler, and returns a forbidden error otherwise.
func WithAuthorization(handler http.Handler, a authorizer.Authorizer, s runtime.NegotiatedSerializer) http.Handler {
   if a == nil {
      klog.Warning("Authorization is disabled")
      return handler
   }
   return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
      ctx := req.Context()
      ae := request.AuditEventFrom(ctx)

      attributes, err := GetAuthorizerAttributes(ctx)
      if err != nil {
         responsewriters.InternalError(w, req, err)
         return
      }
      authorized, reason, err := a.Authorize(ctx, attributes)
      // an authorizer like RBAC could encounter evaluation errors and still allow the request, so authorizer decision is checked before error here.
      if authorized == authorizer.DecisionAllow {
         audit.LogAnnotation(ae, decisionAnnotationKey, decisionAllow)
         audit.LogAnnotation(ae, reasonAnnotationKey, reason)
         handler.ServeHTTP(w, req)
         return
      }
      if err != nil {
         audit.LogAnnotation(ae, reasonAnnotationKey, reasonError)
         responsewriters.InternalError(w, req, err)
         return
      }

      klog.V(4).InfoS("Forbidden", "URI", req.RequestURI, "Reason", reason)
      audit.LogAnnotation(ae, decisionAnnotationKey, decisionForbid)
      audit.LogAnnotation(ae, reasonAnnotationKey, reason)
      responsewriters.Forbidden(ctx, attributes, w, req, reason, s)
   })
}
复制代码
  1. 调用GetAuthorizerAttributes方法获取配置的各种属性值
  2. 调用Authorize方法判断权限是否通过,不同的权限实现其接口,完成鉴权任务;
  3. 如果鉴权成功通过,则调用handler.ServeHTTP方法继续下一步的filter操作;否则,直接返回错误信息。

以RBAC为例,Authorize方法最终调用VisitRulesFor方法实现权限的判断,方法在kubernetes/pkg/registry/rbac/validation/rule.go文件内。

路由分发

apiserver如果进行路由分发,本质上是弄清楚apiserver是如何维护route到handler之间的映射关系的。基于此,我们可以换个角度看下,先看看路由是如何注册的。基于前面的server了解,猜测deployment的资源应该是由KubeApiServer进行处理的。我们继续分析下这个Server的路由是如何注册上去的。整个函数的调用链如下:

CreateKubeAPIServer --> kubeAPIServerConfig.Complete().New(delegateAPIServer) -->  m.InstallAPIs --> m.GenericAPIServer.InstallAPIGroups -->  s.installAPIResources --> apiGroupVersion.InstallREST --> installer.Install --> a.registerResourceHandlers
复制代码
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Instance, error) {
	...
	// install legacy rest storage
	if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
		legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{
			StorageFactory:              c.ExtraConfig.StorageFactory,
			ProxyTransport:              c.ExtraConfig.ProxyTransport,
			KubeletClientConfig:         c.ExtraConfig.KubeletClientConfig,
			EventTTL:                    c.ExtraConfig.EventTTL,
			ServiceIPRange:              c.ExtraConfig.ServiceIPRange,
			SecondaryServiceIPRange:     c.ExtraConfig.SecondaryServiceIPRange,
			ServiceNodePortRange:        c.ExtraConfig.ServiceNodePortRange,
			LoopbackClientConfig:        c.GenericConfig.LoopbackClientConfig,
			ServiceAccountIssuer:        c.ExtraConfig.ServiceAccountIssuer,
			ExtendExpiration:            c.ExtraConfig.ExtendExpiration,
			ServiceAccountMaxExpiration: c.ExtraConfig.ServiceAccountMaxExpiration,
			APIAudiences:                c.GenericConfig.Authentication.APIAudiences,
		}
    // 核心 API Resources添加到路由中,在apiserver中即是以 /api 开头的 resource
		if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
			return nil, err
		}
	}
  ...
  // 扩展的 API Resources添加到路由中,在apiserver中即是以 /apis 开头的 resource;
	if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
		return nil, err
	}
  ...
}
复制代码

根据我们前面了解到,deploymen的请求是以/apis开头的,我们主要看下InstallAPIs,如下:

// InstallAPIs will install the APIs for the restStorageProviders if they are enabled.
func (m *Instance) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
	...
	for _, restStorageBuilder := range restStorageProviders {
		...
		apiGroupInfo, enabled, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
		...
		apiGroupsInfo = append(apiGroupsInfo, &apiGroupInfo)
	}
  // 通过apiGroupsInfo来构造服务GenericAPIServer配置
	if err := m.GenericAPIServer.InstallAPIGroups(apiGroupsInfo...); err != nil {
		return fmt.Errorf("error in registering group versions: %v", err)
	}
	return nil
}
复制代码

继续查看下InstallAPIGroups方法

// Exposes given api groups in the API.
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
   ...
   for _, apiGroupInfo := range apiGroupInfos {
   // 关键函数,构造container
   	if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
   		return fmt.Errorf("unable to install api resources: %v", err)
   	}
   	// setup discovery
   	// Install the version handler.
   	// Add a handler at /apis/<groupName> to enumerate all versions supported by this group.
   	apiVersionsForDiscovery := []metav1.GroupVersionForDiscovery{}
   	for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
   		// Check the config to make sure that we elide versions that don't have any resources
   		if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
   			continue
   		}
   		apiVersionsForDiscovery = append(apiVersionsForDiscovery, metav1.GroupVersionForDiscovery{
   			GroupVersion: groupVersion.String(),
   			Version:      groupVersion.Version,
   		})
   	}
   	preferredVersionForDiscovery := metav1.GroupVersionForDiscovery{
   		GroupVersion: apiGroupInfo.PrioritizedVersions[0].String(),
   		Version:      apiGroupInfo.PrioritizedVersions[0].Version,
   	}
   	apiGroup := metav1.APIGroup{
   		Name:             apiGroupInfo.PrioritizedVersions[0].Group,
   		Versions:         apiVersionsForDiscovery,
   		PreferredVersion: preferredVersionForDiscovery,
   	}
   	s.DiscoveryGroupManager.AddGroup(apiGroup)
   // 将webservice配置加入到GenericAPIServer中
   	s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
   }
   return nil
}
复制代码
/ installAPIResources is a private method for installing the REST storage backing each api groupversionresource
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
	var resourceInfos []*storageversion.ResourceInfo
	for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
	  ...
    // 这里将会再一次构造
		r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
		if err != nil {
			return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err)
		}
		resourceInfos = append(resourceInfos, r...)
	}
	...
	return nil
}
复制代码
// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
// in a slash.
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) {
	prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
	installer := &APIInstaller{
		group:             g,
		prefix:            prefix,
		minRequestTimeout: g.MinRequestTimeout,
	}
	// 这里会生成生成api资源
	apiResources, resourceInfos, ws, registrationErrors := installer.Install()
	versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
	versionDiscoveryHandler.AddToWebService(ws)
  // 将资源添加到container中
	container.Add(ws)
	return removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
}
复制代码
// Install handlers for API resources.
func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) {
	var apiResources []metav1.APIResource
	var resourceInfos []*storageversion.ResourceInfo
	var errors []error
  // 生成一个webservice
	ws := a.newWebService()
	// Register the paths in a deterministic (sorted) order to get a deterministic swagger spec.
	paths := make([]string, len(a.group.Storage))
	var i int = 0
	for path := range a.group.Storage {
		paths[i] = path
		i++
	}
	sort.Strings(paths)
	for _, path := range paths {
    // 将path注册到webservice中
		apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
		if err != nil {
			errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err))
		}
		if apiResource != nil {
			apiResources = append(apiResources, *apiResource)
		}
		if resourceInfo != nil {
			resourceInfos = append(resourceInfos, resourceInfo)
		}
	}
	return apiResources, resourceInfos, ws, errors
}
复制代码

由于registerResourceHandlers过于复杂,我们仅仅只看post请求资源deployment的情况。

代码地址:k8s.io/kubernetes/pkg/endpoints/installer.go

		case "POST": // Create a resource.
			var handler restful.RouteFunction
			if isNamedCreater {
        // 这里将最终构造一个handler,admit是准入控制,这个最终会进入createHandle函数
				handler = restfulCreateNamedResource(namedCreater, reqScope, admit)
			} else {
				handler = restfulCreateResource(creater, reqScope, admit)
			}
			handler = metrics.InstrumentRouteFunc(action.Verb, group, version, resource, subresource, requestScope, metrics.APIServerComponent, deprecated, removedRelease, handler)
			handler = utilwarning.AddWarningsHandler(handler, warnings)
			article := GetArticleForNoun(kind, " ")
			doc := "create" + article + kind
			if isSubresource {
				doc = "create " + subresource + " of" + article + kind
			}
      // 这里构造route,就与我们之前的分析相互呼应了
			route := ws.POST(action.Path).To(handler).
				Doc(doc).
				Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
				Operation("create"+namespaced+kind+strings.Title(subresource)+operationSuffix).
				Produces(append(storageMeta.ProducesMIMETypes(action.Verb), mediaTypes...)...).
				Returns(http.StatusOK, "OK", producedObject).
				// TODO: in some cases, the API may return a v1.Status instead of the versioned object
				// but currently go-restful can't handle multiple different objects being returned.
				Returns(http.StatusCreated, "Created", producedObject).
				Returns(http.StatusAccepted, "Accepted", producedObject).
				Reads(defaultVersionedObject).
				Writes(producedObject)
			if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil {
				return nil, nil, err
			}
			addParams(route, action.Params)
			routes = append(routes, route)
复制代码

通过上述c.GenericConfig.New以及installer.Install就构成了一个请求链路,当一个请求过来时整个apiserver的请求链路如下:

filters(DefaultBuildHandlerChain) => installAPI(/|/metrics|/debug|/version) | GenericAPIServer.installAPIResources(/api/v1) => APIGroupVersion.InstallREST(/api/v1/namespaces/{namespace}/pods/{name})
复制代码

整个构造过程从代码的调用链角度来看的话,显示要构造一个GenericAPIServer服务实例,将基于这个GenericAPIServer的信息逐步构造container,webservice,route。而path的信息是由RESTStorageProvider转换过来,handler则是restfulCreateNamedResource函数构造出来,最终会存在GenericAPIServer的handler实例中

数据处理

我们都清楚ApiServer与数据库的交互一般指的是与etcd的交互。Kubernetes所有的组件不直接与etcd交互,都是通过请求apiserver,apiserver与etcd进行交互完成数据的最终落盘。
在之前的路由实现已经说过,apiserver最终实现的handler对应的后端数据是以Store的结构保存的。这里如果是post一个deployment,那么这个请求的路由数据怎么生成的。我们在前面InstallAPIs方法的代码中其实可以看到一个NewRESTStorag方法。这个就是用来创建Storage。而这些storage的提供者都是RESTStorageProvider。deployment的storage是pkg/registry/apps/deployment/storage/storage.go

而deployment则是storagerest.RESTStorageProvider{}所提供。

pkd/registry/apps/rest/storage_apps.go

func (p StorageProvider) NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool, error) {
	apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apps.GroupName, legacyscheme.Scheme, legacyscheme.ParameterCodec, legacyscheme.Codecs)
	// If you add a version here, be sure to add an entry in `k8s.io/kubernetes/cmd/kube-apiserver/app/aggregator.go with specific priorities.
	// TODO refactor the plumbing to provide the information in the APIGroupInfo

	if apiResourceConfigSource.VersionEnabled(appsapiv1.SchemeGroupVersion) {
    // 生成storagemap,这里就包含生成deployment,daemonset等
		storageMap, err := p.v1Storage(apiResourceConfigSource, restOptionsGetter)
		if err != nil {
			return genericapiserver.APIGroupInfo{}, false, err
		}
		apiGroupInfo.VersionedResourcesStorageMap[appsapiv1.SchemeGroupVersion.Version] = storageMap
	}

	return apiGroupInfo, true, nil
}
复制代码
func (p StorageProvider) v1Storage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (map[string]rest.Storage, error) {
	storage := map[string]rest.Storage{}

	// deployments
  // 终于找到deployment的资源了
	deploymentStorage, err := deploymentstore.NewStorage(restOptionsGetter)
	if err != nil {
		return storage, err
	}
	storage["deployments"] = deploymentStorage.Deployment
	storage["deployments/status"] = deploymentStorage.Status
	storage["deployments/scale"] = deploymentStorage.Scale

	...
	return storage, nil
}
复制代码
func NewREST(optsGetter generic.RESTOptionsGetter) (*REST, *StatusREST, *RollbackREST, error) {
	store := &genericregistry.Store{
		NewFunc:                  func() runtime.Object { return &apps.Deployment{} },
		NewListFunc:              func() runtime.Object { return &apps.DeploymentList{} },
		DefaultQualifiedResource: apps.Resource("deployments"),

		CreateStrategy:      deployment.Strategy,
		UpdateStrategy:      deployment.Strategy,
		DeleteStrategy:      deployment.Strategy,
		ResetFieldsStrategy: deployment.Strategy,

		TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
	}
	options := &generic.StoreOptions{RESTOptions: optsGetter}
  // 所有信息将会存下来
	if err := store.CompleteWithOptions(options); err != nil {
		return nil, nil, nil, err
	}

	statusStore := *store
	statusStore.UpdateStrategy = deployment.StatusStrategy
	statusStore.ResetFieldsStrategy = deployment.StatusStrategy
	return &REST{store, []string{"all"}}, &StatusREST{store: &statusStore}, &RollbackREST{store: store}, nil
}

复制代码

store.CompleteWithOptions(options)最终会调用options.RESTOptions.GetRESTOptions初始化配置。options.RESTOptions 是一个 interface,想要找到其 GetRESTOptions 方法的实现必须知道 options.RESTOptions 初始化时对应的实例,其初始化是在 CreateKubeAPIServerConfig --> buildGenericConfig --> s.Etcd.ApplyWithStorageFactoryTo 方法中进行初始化的,RESTOptions 对应的实例为 StorageFactoryRestOptionsFactory。到这里我们再梳理下,一个是RestStorage是一个apiserver与数据库交付的一个抽象。另外我们默认情况下我们使用的是etcd数据库。

RESTStorage

kubeAPIServer会为每种API资源创建对应的RESTStorage,RESTStorage的目的是将每种资源的访问路径及其后端存储的操作对应起来:通过构造的REST Storage实现的接口判断该资源可以执行哪些操作(如:create、update等),将其对应的操作存入到action中,每一个操作对应一个标准的REST method,如create对应REST method为POST,而update对应REST method为PUT。最终根据actions数组依次遍历,对每一个操作添加一个handler(handler对应REST Storage实现的相关接口),并注册到route,最终对外提供RESTful API.

etcd 的交互细节

k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server/options/etcd.go:253

  • 读取请求内容:body, err := limitedReadBody(req, scope.MaxRequestBodyBytes)
  • 对内容进行decode:obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
  • 对decode后的body obj进行admit操作
  • 执行requestFunc,也即r.Create函数,也即Store.Create,该函数会对obj进行有效性检查
  • 最后在Store.Create中调用e.Storage.Create函数执行e.Storage.Create操作

代码模块整理

  • apiserver整体启动逻辑
    • k8s.io/kubernetes/cmd/kube-apiserver
  • apiserver bootstrap-controller创建&运行逻辑
    • k8s.io/kubernetes/pkg/master
  • API Resource对应后端RESTStorage(based on genericregistry.Store)创建
    • k8s.io/kubernetes/pkg/registry
  • aggregated-apiserver创建&处理逻辑
    • k8s.io/kubernetes/staging/src/k8s.io/kube-aggregator
  • extensions-apiserver创建&处理逻辑
    • k8s.io/kubernetes/staging/src/k8s.io/apiextensions-apiserver
  • apiserver创建&运行
    • k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/server
  • 注册API Resource资源处
    • handler(InstallREST&Install&registerResourceHandlers)
    • k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/endpoints
  • deployment,deamonset,statefulset相关资源的storage实现
    • pkd/registry/apps/rest/storage_apps.go
  • 创建存储后端(etcdv3)
    • k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/storage
  • genericregistry.Store.CompleteWithOptions初始化
    • k8s.io/kubernetes/staging/src/k8s.io/apiserver/pkg/registry

小结

启动流程中 RESTFul API 的实现过程:

​ kube-apiserver 中包含三个 server,分别为 KubeAPIServer、APIExtensionsServer 以及 AggregatorServer,三个 server 是通过委托模式连接在一起的,初始化过程都是类似的,首先为每个 server 创建对应的 config,然后初始化 http server,http server 的初始化过程为首先初始化 GoRestfulContainer,然后安装 server 所包含的 API,安装 API 时首先为每个 API Resource 创建对应的后端存储 RESTStorage,再为每个 API Resource 支持的 verbs 添加对应的 handler,并将 handler 注册到 route 中,最后将 route 注册到 webservice 中。

​ Kubectl apply -f nginx-deploy.yaml的请求处理到这里,apiserver只是将请求处理后存储到etcd中,离最终要达到创建对应的容器还有一段距离。那存入etcd后的数据,kubernetes将会作何处理了,敬请期待下篇文章。

结束语

kubernetes庖丁解牛系列中,apiserver这块的源码还是比较难啃的,由于apiserver要处理的api比较多,在路由分发上的设计还是比较复杂的,需要耐心研读。文章中必然会有一些不严谨的地方,还希望大家包涵,大家吸取精华(如果有的话),去其糟粕。如果大家感兴趣可以关注我。我的微信号:liyakuail1991

参考文档

核心原理:blog.csdn.net/huwh_/artic…

调用链路分析:github.com/duyanghao/k…

源码分析:blog.csdn.net/cbmljs/arti…

kubernetes源码分析:www.bookstack.cn/read/source…

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享