1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
|
// NewMainKubelet assembles a *Kubelet from the kubelet configuration (kubeCfg)
// and the injected dependencies (kubeDeps), returning an error as soon as any
// construction step shown here fails.
//
// NOTE: this listing is abridged. The parameter list and every bare `...` line
// stand for code that was elided from the excerpt, so several identifiers used
// below (e.g. hostname, crOptions, klet.containerRuntime) are defined in the
// omitted parts.
func NewMainKubelet(...) (*Kubelet, error) {
...
if kubeDeps.PodConfig == nil {
var err error
// Build the pod source config only when the caller did not inject one.
// Per the original note, makePodSourceConfig watches the sources of pod
// metadata (file, URL, API server) and merges the pod configuration from
// the different sources into a single structure.
kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
if err != nil {
return nil, err
}
}
// Container GC policy settings.
containerGCPolicy := kubecontainer.ContainerGCPolicy{
MinAge: minimumGCAge.Duration,
MaxPerPodContainer: int(maxPerPodContainerCount),
MaxContainers: int(maxContainerCount),
}
// Kubelet serving port, taken from kubeCfg.Port (the original note gives
// 10250 as the default; the default itself is not set in this excerpt).
daemonEndpoints := &v1.NodeDaemonEndpoints{
KubeletEndpoint: v1.DaemonEndpoint{Port: kubeCfg.Port},
}
// Image GC policy settings.
imageGCPolicy := images.ImageGCPolicy{
MinAge: kubeCfg.ImageMinimumGCAge.Duration,
HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
LowThresholdPercent: int(kubeCfg.ImageGCLowThresholdPercent),
}
// Node allocatable (reserved resources) enforcement settings.
enforceNodeAllocatable := kubeCfg.EnforceNodeAllocatable
if experimentalNodeAllocatableIgnoreEvictionThreshold {
// Do not provide kubeCfg.EnforceNodeAllocatable to eviction threshold parsing if we are not enforcing Evictions
enforceNodeAllocatable = []string{}
}
// Eviction policy settings used when node resources run low: parse the
// hard/soft thresholds first, then bundle them into the eviction config.
thresholds, err := eviction.ParseThresholdConfig(enforceNodeAllocatable, kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
if err != nil {
return nil, err
}
evictionConfig := eviction.Config{
PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
Thresholds: thresholds,
KernelMemcgNotification: experimentalKernelMemcgNotification,
PodCgroupRoot: kubeDeps.ContainerManager.GetPodCgroupRoot(),
}
// Use a reflector to sync the services obtained through ListWatch into
// serviceIndexer. The reflector is started only when a kube client is
// available; without one the lister is backed by an indexer that is never
// populated here.
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
if kubeDeps.KubeClient != nil {
serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())
r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0)
// Runs with wait.NeverStop, i.e. no stop channel is ever signalled from here.
go r.Run(wait.NeverStop)
}
serviceLister := corelisters.NewServiceLister(serviceIndexer)
// Use a reflector to sync the node information obtained through ListWatch
// into nodeIndexer. Same client guard as for services above.
nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
if kubeDeps.KubeClient != nil {
// Restrict the list/watch to the Node object whose name equals nodeName.
fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
go r.Run(wait.NeverStop)
}
nodeInfo := &CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}
// Object reference to this node; note that the UID is filled with the node
// name rather than a UID fetched from the API server.
nodeRef := &v1.ObjectReference{
Kind: "Node",
Name: string(nodeName),
UID: types.UID(nodeName),
Namespace: "",
}
// Manager for container references.
containerRefManager := kubecontainer.NewRefManager()
// OOM watcher.
oomWatcher := oomwatcher.NewWatcher(kubeDeps.Recorder)
...
// Create the Kubelet instance from the configuration and the objects built above.
klet := &Kubelet{
hostname: hostname,
hostnameOverridden: len(hostnameOverride) > 0,
nodeName: nodeName,
...
}
...
// Create the secretManager and configMapManager. The implementation pair is
// selected by kubeCfg.ConfigMapAndSecretChangeDetectionStrategy; an
// unrecognized strategy aborts construction with an error.
var secretManager secret.Manager
var configMapManager configmap.Manager
switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
case kubeletconfiginternal.WatchChangeDetectionStrategy:
secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient)
configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient)
case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
// Caching managers take a TTL function built from klet.GetNode.
secretManager = secret.NewCachingSecretManager(
kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
configMapManager = configmap.NewCachingConfigMapManager(
kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
case kubeletconfiginternal.GetChangeDetectionStrategy:
secretManager = secret.NewSimpleSecretManager(kubeDeps.KubeClient)
configMapManager = configmap.NewSimpleConfigMapManager(kubeDeps.KubeClient)
default:
return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
}
klet.secretManager = secretManager
klet.configMapManager = configMapManager
if klet.experimentalHostUserNamespaceDefaulting {
klog.Infof("Experimental host user namespace defaulting is enabled.")
}
// Fetch information about the current machine from cAdvisor.
machineInfo, err := klet.cadvisor.MachineInfo()
if err != nil {
return nil, err
}
klet.machineInfo = machineInfo
imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
// Create the livenessManager.
klet.livenessManager = proberesults.NewManager()
// Create a pod cache.
klet.podCache = kubecontainer.NewCache()
// Create a checkpointManager. It is only initialized when
// bootstrapCheckpointPath is non-empty; otherwise it stays nil and is passed
// to the pod manager as such.
var checkpointManager checkpointmanager.CheckpointManager
if bootstrapCheckpointPath != "" {
checkpointManager, err = checkpointmanager.NewCheckpointManager(bootstrapCheckpointPath)
if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager: %+v", err)
}
}
// Create the podManager. Per the original note, it manages pods (e.g.
// add/update/delete) and is also responsible for keeping the contents of the
// secretManager, configMapManager and checkpointManager up to date.
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager, checkpointManager)
// Per the original note, the statusManager tracks the status of the pods on
// this node and updates the corresponding pods in the API server.
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
...
// Create a KubeGenericRuntimeManager (container runtime management).
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(...)
...
// PLEG: generic PLEG built on the container runtime and the pod cache.
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
...
// Create the containerGC object using the container GC policy defined above;
// per the original note it performs periodic container cleanup.
containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
...
// Create the imageManager, which manages images using the image GC policy defined above.
imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
...
// Probe manager.
klet.probeManager = prober.NewManager(
klet.statusManager,
klet.livenessManager,
klet.runner,
containerRefManager,
kubeDeps.Recorder)
// Token manager.
tokenManager := token.NewManager(kubeDeps.KubeClient)
...
// Volume manager (the original note calls this "disk management").
klet.volumeManager = volumemanager.NewVolumeManager(...)
...
// Eviction manager, built from the eviction config assembled above. The
// second return value (evictionAdmitHandler) is presumably consumed in the
// elided code that follows.
evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)
klet.evictionManager = evictionManager
...
return klet, nil
}
|