# kubebuilder详解 ## 控制器模式 kubernetes作为一个“容器编排”平台,其核心的功能是编排,Pod作为K8s调度的最小单位,具备很多属性和字段,k8s编排正是通过一个个控制器根据被控制对象的属性和字段来实现。 > 例如 ``` apiVersion: apps/v1 kind: Deployment metadata: name: test spec: selector: matchLabels: app: test replicas: 2 template: metadata: labels: app: test spec: containers: - name: nginx image: nginx:1.7.9 ports: - containerPort: 80 ``` K8s集群在部署时包含了Controllers组件,里面对于每个build-in的资源类型(比如Deployment、Statefulset、Cronjob...)都有对应的Controller,基本是1:1的关系。上面的例子中,Deployment 资源创建之后,对应的 Deployment Controller 编排动作很简单,确保属于当前deployment资源的 Pod 个数永远等于 2,Pod 由 template 部分定义,具体来说,K8s 里面是 kube-controller-manager 这个组件在做这件事,可以看下 K8s 项目的 pkg/controller 目录,里面包含了所有控制器,都以独有的方式负责某种编排功能,但是它们都遵循一个通用编排模式,即:调谐循环(Reconcile loop),其伪代码逻辑为: ``` for { actualState := GetResourceActualState(rsvc) expectState := GetResourceExpectState(rsvc) if actualState == expectState { // do nothing } else { Reconcile(rsvc) } } ``` 就是一个无限循环(实际是事件驱动 定时同步来实现,不是无脑循环)不断地对比期望状态和实际状态,如果有出入则进行 Reconcile(调谐)逻辑将实际状态调整为期望状态。期望状态就是我们的对象定义(通常是 YAML 文件),实际状态是集群里面当前的运行状态(通常来自于 K8s 集群内外相关资源的状态汇总),控制器的编排逻辑主要是第三步做的,这个操作被称为调谐(Reconcile),整个控制器调谐的过程称为“Reconcile Loop”,调谐的最终结果一般是对被控制对象的某种写操作,比如增/删/改 Pod。 在控制器中定义被控制对象是通过“模板”完成的,比如 Deployment 里面的 template 字段里的内容跟一个标准的 Pod 对象的 API 定义一样,所有被这个 Deployment 管理的 Pod 实例,都是根据这个 template 字段的创建的,这就是 PodTemplate,一个控制对象的定义一般是由上半部分的控制定义(期望状态),加上下半部分的被控制对象的模板组成。 ## 声明式API 声明式API就是“告诉K8s你要做什么,而不是告诉它怎么做的命令”,一个很熟悉的例子就是 SQL,你“告诉 DB 根据条件和各类算子返回数据,而不是告诉它怎么遍历,过滤,聚合”。在 K8s 里面,声明式的体现就是 kubectl apply 命令,在对象创建和后续更新中一直使用相同的 apply 命令,告诉 K8s 对象的终态即可,底层是通过执行了一个对原有 API 对象的 PATCH 操作来实现的,可以一次性处理多个写操作,具备 Merge 能力 diff 出最终的 PATCH,而命令式一次只能处理一个写请求。 声明式 API 让 K8s 的“容器编排”世界看起来温柔美好,而控制器(以及容器运行时,存储,网络模型等)才是这太平盛世的幕后英雄。说到这里,就会有人希望也能像 build-in 资源一样构建自己的自定义资源(CRD-Customize Resource Definition),然后为自定义资源写一个对应的控制器,推出自己的声明式 API。K8s 提供了 CRD 的扩展方式来满足用户这一需求,而且由于这种扩展方式十分灵活,在最新的 1.15 版本对 CRD 做了相当大的增强。对于用户来说,实现 CRD 扩展主要做两件事: - 编写 CRD 并将其部署到 K8s 集群里; 这一步的作用就是让 K8s 知道有这个资源及其结构属性,在用户提交该自定义资源的定义时(通常是 YAML 文件定义),K8s 能够成功校验该资源并创建出对应的 Go struct 进行持久化,同时触发控制器的调谐逻辑。 - 编写 Controller 并将其部署到 K8s 集群里。 这一步的作用就是实现调谐逻辑。 Kubebuilder 就是帮我们简化这两件事的工具,现在我们开始介绍主角。 ## kubebuilder ### 摘要 Kubebuilder 是一个使用 CRDs 构建 K8s API 的 SDK,主要是: - 提供脚手架工具初始化 CRDs 工程,自动生成 boilerplate 代码和配置; - 提供代码库封装底层的 K8s go-client; 方便用户从零开始开发 CRDs,Controllers 和 Admission Webhooks 来扩展 K8s。 ### 核心概念 > GVKs & GVRs - GVK = GroupVersionKind - GVR = GroupVersionResource > API Group & Versions (GV) API Group是相关API功能的集合,每个Group拥有一个或多个Versions,用于接口的演进 > Kinds & Resources 每个 GV 都包含多个 API 类型,称为 Kinds,在不同的 Versions 之间同一个 Kind 定义可能不同, Resource 是 Kind 的对象标识(resource type),一般来说 Kinds 和 Resources 是 1:1 的,比如 pods Resource 对应 Pod Kind,但是有时候相同的 Kind 可能对应多个 Resources,比如 Scale Kind 可能对应很多 Resources:deployments/scale,replicasets/scale,对于 CRD 来说,只会是 1:1 的关系。 每一个 GVK 都关联着一个 package 中给定的 root Go type,比如 apps/v1/Deployment 就关联着 K8s 源码里面 k8s.io/api/apps/v1 package 中的 Deployment struct,我们提交的各类资源定义 YAML 文件都需要写: - apiVersion: 这个就是GV - kind:这个就是K 根据 GVK K8s 就能找到你到底要创建什么类型的资源,根据你定义的 Spec 创建好资源之后就成为了 Resource,也就是 GVR。GVK/GVR 就是 K8s 资源的坐标,是我们创建/删除/修改/读取资源的基础。 ### Scheme 每一组 Controllers 都需要一个 Scheme,提供了 Kinds 与对应 Go types 的映射,也就是说给定 Go type 就知道他的 GVK,给定 GVK 就知道他的 Go type,比如说我们给定一个 Scheme: “tutotial.kubebuilder.io/api/v1”.CronJob{} 这个 Go type 映射到 batch.tutotial.kubebuilder.io/v1 的 CronJob GVK,那么从 Api Server 获取到下面的 JSON: ``` { "kind": "CronJob", "apiVersion": "batch.tutorial.kubebuilder.io/v1", ... } ``` 就能构造出对应的 Go type了,通过这个 Go type 也能正确地获取 GVR 的一些信息,控制器可以通过该 Go type 获取到期望状态以及其他辅助信息进行调谐逻辑。 ### 核心架构 ![核心架构图](http://xieys.club/images/posts/20210831163331761393343.png) #### Process process进程通过main.go启动,一般来说一个Controller只有一个进程,如果做了高可用的话,会有多个 #### Manager 每个进程会有一个Manager,这是核心组件,主要负责 - metrics的暴露 - webhook证书 - 初始化共享的cache - 初始化共享的clients用于和APIServer进行通信 - 所有的Controller的运行 #### Client 一般来说,我们创建、更新、删除某个资源的时候会直接调用Client和APIServer进行通信 #### Cache cache负责同步Controller关心的资源,其核心是GVK -> Informer 的映射,一般我们的Get 和 List 操作都会从Cache中获取数据。Informer 会负责监听对应 GVK 的 GVRs 的创建/删除/更新操作,以触发 Controller 的 Reconcile 逻辑。 #### Controller 为控制器的业务逻辑所载的地方,一个Manager可能会有多个Controller,我们一般只需要实现Reconcile方法就行。图上的Predicate是事件过滤器,我们可以再Controller中过滤掉我们不关心的事件信息。 #### WebHook 就是我们准入控制实现的地方了,主要有2类接口: - 一个是MutatingAdmissionWebhook需要实现Defaulter接口 - 一个是ValidatingAdmissionWebhook需要实现Validator接口 ### 其他 #### Index 由于 Controller 经常要对 Cache 进行查询,Kubebuilder 提供 Index utility 给 Cache 加索引提升查询效率。 #### Finalizer 在一般情况下,如果资源被删除之后,我们虽然能够被触发删除事件,但是这个时候从 Cache 里面无法读取任何被删除对象的信息,这样一来,导致很多垃圾清理工作因为信息不足无法进行,K8s 的 Finalizer 字段用于处理这种情况。在 K8s 中,只要对象 ObjectMeta 里面的 Finalizers 不为空,对该对象的 delete 操作就会转变为 update 操作,具体说就是 update deletionTimestamp 字段,其意义就是告诉 K8s 的 GC“在deletionTimestamp 这个时刻之后,只要 Finalizers 为空,就立马删除掉该对象”。 所以一般的使用姿势就是在创建对象时把 Finalizers 设置好(任意 string),然后处理 DeletionTimestamp 不为空的 update 操作(实际是 delete),根据 Finalizers 的值执行完所有的 pre-delete hook(此时可以在 Cache 里面读取到被删除对象的任何信息)之后将 Finalizers 置为空即可。 #### OwnerReference K8s GC 在删除一个对象时,任何 ownerReference 是该对象的对象都会被清除,与此同时,Kubebuidler 支持所有对象的变更都会触发 Owner 对象 controller 的 Reconcile 方法。 ## kubebuilder的使用 ### 创建脚手架工程 ``` [root@kubebuilder CronJob]# kubebuilder init --domain xieys.io --repo xieys.io ``` 这一步创建了一个Go module工程,引入了必要的依赖,创建了一些模板文件 `go.mod` 基本依赖项 `Makefile` 为构建和部署控制器指定目标 `PROJECT` kubebuilder元数据脚手架新组建 `config/` 启动配置,它只包含在集群上启动控制器所需的Kustomize YAML定义,它还将保存我们的CustomResourceDefinitions、RBAC配置和WebhookConfigurations。 `config/default/` 包含一个Kustomize基础,用于在标准配置中启动控制器。 `config/manager` 在集群内启动你的控制器相关的yaml定义 `config/rbac` 在自己的服务帐户下运行控制器所需的权限相关的yaml定义 ### 创建API ``` [root@kubebuilder CronJob]# kubebuilder create api --group batch --version v1 --kind CronJob ``` 这一步创建了对应的CRD和Controller模板文件,经过这两步,现有的工程结构图如下: ![Kubebuilder生成的工程结构图](http://xieys.club/images/posts/image-20210830160800387.png) ### 定义CRD 在Kubebuilder生成的工程结构图中的cronjob_types.go文件里定义Spec 和 Status。 ### 编写Controller逻辑 在Kubebuilder生成的工程结构图中的cronjob_controller.go 文件中实现Reconcile的逻辑。 ### 测试发布 本地测试完之后使用kubebuilder的Makefile构建镜像,部署我们的CRDs和Controller即可 ## kubebuilder深入 问题: - 如果同步自定义资源以及k8s build-in资源 - Controller的Reconcile方法是如何被触发的? - Cache的工作原理是什么? ### 源码阅读 #### 从main.go开始 kubebuilder创建的main.go是整个项目的入口,逻辑十分简单: ``` var ( scheme = runtime.NewScheme() setupLog = ctrl.Log.WithName("setup") ) func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) utilruntime.Must(batchv1.AddToScheme(scheme)) //+kubebuilder:scaffold:scheme } func main() { ... //1、初始化Manager mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, MetricsBindAddress: metricsAddr, Port: 9443, HealthProbeBindAddress: probeAddr, LeaderElection: enableLeaderElection, LeaderElectionID: "b6d835ad.xieys.io", //CertDir: "config/cert", //手动指定证书位置用于测试 }) if err != nil { setupLog.Error(err, "unable to start manager") os.Exit(1) } //2、初始化Reconciler(Controller) if err = (&controllers.CronJobReconciler{ Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("CronJob"), Scheme: mgr.GetScheme(), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "CronJob") os.Exit(1) } if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { setupLog.Error(err, "unable to set up health check") os.Exit(1) } if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { setupLog.Error(err, "unable to set up ready check") os.Exit(1) } // 启动Manager if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1) } } ``` 可以看到在init方法里面我们将batchv1注册到Scheme里面去了,这样一来Cache就知道该watch谁了,main方法里面的逻辑基本都是manager的: - 初始化一个Manager - 将Manager的Client传给Controller,并且调用SetupWithManager方法传入Manager进行Controller的初始化 - 启动webhook(上述代码没有添加webhook) - 添加健康检查 - 启动Manager 下面我们就顺着main函数里面的逻辑一步步往下看 #### NewManager ``` // NewManager returns a new Manager for creating Controllers. NewManager = manager.New // New returns a new Manager for creating Controllers. func New(config *rest.Config, options Options) (Manager, error) { // Set default values for options fields options = setOptionsDefaults(options) //初始化集群,创建cache以及client cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) { clusterOptions.Scheme = options.Scheme clusterOptions.MapperProvider = options.MapperProvider clusterOptions.Logger = options.Logger clusterOptions.SyncPeriod = options.SyncPeriod clusterOptions.Namespace = options.Namespace clusterOptions.NewCache = options.NewCache clusterOptions.ClientBuilder = options.ClientBuilder clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor clusterOptions.DryRunClient = options.DryRunClient clusterOptions.EventBroadcaster = options.EventBroadcaster }) if err != nil { return nil, err } //创建事件记录器 recorderProvider, err := options.newRecorderProvider(config, cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster) if err != nil { return nil, err } // 需要高可用的话,创建选举相关的配置 leaderConfig := options.LeaderElectionConfig if leaderConfig == nil { leaderConfig = rest.CopyConfig(config) } resourceLock, err := options.newResourceLock(leaderConfig, recorderProvider, leaderelection.Options{ LeaderElection: options.LeaderElection, LeaderElectionResourceLock: options.LeaderElectionResourceLock, LeaderElectionID: options.LeaderElectionID, LeaderElectionNamespace: options.LeaderElectionNamespace, }) if err != nil { return nil, err } //创建metric和健康检查的接口 metricsListener, err := options.newMetricsListener(options.MetricsBindAddress) if err != nil { return nil, err } // By default we have no extra endpoints to expose on metrics http server. metricsExtraHandlers := make(map[string]http.Handler) // Create health probes listener. This will throw an error if the bind // address is invalid or already in use. healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress) if err != nil { return nil, err } //最后将这些配置放到manager中 return &controllerManager{ cluster: cluster, recorderProvider: recorderProvider, resourceLock: resourceLock, metricsListener: metricsListener, metricsExtraHandlers: metricsExtraHandlers, logger: options.Logger, elected: make(chan struct{}), port: options.Port, host: options.Host, certDir: options.CertDir, leaseDuration: *options.LeaseDuration, renewDeadline: *options.RenewDeadline, retryPeriod: *options.RetryPeriod, healthProbeListener: healthProbeListener, readinessEndpointName: options.ReadinessEndpointName, livenessEndpointName: options.LivenessEndpointName, gracefulShutdownTimeout: *options.GracefulShutdownTimeout, internalProceduresStop: make(chan struct{}), leaderElectionStopped: make(chan struct{}), }, nil } ``` > cluster.New(config, func(clusterOptions *cluster.Options) ``` // New constructs a brand new cluster func New(config *rest.Config, opts ...Option) (Cluster, error) { ... options := Options{} // 为options赋值,上面以函数传入进来的 for _, opt := range opts { opt(&options) } // 为options设置默认的值 options = setOptionsDefaults(options) // 创建mapper映射器, k8s restAPI与go type的转换器 mapper, err := options.MapperProvider(config) if err != nil { options.Logger.Error(err, "Failed to get API Group-Resources") return nil, err } // 创建cache cache, err := options.NewCache(config, cache.Options{ Scheme: options.Scheme, //main中传入的scheme Mapper: mapper, //k8s api和gotype的转换器 Resync: options.SyncPeriod, //默认10小时,一般不要改 Namespace: options.Namespace,//需要监听的namespace }) if err != nil { return nil, err } //创建client的options选项 clientOptions := client.Options{Scheme: options.Scheme, Mapper: mapper} //用于查询kubernetes资源,但是会跳过cache,直接从apiserver中查询 apiReader, err := client.New(config, clientOptions) if err != nil { return nil, err } //初始化用于写操作的client writeObj, err := options.ClientBuilder. WithUncached(options.ClientDisableCacheFor...). Build(cache, config, clientOptions) if err != nil { return nil, err } if options.DryRunClient { writeObj = client.NewDryRunClient(writeObj) } //创建事件记录器 recorderProvider, err := options.newRecorderProvider(config, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster) if err != nil { return nil, err } return &cluster{ config: config, scheme: options.Scheme, cache: cache, fieldIndexes: cache, client: writeObj, apiReader: apiReader, recorderProvider: recorderProvider, mapper: mapper, logger: options.Logger, }, nil } ``` #### 创建Cache ``` 可以看到在为options设置默认的值的时候,有把New函数赋值给了options.NewCache func setOptionsDefaults(options Options) Options { ... if options.NewCache == nil { options.NewCache = cache.New } ... return options } // New initializes and returns a new Cache. func New(config *rest.Config, opts Options) (Cache, error) { opts, err := defaultOpts(config, opts) if err != nil { return nil, err } im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace) return &informerCache{InformersMap: im}, nil } ``` 这里主要是调用NewInformersMap方法创建Informer的映射 ``` // NewInformersMap creates a new InformersMap that can create informers for // both structured and unstructured objects. func NewInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *InformersMap { return &InformersMap{ structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace), unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace), metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace), Scheme: scheme, } } ``` NewInformersMap会分别创建,结构化、非结构化以及metadata的informerMap,而这些方法最后都会调用newSpecificlnformersMap方法,区别就是不同的方法传入的createListWatcherFunc参数不同 ``` // newStructuredInformersMap creates a new InformersMap for structured objects. func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap { return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch) } // newUnstructuredInformersMap creates a new InformersMap for unstructured objects. func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap { return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createUnstructuredListWatch) } // newMetadataInformersMap creates a new InformersMap for metadata-only objects. func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap { return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createMetadataListWatch) } func newSpecificInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string, createListWatcher createListWatcherFunc) *specificInformersMap { ip := &specificInformersMap{ config: config, Scheme: scheme, mapper: mapper, informersByGVK: make(map[schema.GroupVersionKind]*MapEntry), codecs: serializer.NewCodecFactory(scheme), paramCodec: runtime.NewParameterCodec(scheme), resync: resync, startWait: make(chan struct{}), createListWatcher: createListWatcher, namespace: namespace, } return ip } ``` newSpecificInformersMap和常规的InformersMap类似,区别是没实现WaitForCacheSync方法。 以结构化的传入的createStructuredListWatch为例,主要是返回一个用于创建SharedIndexInformer 的 ListWatch 对象 ``` func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) { // Kubernetes APIs work against Resources, not GroupVersionKinds. Map the // groupVersionKind to the Resource API we will use. mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) if err != nil { return nil, err } client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs) if err != nil { return nil, err } listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List") listObj, err := ip.Scheme.New(listGVK) if err != nil { return nil, err } // TODO: the functions that make use of this ListWatch should be adapted to // pass in their own contexts instead of relying on this fixed one here. ctx := context.TODO() // Create a new ListWatch for the obj return &cache.ListWatch{ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { res := listObj.DeepCopyObject() isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res) return res, err }, // Setup the watch function WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { // Watch needs to be set to true separately opts.Watch = true isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot return client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch(ctx) }, }, nil } ``` cache主要是创建了一些InformerMap,完成了GVK到Informer的映射,每个Informer会根据ListWatch函数对对应的GVK进行List和Watch。 #### 创建Client > apiReader ``` apiReader, err := client.New(config, clientOptions) //New使用提供的配置和选项返回一个新的客户端。返回的客户端直接从服务器读取*和*写入(它不使用对象缓存)。它理解如何使用普通类型(自定义资源和聚合/内置资源),以及非结构化类型。对于普通类型,该模式将用于查找给定类型的对应组、版本和类型。在非结构化类型的情况下,将从对象的相应字段中提取组、版本和类型。 func New(config *rest.Config, options Options) (Client, error) { if config == nil { return nil, fmt.Errorf("must provide non-nil rest.Config to client.New") } // Init a scheme if none provided if options.Scheme == nil { options.Scheme = scheme.Scheme } // Init a Mapper if none provided if options.Mapper == nil { var err error options.Mapper, err = apiutil.NewDynamicRESTMapper(config) if err != nil { return nil, err } } clientcache := &clientCache{ config: config, scheme: options.Scheme, mapper: options.Mapper, codecs: serializer.NewCodecFactory(options.Scheme), structuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta), unstructuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta), } rawMetaClient, err := metadata.NewForConfig(config) if err != nil { return nil, fmt.Errorf("unable to construct metadata-only client for use as part of client: %w", err) } c := &client{ typedClient: typedClient{ cache: clientcache, paramCodec: runtime.NewParameterCodec(options.Scheme), }, unstructuredClient: unstructuredClient{ cache: clientcache, paramCodec: noConversionParamCodec{}, }, metadataClient: metadataClient{ client: rawMetaClient, restMapper: options.Mapper, }, scheme: options.Scheme, mapper: options.Mapper, } return c, nil } ``` > writeObj ``` writeObj, err := options.ClientBuilder. WithUncached(options.ClientDisableCacheFor...). Build(cache, config, clientOptions) options设置默认值的时候,有个ClientBuilder接口初始化 // Allow the client builder to be mocked if options.ClientBuilder == nil { options.ClientBuilder = NewClientBuilder() } // NewClientBuilder returns a builder to build new clients to be passed when creating a Manager. func NewClientBuilder() ClientBuilder { return &newClientBuilder{} } type newClientBuilder struct { uncached []client.Object } func (n *newClientBuilder) WithUncached(objs ...client.Object) ClientBuilder { n.uncached = append(n.uncached, objs...) return n } //重点看Build方法,这里实现了client的读写分离 func (n *newClientBuilder) Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) { // Create the Client for Write operations. c, err := client.New(config, options) if err != nil { return nil, err } return client.NewDelegatingClient(client.NewDelegatingClientInput{ CacheReader: cache, Client: c, UncachedObjects: n.uncached, }) } func NewDelegatingClient(in NewDelegatingClientInput) (Client, error) { uncachedGVKs := map[schema.GroupVersionKind]struct{}{} for _, obj := range in.UncachedObjects { gvk, err := apiutil.GVKForObject(obj, in.Client.Scheme()) if err != nil { return nil, err } uncachedGVKs[gvk] = struct{}{} } return &delegatingClient{ scheme: in.Client.Scheme(), mapper: in.Client.RESTMapper(), Reader: &delegatingReader{ CacheReader: in.CacheReader, ClientReader: in.Client, scheme: in.Client.Scheme(), uncachedGVKs: uncachedGVKs, cacheUnstructured: in.CacheUnstructured, }, Writer: in.Client, StatusClient: in.Client, }, nil } ``` 上面可以看到writeObj最后返回的是Client接口,在Build方法里调用了NewDelegatingClient函数,通过返回的delegatingClient实现的读写分离。读的时候使用的是cache,写的时候使用的是client #### Controller ``` if err = (&controllers.CronJobReconciler{ Client: mgr.GetClient(), Log: ctrl.Log.WithName("controllers").WithName("CronJob"), Scheme: mgr.GetScheme(), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "CronJob") os.Exit(1) } ``` 主要初始化了controllers的结构体,然后调用了SetupWithManager方法 ``` // SetupWithManager sets up the controller with the Manager. func (r *CronJobReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&batchv1.CronJob{}). //Watches(&source.Kind{Type: &corev1.Node{}}, handler.Funcs{UpdateFunc: r.nodeUpdateHandler}). Complete(r) } ``` SetupWithManager之前有讲到过,主要是使用了建造者模式,去构建了我们需要监听的对象,只有这些对象的相关事件才会触发我们的 Reconcile 逻辑。这里面的 Complete 最后其实是调用了 Build 方法。**其中NewControllerManagedBy是一个建造者模式,返回的是一个builder对象,其包含了用于构建的`For`、`Owns`、`Watches`、`WithEventFilter`等方法。** ``` // Complete builds the Application ControllerManagedBy. func (blder *Builder) Complete(r reconcile.Reconciler) error { _, err := blder.Build(r) return err } // Build builds the Application ControllerManagedBy and returns the Controller it created. func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) { ... // Set the Config blder.loadRestConfig() // Set the ControllerManagedBy if err := blder.doController(r); err != nil { return nil, err } // Set the Watch if err := blder.doWatch(); err != nil { return nil, err } return blder.ctrl, nil } ``` Build方法主要调用doController、doWatch两个方法 >doController ``` func (blder *Builder) doController(r reconcile.Reconciler) error { ctrlOptions := blder.ctrlOptions if ctrlOptions.Reconciler == nil { ctrlOptions.Reconciler = r } // Retrieve the GVK from the object we're reconciling // to prepopulate logger information, and to optionally generate a default name. gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme()) if err != nil { return err } // Setup the logger. if ctrlOptions.Log == nil { ctrlOptions.Log = blder.mgr.GetLogger() } ctrlOptions.Log = ctrlOptions.Log.WithValues("reconciler group", gvk.Group, "reconciler kind", gvk.Kind) // Build the controller and return. blder.ctrl, err = newController(blder.getControllerName(gvk), blder.mgr, ctrlOptions) return err } ``` doController主要是初始化了一个Controller,这里面传入了我们实现的Reconciler以及获取到我们的GVK的名称 >doWatch ``` func (blder *Builder) doWatch() error { // Reconcile type typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection) if err != nil { return err } src := &source.Kind{Type: typeForSrc} hdler := &handler.EnqueueRequestForObject{} allPredicates := append(blder.globalPredicates, blder.forInput.predicates...) //blder.ctr.Watch,会往src添加cache接口,调用的是mgr接口(controllerManager结构体)的SetFields,这里面又调用了Cluster接口(cluster结构体)的SetFields方法 if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { return err } // Watches the managed types for _, own := range blder.ownsInput { typeForSrc, err := blder.project(own.object, own.objectProjection) if err != nil { return err } src := &source.Kind{Type: typeForSrc} hdler := &handler.EnqueueRequestForOwner{ OwnerType: blder.forInput.object, IsController: true, } allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, own.predicates...) if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil { return err } } // Do the watch requests for _, w := range blder.watchesInput { allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...) allPredicates = append(allPredicates, w.predicates...) // If the source of this watch is of type *source.Kind, project it. if srckind, ok := w.src.(*source.Kind); ok { typeForSrc, err := blder.project(srckind.Type, w.objectProjection) if err != nil { return err } srckind.Type = typeForSrc } if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil { return err } } return nil } ``` Watch 主要是监听我们想要的资源变化,blder.ctrl.Watch(src, hdler, allPredicates...)通过过滤源事件的变化,allPredicates是过滤器,只有所有的过滤器都返回 true 时,才会将事件传递给 EventHandler hdler,这里会将 Handler 注册到 Informer 上 #### 启动 ``` func (cm *controllerManager) Start(ctx context.Context) (err error) { if err := cm.Add(cm.cluster); err != nil { return fmt.Errorf("failed to add cluster to runnables: %w", err) } cm.internalCtx, cm.internalCancel = context.WithCancel(ctx) // 用来退出所有协程 stopComplete := make(chan struct{}) defer close(stopComplete) ... //用来保存错误 cm.errChan = make(chan error) // 如果需要metric,就启动metric服务 if cm.metricsListener != nil { go cm.serveMetrics() } // 启动健康检查服务 if cm.healthProbeListener != nil { go cm.serveHealthProbes() } //选举相关 go cm.startNonLeaderElectionRunnables() go func() { if cm.resourceLock != nil { err := cm.startLeaderElection() if err != nil { cm.errChan <- err } } else { // Treat not having leader election enabled the same as being elected. cm.startLeaderElectionRunnables() close(cm.elected) } }() //判断是否需要退出 select { case <-ctx.Done(): // We are done return nil case err := <-cm.errChan: // Error starting or running a runnable return err } } ``` 无论是不是 leader 最后都会使用 startRunnable 启动 Controller,实际上都是调用了Controller的Start方法 ``` // Start implements controller.Controller func (c *Controller) Start(ctx context.Context) error { // Controller只能被执行一次 c.mu.Lock() if c.Started { return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times") } c.initMetrics() // Set the internal context. c.ctx = ctx // 获取队列 c.Queue = c.MakeQueue() defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed err := func() error { defer c.mu.Unlock() defer utilruntime.HandleCrash() // 尝试等待缓存 for _, watch := range c.startWatches { c.Log.Info("Starting EventSource", "source", watch.src) if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { return err } } // Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches c.Log.Info("Starting Controller") for _, watch := range c.startWatches { syncingSource, ok := watch.src.(source.SyncingSource) if !ok { continue } if err := func() error { // use a context with timeout for launching sources and syncing caches. sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) defer cancel() // WaitForSync waits for a definitive timeout, and returns if there // is an error or a timeout if err := syncingSource.WaitForSync(sourceStartCtx); err != nil { err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err) c.Log.Error(err, "Could not wait for Cache to sync") return err } return nil }(); err != nil { return err } } // All the watches have been started, we can reset the local slice. // // We should never hold watches more than necessary, each watch source can hold a backing cache, // which won't be garbage collected if we hold a reference to it. c.startWatches = nil if c.JitterPeriod == 0 { c.JitterPeriod = 1 * time.Second } // Launch workers to process resources c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles) for i := 0; i < c.MaxConcurrentReconciles; i++ { go wait.UntilWithContext(ctx, func(ctx context.Context) { // 查询队列中有没有关注的事件,有的话就触发我们的 reconcile 逻辑 for c.processNextWorkItem(ctx) { } }, c.JitterPeriod) } c.Started = true return nil }() if err != nil { return err } <-ctx.Done() c.Log.Info("Stopping workers") return nil } ``` ### 总结 Reconcile 方法的触发是通过 Cache 中的 Informer 获取到资源的变更事件,然后再通过生产者消费者的模式触发我们自己实现的 Reconcile 方法的。 Kubebuilder 是一个非常好用的 Operator 开发框架,不仅极大的简化了 Operator 的开发过程,并且充分的利用了 go interface 的特性留下了足够的扩展性。