控制器模式
kubernetes作为一个“容器编排”平台,其核心的功能是编排,Pod作为K8s调度的最小单位,具备很多属性和字段,k8s编排正是通过一个个控制器根据被控制对象的属性和字段来实现。
例如
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
apiVersion: apps/v1
kind: Deployment
metadata:
name: test
spec:
selector:
matchLabels:
app: test
replicas: 2
template:
metadata:
labels:
app: test
spec:
containers:
- name: nginx
image: nginx:1.7.9
ports:
- containerPort: 80
|
K8s集群在部署时包含了Controllers组件,里面对于每个build-in的资源类型(比如Deployment、Statefulset、Cronjob…)都有对应的Controller,基本是1:1的关系。上面的例子中,Deployment 资源创建之后,对应的 Deployment Controller 编排动作很简单,确保属于当前deployment资源的 Pod 个数永远等于 2,Pod 由 template 部分定义,具体来说,K8s 里面是 kube-controller-manager 这个组件在做这件事,可以看下 K8s 项目的 pkg/controller 目录,里面包含了所有控制器,都以独有的方式负责某种编排功能,但是它们都遵循一个通用编排模式,即:调谐循环(Reconcile loop),其伪代码逻辑为:
1
2
3
4
5
6
7
8
9
|
for {
actualState := GetResourceActualState(rsvc)
expectState := GetResourceExpectState(rsvc)
if actualState == expectState {
// do nothing
} else {
Reconcile(rsvc)
}
}
|
就是一个无限循环(实际是事件驱动 定时同步来实现,不是无脑循环)不断地对比期望状态和实际状态,如果有出入则进行 Reconcile(调谐)逻辑将实际状态调整为期望状态。期望状态就是我们的对象定义(通常是 YAML 文件),实际状态是集群里面当前的运行状态(通常来自于 K8s 集群内外相关资源的状态汇总),控制器的编排逻辑主要是第三步做的,这个操作被称为调谐(Reconcile),整个控制器调谐的过程称为“Reconcile Loop”,调谐的最终结果一般是对被控制对象的某种写操作,比如增/删/改 Pod。
在控制器中定义被控制对象是通过“模板”完成的,比如 Deployment 里面的 template 字段里的内容跟一个标准的 Pod 对象的 API 定义一样,所有被这个 Deployment 管理的 Pod 实例,都是根据这个 template 字段的创建的,这就是 PodTemplate,一个控制对象的定义一般是由上半部分的控制定义(期望状态),加上下半部分的被控制对象的模板组成。
声明式API
声明式API就是“告诉K8s你要做什么,而不是告诉它怎么做的命令”,一个很熟悉的例子就是 SQL,你“告诉 DB 根据条件和各类算子返回数据,而不是告诉它怎么遍历,过滤,聚合”。在 K8s 里面,声明式的体现就是 kubectl apply 命令,在对象创建和后续更新中一直使用相同的 apply 命令,告诉 K8s 对象的终态即可,底层是通过执行了一个对原有 API 对象的 PATCH 操作来实现的,可以一次性处理多个写操作,具备 Merge 能力 diff 出最终的 PATCH,而命令式一次只能处理一个写请求。
声明式 API 让 K8s 的“容器编排”世界看起来温柔美好,而控制器(以及容器运行时,存储,网络模型等)才是这太平盛世的幕后英雄。说到这里,就会有人希望也能像 build-in 资源一样构建自己的自定义资源(CRD-Customize Resource Definition),然后为自定义资源写一个对应的控制器,推出自己的声明式 API。K8s 提供了 CRD 的扩展方式来满足用户这一需求,而且由于这种扩展方式十分灵活,在最新的 1.15 版本对 CRD 做了相当大的增强。对于用户来说,实现 CRD 扩展主要做两件事:
Kubebuilder 就是帮我们简化这两件事的工具,现在我们开始介绍主角。
kubebuilder
摘要
Kubebuilder 是一个使用 CRDs 构建 K8s API 的 SDK,主要是:
- 提供脚手架工具初始化 CRDs 工程,自动生成 boilerplate 代码和配置;
- 提供代码库封装底层的 K8s go-client;
方便用户从零开始开发 CRDs,Controllers 和 Admission Webhooks 来扩展 K8s。
核心概念
GVKs & GVRs
- GVK = GroupVersionKind
- GVR = GroupVersionResource
API Group & Versions (GV)
API Group是相关API功能的集合,每个Group拥有一个或多个Versions,用于接口的演进
Kinds & Resources
每个 GV 都包含多个 API 类型,称为 Kinds,在不同的 Versions 之间同一个 Kind 定义可能不同, Resource 是 Kind 的对象标识(resource type),一般来说 Kinds 和 Resources 是 1:1 的,比如 pods Resource 对应 Pod Kind,但是有时候相同的 Kind 可能对应多个 Resources,比如 Scale Kind 可能对应很多 Resources:deployments/scale,replicasets/scale,对于 CRD 来说,只会是 1:1 的关系。
每一个 GVK 都关联着一个 package 中给定的 root Go type,比如 apps/v1/Deployment 就关联着 K8s 源码里面 k8s.io/api/apps/v1 package 中的 Deployment struct,我们提交的各类资源定义 YAML 文件都需要写:
- apiVersion: 这个就是GV
- kind:这个就是K
根据 GVK K8s 就能找到你到底要创建什么类型的资源,根据你定义的 Spec 创建好资源之后就成为了 Resource,也就是 GVR。GVK/GVR 就是 K8s 资源的坐标,是我们创建/删除/修改/读取资源的基础。
Scheme
每一组 Controllers 都需要一个 Scheme,提供了 Kinds 与对应 Go types 的映射,也就是说给定 Go type 就知道他的 GVK,给定 GVK 就知道他的 Go type,比如说我们给定一个 Scheme: “tutotial.kubebuilder.io/api/v1”.CronJob{} 这个 Go type 映射到 batch.tutotial.kubebuilder.io/v1 的 CronJob GVK,那么从 Api Server 获取到下面的 JSON:
1
2
3
4
5
|
{
"kind": "CronJob",
"apiVersion": "batch.tutorial.kubebuilder.io/v1",
...
}
|
就能构造出对应的 Go type了,通过这个 Go type 也能正确地获取 GVR 的一些信息,控制器可以通过该 Go type 获取到期望状态以及其他辅助信息进行调谐逻辑。
核心架构
Process
process进程通过main.go启动,一般来说一个Controller只有一个进程,如果做了高可用的话,会有多个
Manager
每个进程会有一个Manager,这是核心组件,主要负责
- metrics的暴露
- webhook证书
- 初始化共享的cache
- 初始化共享的clients用于和APIServer进行通信
- 所有的Controller的运行
Client
一般来说,我们创建、更新、删除某个资源的时候会直接调用Client和APIServer进行通信
Cache
cache负责同步Controller关心的资源,其核心是GVK -> Informer 的映射,一般我们的Get 和 List 操作都会从Cache中获取数据。Informer 会负责监听对应 GVK 的 GVRs 的创建/删除/更新操作,以触发 Controller 的 Reconcile 逻辑。
Controller
为控制器的业务逻辑所载的地方,一个Manager可能会有多个Controller,我们一般只需要实现Reconcile方法就行。图上的Predicate是事件过滤器,我们可以再Controller中过滤掉我们不关心的事件信息。
WebHook
就是我们准入控制实现的地方了,主要有2类接口:
- 一个是MutatingAdmissionWebhook需要实现Defaulter接口
- 一个是ValidatingAdmissionWebhook需要实现Validator接口
其他
Index
由于 Controller 经常要对 Cache 进行查询,Kubebuilder 提供 Index utility 给 Cache 加索引提升查询效率。
Finalizer
在一般情况下,如果资源被删除之后,我们虽然能够被触发删除事件,但是这个时候从 Cache 里面无法读取任何被删除对象的信息,这样一来,导致很多垃圾清理工作因为信息不足无法进行,K8s 的 Finalizer 字段用于处理这种情况。在 K8s 中,只要对象 ObjectMeta 里面的 Finalizers 不为空,对该对象的 delete 操作就会转变为 update 操作,具体说就是 update deletionTimestamp 字段,其意义就是告诉 K8s 的 GC“在deletionTimestamp 这个时刻之后,只要 Finalizers 为空,就立马删除掉该对象”。
所以一般的使用姿势就是在创建对象时把 Finalizers 设置好(任意 string),然后处理 DeletionTimestamp 不为空的 update 操作(实际是 delete),根据 Finalizers 的值执行完所有的 pre-delete hook(此时可以在 Cache 里面读取到被删除对象的任何信息)之后将 Finalizers 置为空即可。
OwnerReference
K8s GC 在删除一个对象时,任何 ownerReference 是该对象的对象都会被清除,与此同时,Kubebuidler 支持所有对象的变更都会触发 Owner 对象 controller 的 Reconcile 方法。
kubebuilder的使用
创建脚手架工程
1
|
[root@kubebuilder CronJob]# kubebuilder init --domain xieys.io --repo xieys.io
|
这一步创建了一个Go module工程,引入了必要的依赖,创建了一些模板文件
go.mod
基本依赖项
Makefile
为构建和部署控制器指定目标
PROJECT
kubebuilder元数据脚手架新组建
config/
启动配置,它只包含在集群上启动控制器所需的Kustomize YAML定义,它还将保存我们的CustomResourceDefinitions、RBAC配置和WebhookConfigurations。
config/default/
包含一个Kustomize基础,用于在标准配置中启动控制器。
config/manager
在集群内启动你的控制器相关的yaml定义
config/rbac
在自己的服务帐户下运行控制器所需的权限相关的yaml定义
创建API
1
|
[root@kubebuilder CronJob]# kubebuilder create api --group batch --version v1 --kind CronJob
|
这一步创建了对应的CRD和Controller模板文件,经过这两步,现有的工程结构图如下:
定义CRD
在Kubebuilder生成的工程结构图中的cronjob_types.go文件里定义Spec 和 Status。
编写Controller逻辑
在Kubebuilder生成的工程结构图中的cronjob_controller.go 文件中实现Reconcile的逻辑。
测试发布
本地测试完之后使用kubebuilder的Makefile构建镜像,部署我们的CRDs和Controller即可
kubebuilder深入
问题:
- 如果同步自定义资源以及k8s build-in资源
- Controller的Reconcile方法是如何被触发的?
- Cache的工作原理是什么?
源码阅读
从main.go开始
kubebuilder创建的main.go是整个项目的入口,逻辑十分简单:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
)
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(batchv1.AddToScheme(scheme))
//+kubebuilder:scaffold:scheme
}
func main() {
...
//1、初始化Manager
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
Port: 9443,
HealthProbeBindAddress: probeAddr,
LeaderElection: enableLeaderElection,
LeaderElectionID: "b6d835ad.xieys.io",
//CertDir: "config/cert", //手动指定证书位置用于测试
})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
//2、初始化Reconciler(Controller)
if err = (&controllers.CronJobReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("CronJob"),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "CronJob")
os.Exit(1)
}
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up health check")
os.Exit(1)
}
if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
setupLog.Error(err, "unable to set up ready check")
os.Exit(1)
}
// 启动Manager
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
os.Exit(1)
}
}
|
可以看到在init方法里面我们将batchv1注册到Scheme里面去了,这样一来Cache就知道该watch谁了,main方法里面的逻辑基本都是manager的:
- 初始化一个Manager
- 将Manager的Client传给Controller,并且调用SetupWithManager方法传入Manager进行Controller的初始化
- 启动webhook(上述代码没有添加webhook)
- 添加健康检查
- 启动Manager
下面我们就顺着main函数里面的逻辑一步步往下看
NewManager
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
// NewManager returns a new Manager for creating Controllers.
NewManager = manager.New
// New returns a new Manager for creating Controllers.
func New(config *rest.Config, options Options) (Manager, error) {
// Set default values for options fields
options = setOptionsDefaults(options)
//初始化集群,创建cache以及client
cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) {
clusterOptions.Scheme = options.Scheme
clusterOptions.MapperProvider = options.MapperProvider
clusterOptions.Logger = options.Logger
clusterOptions.SyncPeriod = options.SyncPeriod
clusterOptions.Namespace = options.Namespace
clusterOptions.NewCache = options.NewCache
clusterOptions.ClientBuilder = options.ClientBuilder
clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor
clusterOptions.DryRunClient = options.DryRunClient
clusterOptions.EventBroadcaster = options.EventBroadcaster
})
if err != nil {
return nil, err
}
//创建事件记录器
recorderProvider, err := options.newRecorderProvider(config, cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster)
if err != nil {
return nil, err
}
// 需要高可用的话,创建选举相关的配置
leaderConfig := options.LeaderElectionConfig
if leaderConfig == nil {
leaderConfig = rest.CopyConfig(config)
}
resourceLock, err := options.newResourceLock(leaderConfig, recorderProvider, leaderelection.Options{
LeaderElection: options.LeaderElection,
LeaderElectionResourceLock: options.LeaderElectionResourceLock,
LeaderElectionID: options.LeaderElectionID,
LeaderElectionNamespace: options.LeaderElectionNamespace,
})
if err != nil {
return nil, err
}
//创建metric和健康检查的接口
metricsListener, err := options.newMetricsListener(options.MetricsBindAddress)
if err != nil {
return nil, err
}
// By default we have no extra endpoints to expose on metrics http server.
metricsExtraHandlers := make(map[string]http.Handler)
// Create health probes listener. This will throw an error if the bind
// address is invalid or already in use.
healthProbeListener, err := options.newHealthProbeListener(options.HealthProbeBindAddress)
if err != nil {
return nil, err
}
//最后将这些配置放到manager中
return &controllerManager{
cluster: cluster,
recorderProvider: recorderProvider,
resourceLock: resourceLock,
metricsListener: metricsListener,
metricsExtraHandlers: metricsExtraHandlers,
logger: options.Logger,
elected: make(chan struct{}),
port: options.Port,
host: options.Host,
certDir: options.CertDir,
leaseDuration: *options.LeaseDuration,
renewDeadline: *options.RenewDeadline,
retryPeriod: *options.RetryPeriod,
healthProbeListener: healthProbeListener,
readinessEndpointName: options.ReadinessEndpointName,
livenessEndpointName: options.LivenessEndpointName,
gracefulShutdownTimeout: *options.GracefulShutdownTimeout,
internalProceduresStop: make(chan struct{}),
leaderElectionStopped: make(chan struct{}),
}, nil
}
|
cluster.New(config, func(clusterOptions *cluster.Options)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
|
// New constructs a brand new cluster
func New(config *rest.Config, opts ...Option) (Cluster, error) {
...
options := Options{}
// 为options赋值,上面以函数传入进来的
for _, opt := range opts {
opt(&options)
}
// 为options设置默认的值
options = setOptionsDefaults(options)
// 创建mapper映射器, k8s restAPI与go type的转换器
mapper, err := options.MapperProvider(config)
if err != nil {
options.Logger.Error(err, "Failed to get API Group-Resources")
return nil, err
}
// 创建cache
cache, err := options.NewCache(config,
cache.Options{
Scheme: options.Scheme, //main中传入的scheme
Mapper: mapper, //k8s api和gotype的转换器
Resync: options.SyncPeriod, //默认10小时,一般不要改
Namespace: options.Namespace,//需要监听的namespace
})
if err != nil {
return nil, err
}
//创建client的options选项
clientOptions := client.Options{Scheme: options.Scheme, Mapper: mapper}
//用于查询kubernetes资源,但是会跳过cache,直接从apiserver中查询
apiReader, err := client.New(config, clientOptions)
if err != nil {
return nil, err
}
//初始化用于写操作的client
writeObj, err := options.ClientBuilder.
WithUncached(options.ClientDisableCacheFor...).
Build(cache, config, clientOptions)
if err != nil {
return nil, err
}
if options.DryRunClient {
writeObj = client.NewDryRunClient(writeObj)
}
//创建事件记录器
recorderProvider, err := options.newRecorderProvider(config, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster)
if err != nil {
return nil, err
}
return &cluster{
config: config,
scheme: options.Scheme,
cache: cache,
fieldIndexes: cache,
client: writeObj,
apiReader: apiReader,
recorderProvider: recorderProvider,
mapper: mapper,
logger: options.Logger,
}, nil
}
|
创建Cache
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
可以看到在为options设置默认的值的时候,有把New函数赋值给了options.NewCache
func setOptionsDefaults(options Options) Options {
...
if options.NewCache == nil {
options.NewCache = cache.New
}
...
return options
}
// New initializes and returns a new Cache.
func New(config *rest.Config, opts Options) (Cache, error) {
opts, err := defaultOpts(config, opts)
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
return &informerCache{InformersMap: im}, nil
}
|
这里主要是调用NewInformersMap方法创建Informer的映射
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
// NewInformersMap creates a new InformersMap that can create informers for
// both structured and unstructured objects.
func NewInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration,
namespace string) *InformersMap {
return &InformersMap{
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace),
Scheme: scheme,
}
}
|
NewInformersMap会分别创建,结构化、非结构化以及metadata的informerMap,而这些方法最后都会调用newSpecificlnformersMap方法,区别就是不同的方法传入的createListWatcherFunc参数不同
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
// newStructuredInformersMap creates a new InformersMap for structured objects.
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch)
}
// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createUnstructuredListWatch)
}
// newMetadataInformersMap creates a new InformersMap for metadata-only objects.
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createMetadataListWatch)
}
func newSpecificInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration,
namespace string,
createListWatcher createListWatcherFunc) *specificInformersMap {
ip := &specificInformersMap{
config: config,
Scheme: scheme,
mapper: mapper,
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
codecs: serializer.NewCodecFactory(scheme),
paramCodec: runtime.NewParameterCodec(scheme),
resync: resync,
startWait: make(chan struct{}),
createListWatcher: createListWatcher,
namespace: namespace,
}
return ip
}
|
newSpecificInformersMap和常规的InformersMap类似,区别是没实现WaitForCacheSync方法。
以结构化的传入的createStructuredListWatch为例,主要是返回一个用于创建SharedIndexInformer 的 ListWatch 对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
// groupVersionKind to the Resource API we will use.
mapping, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, err
}
client, err := apiutil.RESTClientForGVK(gvk, false, ip.config, ip.codecs)
if err != nil {
return nil, err
}
listGVK := gvk.GroupVersion().WithKind(gvk.Kind + "List")
listObj, err := ip.Scheme.New(listGVK)
if err != nil {
return nil, err
}
// TODO: the functions that make use of this ListWatch should be adapted to
// pass in their own contexts instead of relying on this fixed one here.
ctx := context.TODO()
// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
res := listObj.DeepCopyObject()
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
return res, err
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
// Watch needs to be set to true separately
opts.Watch = true
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
return client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Watch(ctx)
},
}, nil
}
|
cache主要是创建了一些InformerMap,完成了GVK到Informer的映射,每个Informer会根据ListWatch函数对对应的GVK进行List和Watch。
创建Client
apiReader
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
apiReader, err := client.New(config, clientOptions)
//New使用提供的配置和选项返回一个新的客户端。返回的客户端直接从服务器读取*和*写入(它不使用对象缓存)。它理解如何使用普通类型(自定义资源和聚合/内置资源),以及非结构化类型。对于普通类型,该模式将用于查找给定类型的对应组、版本和类型。在非结构化类型的情况下,将从对象的相应字段中提取组、版本和类型。
func New(config *rest.Config, options Options) (Client, error) {
if config == nil {
return nil, fmt.Errorf("must provide non-nil rest.Config to client.New")
}
// Init a scheme if none provided
if options.Scheme == nil {
options.Scheme = scheme.Scheme
}
// Init a Mapper if none provided
if options.Mapper == nil {
var err error
options.Mapper, err = apiutil.NewDynamicRESTMapper(config)
if err != nil {
return nil, err
}
}
clientcache := &clientCache{
config: config,
scheme: options.Scheme,
mapper: options.Mapper,
codecs: serializer.NewCodecFactory(options.Scheme),
structuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta),
unstructuredResourceByType: make(map[schema.GroupVersionKind]*resourceMeta),
}
rawMetaClient, err := metadata.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("unable to construct metadata-only client for use as part of client: %w", err)
}
c := &client{
typedClient: typedClient{
cache: clientcache,
paramCodec: runtime.NewParameterCodec(options.Scheme),
},
unstructuredClient: unstructuredClient{
cache: clientcache,
paramCodec: noConversionParamCodec{},
},
metadataClient: metadataClient{
client: rawMetaClient,
restMapper: options.Mapper,
},
scheme: options.Scheme,
mapper: options.Mapper,
}
return c, nil
}
|
writeObj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
writeObj, err := options.ClientBuilder.
WithUncached(options.ClientDisableCacheFor...).
Build(cache, config, clientOptions)
options设置默认值的时候,有个ClientBuilder接口初始化
// Allow the client builder to be mocked
if options.ClientBuilder == nil {
options.ClientBuilder = NewClientBuilder()
}
// NewClientBuilder returns a builder to build new clients to be passed when creating a Manager.
func NewClientBuilder() ClientBuilder {
return &newClientBuilder{}
}
type newClientBuilder struct {
uncached []client.Object
}
func (n *newClientBuilder) WithUncached(objs ...client.Object) ClientBuilder {
n.uncached = append(n.uncached, objs...)
return n
}
//重点看Build方法,这里实现了client的读写分离
func (n *newClientBuilder) Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) {
// Create the Client for Write operations.
c, err := client.New(config, options)
if err != nil {
return nil, err
}
return client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: cache,
Client: c,
UncachedObjects: n.uncached,
})
}
func NewDelegatingClient(in NewDelegatingClientInput) (Client, error) {
uncachedGVKs := map[schema.GroupVersionKind]struct{}{}
for _, obj := range in.UncachedObjects {
gvk, err := apiutil.GVKForObject(obj, in.Client.Scheme())
if err != nil {
return nil, err
}
uncachedGVKs[gvk] = struct{}{}
}
return &delegatingClient{
scheme: in.Client.Scheme(),
mapper: in.Client.RESTMapper(),
Reader: &delegatingReader{
CacheReader: in.CacheReader,
ClientReader: in.Client,
scheme: in.Client.Scheme(),
uncachedGVKs: uncachedGVKs,
cacheUnstructured: in.CacheUnstructured,
},
Writer: in.Client,
StatusClient: in.Client,
}, nil
}
|
上面可以看到writeObj最后返回的是Client接口,在Build方法里调用了NewDelegatingClient函数,通过返回的delegatingClient实现的读写分离。读的时候使用的是cache,写的时候使用的是client
Controller
1
2
3
4
5
6
7
8
|
if err = (&controllers.CronJobReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("CronJob"),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "CronJob")
os.Exit(1)
}
|
主要初始化了controllers的结构体,然后调用了SetupWithManager方法
1
2
3
4
5
6
7
|
// SetupWithManager sets up the controller with the Manager.
func (r *CronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&batchv1.CronJob{}).
//Watches(&source.Kind{Type: &corev1.Node{}}, handler.Funcs{UpdateFunc: r.nodeUpdateHandler}).
Complete(r)
}
|
SetupWithManager之前有讲到过,主要是使用了建造者模式,去构建了我们需要监听的对象,只有这些对象的相关事件才会触发我们的 Reconcile 逻辑。这里面的 Complete 最后其实是调用了 Build 方法。其中NewControllerManagedBy是一个建造者模式,返回的是一个builder对象,其包含了用于构建的For
、Owns
、Watches
、WithEventFilter
等方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
// Complete builds the Application ControllerManagedBy.
func (blder *Builder) Complete(r reconcile.Reconciler) error {
_, err := blder.Build(r)
return err
}
// Build builds the Application ControllerManagedBy and returns the Controller it created.
func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, error) {
...
// Set the Config
blder.loadRestConfig()
// Set the ControllerManagedBy
if err := blder.doController(r); err != nil {
return nil, err
}
// Set the Watch
if err := blder.doWatch(); err != nil {
return nil, err
}
return blder.ctrl, nil
}
|
Build方法主要调用doController、doWatch两个方法
doController
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
func (blder *Builder) doController(r reconcile.Reconciler) error {
ctrlOptions := blder.ctrlOptions
if ctrlOptions.Reconciler == nil {
ctrlOptions.Reconciler = r
}
// Retrieve the GVK from the object we're reconciling
// to prepopulate logger information, and to optionally generate a default name.
gvk, err := getGvk(blder.forInput.object, blder.mgr.GetScheme())
if err != nil {
return err
}
// Setup the logger.
if ctrlOptions.Log == nil {
ctrlOptions.Log = blder.mgr.GetLogger()
}
ctrlOptions.Log = ctrlOptions.Log.WithValues("reconciler group", gvk.Group, "reconciler kind", gvk.Kind)
// Build the controller and return.
blder.ctrl, err = newController(blder.getControllerName(gvk), blder.mgr, ctrlOptions)
return err
}
|
doController主要是初始化了一个Controller,这里面传入了我们实现的Reconciler以及获取到我们的GVK的名称
doWatch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
|
func (blder *Builder) doWatch() error {
// Reconcile type
typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
//blder.ctr.Watch,会往src添加cache接口,调用的是mgr接口(controllerManager结构体)的SetFields,这里面又调用了Cluster接口(cluster结构体)的SetFields方法
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}
// Watches the managed types
for _, own := range blder.ownsInput {
typeForSrc, err := blder.project(own.object, own.objectProjection)
if err != nil {
return err
}
src := &source.Kind{Type: typeForSrc}
hdler := &handler.EnqueueRequestForOwner{
OwnerType: blder.forInput.object,
IsController: true,
}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
return err
}
}
// Do the watch requests
for _, w := range blder.watchesInput {
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)
// If the source of this watch is of type *source.Kind, project it.
if srckind, ok := w.src.(*source.Kind); ok {
typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
if err != nil {
return err
}
srckind.Type = typeForSrc
}
if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {
return err
}
}
return nil
}
|
Watch 主要是监听我们想要的资源变化,blder.ctrl.Watch(src, hdler, allPredicates…)通过过滤源事件的变化,allPredicates是过滤器,只有所有的过滤器都返回 true 时,才会将事件传递给 EventHandler hdler,这里会将 Handler 注册到 Informer 上
启动
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
func (cm *controllerManager) Start(ctx context.Context) (err error) {
if err := cm.Add(cm.cluster); err != nil {
return fmt.Errorf("failed to add cluster to runnables: %w", err)
}
cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)
// 用来退出所有协程
stopComplete := make(chan struct{})
defer close(stopComplete)
...
//用来保存错误
cm.errChan = make(chan error)
// 如果需要metric,就启动metric服务
if cm.metricsListener != nil {
go cm.serveMetrics()
}
// 启动健康检查服务
if cm.healthProbeListener != nil {
go cm.serveHealthProbes()
}
//选举相关
go cm.startNonLeaderElectionRunnables()
go func() {
if cm.resourceLock != nil {
err := cm.startLeaderElection()
if err != nil {
cm.errChan <- err
}
} else {
// Treat not having leader election enabled the same as being elected.
cm.startLeaderElectionRunnables()
close(cm.elected)
}
}()
//判断是否需要退出
select {
case <-ctx.Done():
// We are done
return nil
case err := <-cm.errChan:
// Error starting or running a runnable
return err
}
}
|
无论是不是 leader 最后都会使用 startRunnable 启动 Controller,实际上都是调用了Controller的Start方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
// Start implements controller.Controller
func (c *Controller) Start(ctx context.Context) error {
// Controller只能被执行一次
c.mu.Lock()
if c.Started {
return errors.New("controller was started more than once. This is likely to be caused by being added to a manager multiple times")
}
c.initMetrics()
// Set the internal context.
c.ctx = ctx
// 获取队列
c.Queue = c.MakeQueue()
defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed
err := func() error {
defer c.mu.Unlock()
defer utilruntime.HandleCrash()
// 尝试等待缓存
for _, watch := range c.startWatches {
c.Log.Info("Starting EventSource", "source", watch.src)
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {
return err
}
}
// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
c.Log.Info("Starting Controller")
for _, watch := range c.startWatches {
syncingSource, ok := watch.src.(source.SyncingSource)
if !ok {
continue
}
if err := func() error {
// use a context with timeout for launching sources and syncing caches.
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
defer cancel()
// WaitForSync waits for a definitive timeout, and returns if there
// is an error or a timeout
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
c.Log.Error(err, "Could not wait for Cache to sync")
return err
}
return nil
}(); err != nil {
return err
}
}
// All the watches have been started, we can reset the local slice.
//
// We should never hold watches more than necessary, each watch source can hold a backing cache,
// which won't be garbage collected if we hold a reference to it.
c.startWatches = nil
if c.JitterPeriod == 0 {
c.JitterPeriod = 1 * time.Second
}
// Launch workers to process resources
c.Log.Info("Starting workers", "worker count", c.MaxConcurrentReconciles)
for i := 0; i < c.MaxConcurrentReconciles; i++ {
go wait.UntilWithContext(ctx, func(ctx context.Context) {
// 查询队列中有没有关注的事件,有的话就触发我们的 reconcile 逻辑
for c.processNextWorkItem(ctx) {
}
}, c.JitterPeriod)
}
c.Started = true
return nil
}()
if err != nil {
return err
}
<-ctx.Done()
c.Log.Info("Stopping workers")
return nil
}
|
总结
Reconcile 方法的触发是通过 Cache 中的 Informer 获取到资源的变更事件,然后再通过生产者消费者的模式触发我们自己实现的 Reconcile 方法的。
Kubebuilder 是一个非常好用的 Operator 开发框架,不仅极大的简化了 Operator 的开发过程,并且充分的利用了 go interface 的特性留下了足够的扩展性。