目录

kubebuilder详解

控制器模式

kubernetes作为一个“容器编排”平台,其核心的功能是编排,Pod作为K8s调度的最小单位,具备很多属性和字段,k8s编排正是通过一个个控制器根据被控制对象的属性和字段来实现。

例如

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
apiVersion: apps/v1
kind: Deployment
metadata:
  name: test
spec:
  selector:
    matchLabels:
      app: test
  replicas: 2
  template:
    metadata:
      labels:
        app: test
    spec:
      containers:
      - name: nginx
        image: nginx:1.7.9
        ports:
        - containerPort: 80

K8s集群在部署时包含了Controllers组件,里面对于每个build-in的资源类型(比如Deployment、Statefulset、Cronjob…)都有对应的Controller,基本是1:1的关系。上面的例子中,Deployment 资源创建之后,对应的 Deployment Controller 编排动作很简单,确保属于当前deployment资源的 Pod 个数永远等于 2,Pod 由 template 部分定义,具体来说,K8s 里面是 kube-controller-manager 这个组件在做这件事,可以看下 K8s 项目的 pkg/controller 目录,里面包含了所有控制器,都以独有的方式负责某种编排功能,但是它们都遵循一个通用编排模式,即:调谐循环(Reconcile loop),其伪代码逻辑为:

1
2
3
4
5
6
7
8
9
for {
    actualState := GetResourceActualState(rsvc)
    expectState := GetResourceExpectState(rsvc)
    if actualState == expectState {
    // do nothing
    } else {
    Reconcile(rsvc)
    }
}

就是一个无限循环(实际是事件驱动 定时同步来实现,不是无脑循环)不断地对比期望状态和实际状态,如果有出入则进行 Reconcile(调谐)逻辑将实际状态调整为期望状态。期望状态就是我们的对象定义(通常是 YAML 文件),实际状态是集群里面当前的运行状态(通常来自于 K8s 集群内外相关资源的状态汇总),控制器的编排逻辑主要是第三步做的,这个操作被称为调谐(Reconcile),整个控制器调谐的过程称为“Reconcile Loop”,调谐的最终结果一般是对被控制对象的某种写操作,比如增/删/改 Pod。

在控制器中定义被控制对象是通过“模板”完成的,比如 Deployment 里面的 template 字段里的内容跟一个标准的 Pod 对象的 API 定义一样,所有被这个 Deployment 管理的 Pod 实例,都是根据这个 template 字段的创建的,这就是 PodTemplate,一个控制对象的定义一般是由上半部分的控制定义(期望状态),加上下半部分的被控制对象的模板组成。

声明式API

声明式API就是“告诉K8s你要做什么,而不是告诉它怎么做的命令”,一个很熟悉的例子就是 SQL,你“告诉 DB 根据条件和各类算子返回数据,而不是告诉它怎么遍历,过滤,聚合”。在 K8s 里面,声明式的体现就是 kubectl apply 命令,在对象创建和后续更新中一直使用相同的 apply 命令,告诉 K8s 对象的终态即可,底层是通过执行了一个对原有 API 对象的 PATCH 操作来实现的,可以一次性处理多个写操作,具备 Merge 能力 diff 出最终的 PATCH,而命令式一次只能处理一个写请求。

声明式 API 让 K8s 的“容器编排”世界看起来温柔美好,而控制器(以及容器运行时,存储,网络模型等)才是这太平盛世的幕后英雄。说到这里,就会有人希望也能像 build-in 资源一样构建自己的自定义资源(CRD-Customize Resource Definition),然后为自定义资源写一个对应的控制器,推出自己的声明式 API。K8s 提供了 CRD 的扩展方式来满足用户这一需求,而且由于这种扩展方式十分灵活,在最新的 1.15 版本对 CRD 做了相当大的增强。对于用户来说,实现 CRD 扩展主要做两件事:

  • 编写 CRD 并将其部署到 K8s 集群里; 这一步的作用就是让 K8s 知道有这个资源及其结构属性,在用户提交该自定义资源的定义时(通常是 YAML 文件定义),K8s 能够成功校验该资源并创建出对应的 Go struct 进行持久化,同时触发控制器的调谐逻辑。

  • 编写 Controller 并将其部署到 K8s 集群里。 这一步的作用就是实现调谐逻辑。

Kubebuilder 就是帮我们简化这两件事的工具,现在我们开始介绍主角。

kubebuilder

摘要

Kubebuilder 是一个使用 CRDs 构建 K8s API 的 SDK,主要是:

  • 提供脚手架工具初始化 CRDs 工程,自动生成 boilerplate 代码和配置;
  • 提供代码库封装底层的 K8s go-client;

方便用户从零开始开发 CRDs,Controllers 和 Admission Webhooks 来扩展 K8s。

核心概念

GVKs & GVRs

  • GVK = GroupVersionKind
  • GVR = GroupVersionResource

API Group & Versions (GV)

API Group是相关API功能的集合,每个Group拥有一个或多个Versions,用于接口的演进

Kinds & Resources

每个 GV 都包含多个 API 类型,称为 Kinds,在不同的 Versions 之间同一个 Kind 定义可能不同, Resource 是 Kind 的对象标识(resource type),一般来说 Kinds 和 Resources 是 1:1 的,比如 pods Resource 对应 Pod Kind,但是有时候相同的 Kind 可能对应多个 Resources,比如 Scale Kind 可能对应很多 Resources:deployments/scale,replicasets/scale,对于 CRD 来说,只会是 1:1 的关系。

每一个 GVK 都关联着一个 package 中给定的 root Go type,比如 apps/v1/Deployment 就关联着 K8s 源码里面 k8s.io/api/apps/v1 package 中的 Deployment struct,我们提交的各类资源定义 YAML 文件都需要写:

  • apiVersion: 这个就是GV
  • kind:这个就是K

根据 GVK K8s 就能找到你到底要创建什么类型的资源,根据你定义的 Spec 创建好资源之后就成为了 Resource,也就是 GVR。GVK/GVR 就是 K8s 资源的坐标,是我们创建/删除/修改/读取资源的基础。

Scheme

每一组 Controllers 都需要一个 Scheme,提供了 Kinds 与对应 Go types 的映射,也就是说给定 Go type 就知道他的 GVK,给定 GVK 就知道他的 Go type,比如说我们给定一个 Scheme: “tutotial.kubebuilder.io/api/v1”.CronJob{} 这个 Go type 映射到 batch.tutotial.kubebuilder.io/v1 的 CronJob GVK,那么从 Api Server 获取到下面的 JSON:

1
2
3
4
5
{
    "kind": "CronJob",
    "apiVersion": "batch.tutorial.kubebuilder.io/v1",
    ...
}

就能构造出对应的 Go type了,通过这个 Go type 也能正确地获取 GVR 的一些信息,控制器可以通过该 Go type 获取到期望状态以及其他辅助信息进行调谐逻辑。

核心架构

http://xieys.club/images/posts/20210831163331761393343.png

Process

process进程通过main.go启动,一般来说一个Controller只有一个进程,如果做了高可用的话,会有多个

Manager

每个进程会有一个Manager,这是核心组件,主要负责

  • metrics的暴露
  • webhook证书
  • 初始化共享的cache
  • 初始化共享的clients用于和APIServer进行通信
  • 所有的Controller的运行

Client

一般来说,我们创建、更新、删除某个资源的时候会直接调用Client和APIServer进行通信

Cache

cache负责同步Controller关心的资源,其核心是GVK -> Informer 的映射,一般我们的Get 和 List 操作都会从Cache中获取数据。Informer 会负责监听对应 GVK 的 GVRs 的创建/删除/更新操作,以触发 Controller 的 Reconcile 逻辑。

Controller

为控制器的业务逻辑所载的地方,一个Manager可能会有多个Controller,我们一般只需要实现Reconcile方法就行。图上的Predicate是事件过滤器,我们可以再Controller中过滤掉我们不关心的事件信息。

WebHook

就是我们准入控制实现的地方了,主要有2类接口:

  • 一个是MutatingAdmissionWebhook需要实现Defaulter接口
  • 一个是ValidatingAdmissionWebhook需要实现Validator接口

其他

Index

由于 Controller 经常要对 Cache 进行查询,Kubebuilder 提供 Index utility 给 Cache 加索引提升查询效率。

Finalizer

在一般情况下,如果资源被删除之后,我们虽然能够被触发删除事件,但是这个时候从 Cache 里面无法读取任何被删除对象的信息,这样一来,导致很多垃圾清理工作因为信息不足无法进行,K8s 的 Finalizer 字段用于处理这种情况。在 K8s 中,只要对象 ObjectMeta 里面的 Finalizers 不为空,对该对象的 delete 操作就会转变为 update 操作,具体说就是 update deletionTimestamp 字段,其意义就是告诉 K8s 的 GC“在deletionTimestamp 这个时刻之后,只要 Finalizers 为空,就立马删除掉该对象”。

所以一般的使用姿势就是在创建对象时把 Finalizers 设置好(任意 string),然后处理 DeletionTimestamp 不为空的 update 操作(实际是 delete),根据 Finalizers 的值执行完所有的 pre-delete hook(此时可以在 Cache 里面读取到被删除对象的任何信息)之后将 Finalizers 置为空即可。

OwnerReference

K8s GC 在删除一个对象时,任何 ownerReference 是该对象的对象都会被清除,与此同时,Kubebuidler 支持所有对象的变更都会触发 Owner 对象 controller 的 Reconcile 方法。

kubebuilder的使用

创建脚手架工程

1
[root@kubebuilder CronJob]# kubebuilder init --domain xieys.io --repo xieys.io

这一步创建了一个Go module工程,引入了必要的依赖,创建了一些模板文件

go.mod 基本依赖项

Makefile 为构建和部署控制器指定目标

PROJECT kubebuilder元数据脚手架新组建

config/ 启动配置,它只包含在集群上启动控制器所需的Kustomize YAML定义,它还将保存我们的CustomResourceDefinitions、RBAC配置和WebhookConfigurations。

config/default/ 包含一个Kustomize基础,用于在标准配置中启动控制器。

config/manager 在集群内启动你的控制器相关的yaml定义

config/rbac 在自己的服务帐户下运行控制器所需的权限相关的yaml定义

创建API

1
[root@kubebuilder CronJob]# kubebuilder create api --group batch --version v1 --kind CronJob

这一步创建了对应的CRD和Controller模板文件,经过这两步,现有的工程结构图如下:

http://xieys.club/images/posts/image-20210830160800387.png

定义CRD

在Kubebuilder生成的工程结构图中的cronjob_types.go文件里定义Spec 和 Status。

编写Controller逻辑

在Kubebuilder生成的工程结构图中的cronjob_controller.go 文件中实现Reconcile的逻辑。

测试发布

本地测试完之后使用kubebuilder的Makefile构建镜像,部署我们的CRDs和Controller即可

kubebuilder深入

问题:

  • 如果同步自定义资源以及k8s build-in资源
  • Controller的Reconcile方法是如何被触发的?
  • Cache的工作原理是什么?

源码阅读

从main.go开始

kubebuilder创建的main.go是整个项目的入口,逻辑十分简单:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
var (
	scheme   = runtime.NewScheme()
	setupLog = ctrl.Log.WithName("setup")
)

func init() {
	utilruntime.Must(clientgoscheme.AddToScheme(scheme))

	utilruntime.Must(batchv1.AddToScheme(scheme))
	//+kubebuilder:scaffold:scheme
}
func main() {
	...
	//1、初始化Manager
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		Scheme:                 scheme,
		MetricsBindAddress:     metricsAddr,
		Port:                   9443,
		HealthProbeBindAddress: probeAddr,
		LeaderElection:         enableLeaderElection,
		LeaderElectionID:       "b6d835ad.xieys.io",
		//CertDir:              "config/cert", //手动指定证书位置用于测试
	})
	if err != nil {
		setupLog.Error(err, "unable to start manager")
		os.Exit(1)
	}
	//2、初始化Reconciler(Controller)
	if err = (&controllers.CronJobReconciler{
		Client: mgr.GetClient(),
		Log: ctrl.Log.WithName("controllers").WithName("CronJob"),
		Scheme: mgr.GetScheme(),
	}).SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "CronJob")
		os.Exit(1)
	}
	
	if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
		setupLog.Error(err, "unable to set up health check")
		os.Exit(1)
	}
	if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
		setupLog.Error(err, "unable to set up ready check")
		os.Exit(1)
	}
	// 启动Manager
	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		setupLog.Error(err, "problem running manager")
		os.Exit(1)
	}
}

可以看到在init方法里面我们将batchv1注册到Scheme里面去了,这样一来Cache就知道该watch谁了,main方法里面的逻辑基本都是manager的:

  • 初始化一个Manager
  • 将Manager的Client传给Controller,并且调用SetupWithManager方法传入Manager进行Controller的初始化
  • 启动webhook(上述代码没有添加webhook)
  • 添加健康检查
  • 启动Manager

下面我们就顺着main函数里面的逻辑一步步往下看

NewManager

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
	// NewManager returns a new Manager for creating Controllers.
	NewManager = manager.New
	
	// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
	// Set default values for options fields
	options = setOptionsDefaults(options)

	//初始化集群,创建cache以及client
	cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
		clusterOptions.Scheme = options.Scheme
		clusterOptions.MapperProvider = options.MapperProvider
		clusterOptions.Logger = options.Logger
		clusterOptions.SyncPeriod = options.SyncPeriod
		clusterOptions.Namespace = options.Namespace
		clusterOptions.NewCache = options.NewCache
		clusterOptions.ClientBuilder = options.ClientBuilder
		clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor
		clusterOptions.DryRunClient = options.DryRunClient
		clusterOptions.EventBroadcaster = options.EventBroadcaster
	})
	if err != nil {
		return nil, err
	}

	//创建事件记录器
	recorderProvider, err := options.newRecorderProvider(config, cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster)
	if err != nil {
		return nil, err
	}

	// 需要高可用的话,创建选举相关的配置
	leaderConfig := options.LeaderElectionConfig
	if leaderConfig == nil {
		leaderConfig = rest.CopyConfig(config)
	}
	resourceLock, err := options.newResourceLock(leaderConfig, recorderProvider, leaderelection.Options{
		LeaderElection:             options.LeaderElection,
		LeaderElectionResourceLock: options.LeaderElectionResourceLock,
		LeaderElectionID:           options.LeaderElectionID,
		LeaderElectionNamespace:    options.LeaderElectionNamespace,
	})
	if err != nil {
		return nil, err
	}

	//创建metric和健康检查的接口
	metricsListener, err := options.newMetricsListener(options.MetricsBindAddress)
	if err != nil {
		return nil, err
	}

	// By default we have no extra endpoints to expose on metrics http server.
	metricsExtraHandlers := make(map[string]http.Handler)

	// Create health probes listener. This will throw an error if the bind
	// address is invalid or already in use.
	healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress)
	if err != nil {
		return nil, err
	}

	//最后将这些配置放到manager中
	return &controllerManager{
		cluster:                 cluster,
		recorderProvider:        recorderProvider,
		resourceLock:            resourceLock,
		metricsListener:         metricsListener,
		metricsExtraHandlers:    metricsExtraHandlers,
		logger:                  options.Logger,
		elected:                 make(chan struct{}),
		port:                    options.Port,
		host:                    options.Host,
		certDir:                 options.CertDir,
		leaseDuration:           *options.LeaseDuration,
		renewDeadline:           *options.RenewDeadline,
		retryPeriod:             *options.RetryPeriod,
		healthProbeListener:     healthProbeListener,
		readinessEndpointName:   options.ReadinessEndpointName,
		livenessEndpointName:    options.LivenessEndpointName,
		gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
		internalProceduresStop:  make(chan struct{}),
		leaderElectionStopped:   make(chan struct{}),
	}, nil
}

cluster.New(config, func(clusterOptions *cluster.Options)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// New constructs a brand new cluster
func New(config *rest.Config, opts ...Option) (Cluster, error) {
	...
	
	options := Options{}
	// 为options赋值,上面以函数传入进来的
	for _, opt := range opts {
		opt(&options)
	}
	// 为options设置默认的值
	options = setOptionsDefaults(options)

	// 创建mapper映射器, k8s restAPI与go type的转换器
	mapper, err := options.MapperProvider(config)
	if err != nil {
		options.Logger.Error(err, "Failed to get API Group-Resources")
		return nil, err
	}

	// 创建cache
	cache, err := options.NewCache(config, 
					cache.Options{
						Scheme: options.Scheme,  //main中传入的scheme
						Mapper: mapper, 		//k8s api和gotype的转换器
						Resync: options.SyncPeriod, //默认10小时,一般不要改
						Namespace: options.Namespace,//需要监听的namespace
						})
	if err != nil {
		return nil, err
	}
	//创建client的options选项
	clientOptions := client.Options{Scheme: options.Scheme, Mapper: mapper}
        //用于查询kubernetes资源,但是会跳过cache,直接从apiserver中查询
	apiReader, err := client.New(config, clientOptions)
	if err != nil {
		return nil, err
	}
        //初始化用于写操作的client
	writeObj, err := options.ClientBuilder.
		WithUncached(options.ClientDisableCacheFor...).
		Build(cache, config, clientOptions)
	if err != nil {
		return nil, err
	}

	if options.DryRunClient {
		writeObj = client.NewDryRunClient(writeObj)
	}

	//创建事件记录器
	recorderProvider, err := options.newRecorderProvider(config, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster)
	if err != nil {
		return nil, err
	}

	return &cluster{
		config:           config,
		scheme:           options.Scheme,
		cache:            cache,
		fieldIndexes:     cache,
		client:           writeObj,
		apiReader:        apiReader,
		recorderProvider: recorderProvider,
		mapper:           mapper,
		logger:           options.Logger,
	}, nil
}

创建Cache

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
可以看到在为options设置默认的值的时候,有把New函数赋值给了options.NewCache
func setOptionsDefaults(options Options) Options {
	...
	if options.NewCache == nil {
		options.NewCache = cache.New
	}
	...
	return options
}


// New initializes and returns a new Cache.
func New(config *rest.Config, opts Options) (Cache, error) {
	opts, err := defaultOpts(config, opts)
	if err != nil {
		return nil, err
	}
	im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
	return &informerCache{InformersMap: im}, nil
}

这里主要是调用NewInformersMap方法创建Informer的映射

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
// NewInformersMap creates a new InformersMap that can create informers for
// both structured and unstructured objects.
func NewInformersMap(config *rest.Config,
	scheme *runtime.Scheme,
	mapper meta.RESTMapper,
	resync time.Duration,
	namespace string) *InformersMap {

	return &InformersMap{
		structured:   newStructuredInformersMap(config, scheme, mapper, resync, namespace),
		unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
		metadata:     newMetadataInformersMap(config, scheme, mapper, resync, namespace),

		Scheme: scheme,
	}
}

NewInformersMap会分别创建,结构化、非结构化以及metadata的informerMap,而这些方法最后都会调用newSpecificlnformersMap方法,区别就是不同的方法传入的createListWatcherFunc参数不同

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// newStructuredInformersMap creates a new InformersMap for structured objects.
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
	return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch)
}

// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
	return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createUnstructuredListWatch)
}

// newMetadataInformersMap creates a new InformersMap for metadata-only objects.
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
	return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createMetadataListWatch)
}

func newSpecificInformersMap(config *rest.Config,
	scheme *runtime.Scheme,
	mapper meta.RESTMapper,
	resync time.Duration,
	namespace string,
	createListWatcher createListWatcherFunc) *specificInformersMap {
	ip := &specificInformersMap{
		config:            config,
		Scheme:            scheme,
		mapper:            mapper,
		informersByGVK:    make(map[schema.GroupVersionKind]*MapEntry),
		codecs:            serializer.NewCodecFactory(scheme),
		paramCodec:        runtime.NewParameterCodec(scheme),
		resync:            resync,
		startWait:         make(chan struct{}),
		createListWatcher: createListWatcher,
		namespace:         namespace,
	}
	return ip
}

newSpecificInformersMap和常规的InformersMap类似,区别是没实现WaitForCacheSync方法。

以结构化的传入的createStructuredListWatch为例,主要是返回一个用于创建SharedIndexInformer 的 ListWatch 对象

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
	// Kubernetes APIs work against Resources, not GroupVersionKinds.  Map the
	// groupVersionKind to the Resource API we will use.
	mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
	if err != nil {
		return nil, err
	}

	client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
	if err != nil {
		return nil, err
	}
	listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
	listObj, err := ip.Scheme.New(listGVK)
	if err != nil {
		return nil, err
	}

	// TODO: the functions that make use of this ListWatch should be adapted to
	//  pass in their own contexts instead of relying on this fixed one here.
	ctx := context.TODO()
	// Create a new ListWatch for the obj
	return &cache.ListWatch{
		ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
			res := listObj.DeepCopyObject()
			isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
			err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
			return res, err
		},
		// Setup the watch function
		WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
			// Watch needs to be set to true separately
			opts.Watch = true
			isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
			return client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch(ctx)
		},
	}, nil
}

cache主要是创建了一些InformerMap,完成了GVK到Informer的映射,每个Informer会根据ListWatch函数对对应的GVK进行List和Watch。

创建Client

apiReader

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
apiReader, err := client.New(config, clientOptions)

//New使用提供的配置和选项返回一个新的客户端。返回的客户端直接从服务器读取*和*写入(它不使用对象缓存)。它理解如何使用普通类型(自定义资源和聚合/内置资源),以及非结构化类型。对于普通类型,该模式将用于查找给定类型的对应组、版本和类型。在非结构化类型的情况下,将从对象的相应字段中提取组、版本和类型。
func New(config *rest.Config, options Options) (Client, error) {
	if config == nil {
		return nil, fmt.Errorf("must provide non-nil rest.Config to client.New")
	}

	// Init a scheme if none provided
	if options.Scheme == nil {
		options.Scheme = scheme.Scheme
	}

	// Init a Mapper if none provided
	if options.Mapper == nil {
		var err error
		options.Mapper, err = apiutil.NewDynamicRESTMapper(config)
		if err != nil {
			return nil, err
		}
	}

	clientcache := &clientCache{
		config: config,
		scheme: options.Scheme,
		mapper: options.Mapper,
		codecs: serializer.NewCodecFactory(options.Scheme),

		structuredResourceByType:   make(map[schema.GroupVersionKind]*resourceMeta),
		unstructuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta),
	}

	rawMetaClient, err := metadata.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("unable to construct metadata-only client for use as part of client: %w", err)
	}

	c := &client{
		typedClient: typedClient{
			cache:      clientcache,
			paramCodec: runtime.NewParameterCodec(options.Scheme),
		},
		unstructuredClient: unstructuredClient{
			cache:      clientcache,
			paramCodec: noConversionParamCodec{},
		},
		metadataClient: metadataClient{
			client:     rawMetaClient,
			restMapper: options.Mapper,
		},
		scheme: options.Scheme,
		mapper: options.Mapper,
	}

	return c, nil
}

writeObj

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
writeObj, err := options.ClientBuilder.
		WithUncached(options.ClientDisableCacheFor...).
		Build(cache, config, clientOptions)
		
		
options设置默认值的时候,有个ClientBuilder接口初始化
	// Allow the client builder to be mocked
	if options.ClientBuilder == nil {
		options.ClientBuilder = NewClientBuilder()
	}
	
// NewClientBuilder returns a builder to build new clients to be passed when creating a Manager.
func NewClientBuilder() ClientBuilder {
	return &newClientBuilder{}
}
type newClientBuilder struct {
	uncached []client.Object
}

func (n *newClientBuilder) WithUncached(objs ...client.Object) ClientBuilder {
	n.uncached = append(n.uncached, objs...)
	return n
}

//重点看Build方法,这里实现了client的读写分离
func (n *newClientBuilder) Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
	// Create the Client for Write operations.
	c, err := client.New(config, options)
	if err != nil {
		return nil, err
	}

	return client.NewDelegatingClient(client.NewDelegatingClientInput{
		CacheReader:     cache,
		Client:          c,
		UncachedObjects: n.uncached,
	})
}


func NewDelegatingClient(in NewDelegatingClientInput) (Client, error) {
	uncachedGVKs := map[schema.GroupVersionKind]struct{}{}
	for _, obj := range in.UncachedObjects {
		gvk, err := apiutil.GVKForObject(obj, in.Client.Scheme())
		if err != nil {
			return nil, err
		}
		uncachedGVKs[gvk] = struct{}{}
	}

	return &delegatingClient{
		scheme: in.Client.Scheme(),
		mapper: in.Client.RESTMapper(),
		Reader: &delegatingReader{
			CacheReader:       in.CacheReader,
			ClientReader:      in.Client,
			scheme:            in.Client.Scheme(),
			uncachedGVKs:      uncachedGVKs,
			cacheUnstructured: in.CacheUnstructured,
		},
		Writer:       in.Client,
		StatusClient: in.Client,
	}, nil
}

上面可以看到writeObj最后返回的是Client接口,在Build方法里调用了NewDelegatingClient函数,通过返回的delegatingClient实现的读写分离。读的时候使用的是cache,写的时候使用的是client

Controller

1
2
3
4
5
6
7
8
	if err = (&controllers.CronJobReconciler{
		Client: mgr.GetClient(),
		Log: ctrl.Log.WithName("controllers").WithName("CronJob"),
		Scheme: mgr.GetScheme(),
	}).SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "CronJob")
		os.Exit(1)
	}

主要初始化了controllers的结构体,然后调用了SetupWithManager方法

1
2
3
4
5
6
7
// SetupWithManager sets up the controller with the Manager.
func (r *CronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&batchv1.CronJob{}).
		//Watches(&source.Kind{Type: &corev1.Node{}}, handler.Funcs{UpdateFunc: r.nodeUpdateHandler}).
		Complete(r)
}

SetupWithManager之前有讲到过,主要是使用了建造者模式,去构建了我们需要监听的对象,只有这些对象的相关事件才会触发我们的 Reconcile 逻辑。这里面的 Complete 最后其实是调用了 Build 方法。其中NewControllerManagedBy是一个建造者模式,返回的是一个builder对象,其包含了用于构建的ForOwnsWatchesWithEventFilter等方法。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Complete builds the Application ControllerManagedBy.
func (blder *Builder) Complete(r reconcile.Reconciler) error {
	_, err := blder.Build(r)
	return err
}

// Build builds the Application ControllerManagedBy and returns the Controller it created.
func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
	...

	// Set the Config
	blder.loadRestConfig()

	// Set the ControllerManagedBy
	if err := blder.doController(r); err != nil {
		return nil, err
	}

	// Set the Watch
	if err := blder.doWatch(); err != nil {
		return nil, err
	}

	return blder.ctrl, nil
}

Build方法主要调用doController、doWatch两个方法

doController

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (blder *Builder) doController(r reconcile.Reconciler) error {
	ctrlOptions := blder.ctrlOptions
	if ctrlOptions.Reconciler == nil {
		ctrlOptions.Reconciler = r
	}

	// Retrieve the GVK from the object we're reconciling
	// to prepopulate logger information, and to optionally generate a default name.
	gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme())
	if err != nil {
		return err
	}

	// Setup the logger.
	if ctrlOptions.Log == nil {
		ctrlOptions.Log = blder.mgr.GetLogger()
	}
	ctrlOptions.Log = ctrlOptions.Log.WithValues("reconciler group", gvk.Group, "reconciler kind", gvk.Kind)

	// Build the controller and return.
	blder.ctrl, err = newController(blder.getControllerName(gvk), blder.mgr, ctrlOptions)
	return err
}

doController主要是初始化了一个Controller,这里面传入了我们实现的Reconciler以及获取到我们的GVK的名称

doWatch

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
func (blder *Builder) doWatch() error {
	// Reconcile type
	typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
	if err != nil {
		return err
	}
	src := &source.Kind{Type: typeForSrc}
	hdler := &handler.EnqueueRequestForObject{}
	allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
	//blder.ctr.Watch,会往src添加cache接口,调用的是mgr接口(controllerManager结构体)的SetFields,这里面又调用了Cluster接口(cluster结构体)的SetFields方法
	if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
		return err
	}

	// Watches the managed types
	for _, own := range blder.ownsInput {
		typeForSrc, err := blder.project(own.object, own.objectProjection)
		if err != nil {
			return err
		}
		src := &source.Kind{Type: typeForSrc}
		hdler := &handler.EnqueueRequestForOwner{
			OwnerType:    blder.forInput.object,
			IsController: true,
		}
		allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
		allPredicates = append(allPredicates, own.predicates...)
		if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
			return err
		}
	}

	// Do the watch requests
	for _, w := range blder.watchesInput {
		allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
		allPredicates = append(allPredicates, w.predicates...)

		// If the source of this watch is of type *source.Kind, project it.
		if srckind, ok := w.src.(*source.Kind); ok {
			typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
			if err != nil {
				return err
			}
			srckind.Type = typeForSrc
		}

		if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
			return err
		}
	}
	return nil
}

Watch 主要是监听我们想要的资源变化,blder.ctrl.Watch(src, hdler, allPredicates…)通过过滤源事件的变化,allPredicates是过滤器,只有所有的过滤器都返回 true 时,才会将事件传递给 EventHandler hdler,这里会将 Handler 注册到 Informer 上

启动

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
func (cm *controllerManager) Start(ctx context.Context) (err error) {
	if err := cm.Add(cm.cluster); err != nil {
		return fmt.Errorf("failed to add cluster to runnables: %w", err)
	}
	cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)

	// 用来退出所有协程
	stopComplete := make(chan struct{})
	defer close(stopComplete)
	...
	//用来保存错误
	cm.errChan = make(chan error)

	
	// 如果需要metric,就启动metric服务
	if cm.metricsListener != nil {
		go cm.serveMetrics()
	}

	// 启动健康检查服务
	if cm.healthProbeListener != nil {
		go cm.serveHealthProbes()
	}

	//选举相关
	go cm.startNonLeaderElectionRunnables()

	go func() {
		if cm.resourceLock != nil {
			err := cm.startLeaderElection()
			if err != nil {
				cm.errChan <- err
			}
		} else {
			// Treat not having leader election enabled the same as being elected.
			cm.startLeaderElectionRunnables()
			close(cm.elected)
		}
	}()

	//判断是否需要退出
	select {
	case <-ctx.Done():
		// We are done
		return nil
	case err := <-cm.errChan:
		// Error starting or running a runnable
		return err
	}
}

无论是不是 leader 最后都会使用 startRunnable 启动 Controller,实际上都是调用了Controller的Start方法

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
// Start implements controller.Controller
func (c *Controller) Start(ctx context.Context) error {
	// Controller只能被执行一次
	c.mu.Lock()
	if c.Started {
		return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
	}

	c.initMetrics()

	// Set the internal context.
	c.ctx = ctx
	// 获取队列
	c.Queue = c.MakeQueue()
	defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed

	err := func() error {
		defer c.mu.Unlock()

		
		defer utilruntime.HandleCrash()

		
		// 尝试等待缓存
		for _, watch := range c.startWatches {
			c.Log.Info("Starting EventSource", "source", watch.src)

			if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
				return err
			}
		}

		// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
		c.Log.Info("Starting Controller")
		
		for _, watch := range c.startWatches {
			syncingSource, ok := watch.src.(source.SyncingSource)
			if !ok {
				continue
			}

			if err := func() error {
				// use a context with timeout for launching sources and syncing caches.
				sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
				defer cancel()

				// WaitForSync waits for a definitive timeout, and returns if there
				// is an error or a timeout
				if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
					err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
					c.Log.Error(err, "Could not wait for Cache to sync")
					return err
				}

				return nil
			}(); err != nil {
				return err
			}
		}

		// All the watches have been started, we can reset the local slice.
		//
		// We should never hold watches more than necessary, each watch source can hold a backing cache,
		// which won't be garbage collected if we hold a reference to it.
		c.startWatches = nil

		if c.JitterPeriod == 0 {
			c.JitterPeriod = 1 * time.Second
		}

		// Launch workers to process resources
		c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
		for i := 0; i < c.MaxConcurrentReconciles; i++ {
			go wait.UntilWithContext(ctx, func(ctx context.Context) {
				// 查询队列中有没有关注的事件,有的话就触发我们的 reconcile 逻辑 
				for c.processNextWorkItem(ctx) {
				}
			}, c.JitterPeriod)
		}

		c.Started = true
		return nil
	}()
	if err != nil {
		return err
	}

	<-ctx.Done()
	c.Log.Info("Stopping workers")
	return nil
}

总结

Reconcile 方法的触发是通过 Cache 中的 Informer 获取到资源的变更事件,然后再通过生产者消费者的模式触发我们自己实现的 Reconcile 方法的。

Kubebuilder 是一个非常好用的 Operator 开发框架,不仅极大的简化了 Operator 的开发过程,并且充分的利用了 go interface 的特性留下了足够的扩展性。