目录

kubelet启动流程

kubelet启动流程

http://xieys.club/images/posts/kubelet-3.png

kubelet入口函数main(cmd/kubelet/kubelet.go)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// main is the kubelet entry point. It seeds math/rand, builds the kubelet
// cobra command, initializes logging and executes the command, exiting with
// status 1 if execution returns an error.
func main() {
	rand.Seed(time.Now().UnixNano())

	command := app.NewKubeletCommand()
	logs.InitLogs()
	// Covers the normal-return and panic paths.
	defer logs.FlushLogs()

	if err := command.Execute(); err != nil {
		// os.Exit terminates the process immediately and does not run
		// deferred functions, so flush explicitly before exiting.
		logs.FlushLogs()
		os.Exit(1)
	}
}

初始化kubelet配置(cmd/kubelet/app/server.go)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// NewKubeletCommand creates a *cobra.Command object with default parameters.
// The command's Run callback (excerpted here) loads and validates the kubelet
// configuration, builds a KubeletServer plus its default dependencies, and
// finally calls Run to start the kubelet.
func NewKubeletCommand() *cobra.Command {
	cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError)
	cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
	// The kubelet configuration is split into two parts:
	// kubeletFlags: settings that must not be changed while the kubelet is running,
	// or that cannot be shared between the nodes of a cluster.
	kubeletFlags := options.NewKubeletFlags()
	// KubeletConfiguration: settings that can be shared between the nodes of a
	// cluster and can be configured dynamically.
	kubeletConfig, err := options.NewKubeletConfiguration()
	...
	cmd := &cobra.Command{
		...
		Run: func(cmd *cobra.Command, args []string) {
			...

			// If a config file was provided, load the kubelet configuration from it.
			if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 {
				kubeletConfig, err = loadConfigFile(configFile)
				if err != nil {
					klog.Fatal(err)
				}
				// Command-line arguments take precedence: they override the
				// corresponding fields of the kubeletConfig loaded from the file.
				if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil {
					klog.Fatal(err)
				}
				// Update the feature gates based on the new configuration.
				if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil {
					klog.Fatal(err)
				}
			}

			// Validate the kubelet configuration; any error is fatal.
			if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil {
				klog.Fatal(err)
			}

			// Only relevant when a dynamic config directory is set (body elided).
			var kubeletConfigController *dynamickubeletconfig.Controller
			if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 {
				...
			}

			// Build a KubeletServer from kubeletFlags and kubeletConfig.
			kubeletServer := &options.KubeletServer{
				KubeletFlags:         *kubeletFlags,
				KubeletConfiguration: *kubeletConfig,
			}

			// Build the default KubeletDeps from kubeletServer.
			kubeletDeps, err := UnsecuredDependencies(kubeletServer)
			if err != nil {
				klog.Fatal(err)
			}

			...
			// Run the kubelet.
			if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil {
				klog.Fatal(err)
			}
		},
	}
	...

	return cmd
}

kubeletDeps 包含 kubelet 运行所必须的配置,是为了实现 dependency injection(注入依赖),其目的是为了把 kubelet 依赖的组件对象作为参数传进来,这样可以控制 kubelet 的行为。主要包括监控功能(cadvisor),cgroup 管理功能(containerManager)等。

NewKubeletCommand() 会调用 Run() 函数,Run() 中主要调用 run() 函数进行一些准备事项。

创建和apiserver通信的对象(cmd/kubelet/app/server.go)

run() 函数的主要功能:

  • 创建 kubeClient,eventClient 用来和 apiserver 通信。创建 heartbeatClient 向 apiserver 上报心跳状态。
  • 为 kubeDeps 设定一些默认值。
  • 启动监听 Healthz 端口的 http server,默认端口是 10248。
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// run performs the preparation work before the kubelet itself is started
// (excerpt; elided parts are marked with "..."). As shown here it registers the
// configuration with configz, sets up the API clients (or clears them in
// standalone mode), fills in defaults for several kubeDeps fields, calls
// RunKubelet, optionally starts dynamic config syncing, and starts the healthz
// HTTP server.
func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) {
	...
	// Register the current configuration with the /configz API.
	// A failure here is only logged, not returned.
	err = initConfigz(&s.KubeletConfiguration)
	if err != nil {
		klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err)
	}

	...

	// Determine the kubelet's start-up mode. Standalone mode is represented by
	// setting all API clients to nil.
	switch {
	case standaloneMode:
		kubeDeps.KubeClient = nil
		kubeDeps.EventClient = nil
		kubeDeps.HeartbeatClient = nil
		klog.Warningf("standalone mode, no API client")

	case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil:
		...
		// Create the kubeClient.
		kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig)
		
		...
		// Create the eventClient, used to report events.
		kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig)
		
		...
		// Create the heartbeatClient, used to report status.
		kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig)
		...
	}
	// Default kubeDeps.Auth. BuildAuth creates an authenticator, an authorizer,
	// and an authorizer-attributes getter matching the kubelet's needs.
	if kubeDeps.Auth == nil {
		auth, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
		if err != nil {
			return err
		}
		kubeDeps.Auth = auth
	}

	...
	// Default kubeDeps.CAdvisorInterface.
	if kubeDeps.CAdvisorInterface == nil {
		imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint)
		kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint))
		if err != nil {
			return err
		}
	}

	// Create the event recorder.
	makeEventRecorder(kubeDeps, nodeName)
	
	// Default kubeDeps.ContainerManager.
	if kubeDeps.ContainerManager == nil {
		...
		kubeDeps.ContainerManager, err = cm.NewContainerManager(
			kubeDeps.Mounter,
			kubeDeps.CAdvisorInterface,
			cm.NodeConfig{
				RuntimeCgroupsName:    s.RuntimeCgroups,
				SystemCgroupsName:     s.SystemCgroups,
				KubeletCgroupsName:    s.KubeletCgroups,
				ContainerRuntime:      s.ContainerRuntime,
				CgroupsPerQOS:         s.CgroupsPerQOS,
				CgroupRoot:            s.CgroupRoot,
				CgroupDriver:          s.CgroupDriver,
				KubeletRootDir:        s.RootDirectory,
				ProtectKernelDefaults: s.ProtectKernelDefaults,
				NodeAllocatableConfig: cm.NodeAllocatableConfig{
					KubeReservedCgroupName:   s.KubeReservedCgroup,
					SystemReservedCgroupName: s.SystemReservedCgroup,
					EnforceNodeAllocatable:   sets.NewString(s.EnforceNodeAllocatable...),
					KubeReserved:             kubeReserved,
					SystemReserved:           systemReserved,
					HardEvictionThresholds:   hardEvictionThresholds,
				},
				QOSReserved:                           *experimentalQOSReserved,
				ExperimentalCPUManagerPolicy:          s.CPUManagerPolicy,
				ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration,
				ExperimentalPodPidsLimit:              s.PodPidsLimit,
				EnforceCPULimits:                      s.CPUCFSQuota,
				CPUCFSQuotaPeriod:                     s.CPUCFSQuotaPeriod.Duration,
				ExperimentalTopologyManagerPolicy:     s.TopologyManagerPolicy,
			},
			s.FailSwapOn,
			devicePluginEnabled,
			kubeDeps.Recorder)

		...
	}

	...
	// Hand over to RunKubelet for the remaining start-up steps.
	if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
		return err
	}

	// If dynamic kubelet config is enabled (and the kubelet is neither standalone
	// nor run-once), StartSync begins watching for and applying config updates.
	if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 &&
		kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce {
		if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil {
			return err
		}
	}

	// Start the HTTP server listening on the healthz port. wait.Until re-runs
	// the function with a 5s period, so a failed ListenAndServe is retried.
	if s.HealthzPort > 0 {
		mux := http.NewServeMux()
		healthz.InstallHandler(mux)
		go wait.Until(func() {
			err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux)
			if err != nil {
				klog.Errorf("Starting healthz server failed: %v", err)
			}
		}, 5*time.Second, wait.NeverStop)
	}

	...
}

kubelet 对 pod 资源的获取方式有三种:第一种是通过文件获得,文件一般放在 /etc/kubernetes/manifests 目录下面(如:kubeadm 安装的 kube-apiserver、kube-scheduler、kube-controller-manager 等静态 Pod);第二种也是通过文件获得,只不过文件是通过 URL 获取的;第三种是通过 watch kube-apiserver 获取。当 kubelet 不连接 apiserver(即上面代码中所有 client 都被置为 nil)、只使用前两种方式时,我们称 kubelet 运行在 standalone 模式下,运行在 standalone 模式下的 kubelet 一般用于调试某些功能。

run() 中调用 RunKubelet() 函数进行后续操作。

初始化kubelet组件内部的模块(cmd/kubelet/app/server.go)

RunKubelet() 主要功能:

  • 初始化 kubelet 组件中的各个模块,创建出 kubelet 对象。
  • 启动垃圾回收服务。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// RunKubelet creates and initializes the kubelet via createAndInitKubelet and
// then either processes pods once (runOnce) or starts the long-running kubelet
// through startKubelet (excerpt; elided parts are marked with "...").
// It returns an error if the kubelet cannot be created or the run-once pass fails.
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
	...
	// Create the kubelet and initialize its internal modules.
	k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration,
		kubeDeps,
		&kubeServer.ContainerRuntimeOptions,
		kubeServer.ContainerRuntime,
		kubeServer.RuntimeCgroups,
		kubeServer.HostnameOverride,
		kubeServer.NodeIP,
		kubeServer.ProviderID,
		kubeServer.CloudProvider,
		kubeServer.CertDirectory,
		kubeServer.RootDirectory,
		kubeServer.RegisterNode,
		kubeServer.RegisterWithTaints,
		kubeServer.AllowedUnsafeSysctls,
		kubeServer.RemoteRuntimeEndpoint,
		kubeServer.RemoteImageEndpoint,
		kubeServer.ExperimentalMounterPath,
		kubeServer.ExperimentalKernelMemcgNotification,
		kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount,
		kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold,
		kubeServer.MinimumGCAge,
		kubeServer.MaxPerPodContainerCount,
		kubeServer.MaxContainerCount,
		kubeServer.MasterServiceNamespace,
		kubeServer.RegisterSchedulable,
		kubeServer.NonMasqueradeCIDR,
		kubeServer.KeepTerminatedPodVolumes,
		kubeServer.NodeLabels,
		kubeServer.SeccompProfileRoot,
		kubeServer.BootstrapCheckpointPath,
		kubeServer.NodeStatusMaxImages)
	if err != nil {
		return fmt.Errorf("failed to create kubelet: %v", err)
	}

	...
	// process pods and exit.
	if runOnce {
		if _, err := k.RunOnce(podCfg.Updates()); err != nil {
			return fmt.Errorf("runonce failed: %v", err)
		}
		klog.Info("Started kubelet as runonce")
	} else {
		startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer)
		klog.Info("Started kubelet")
	}
	return nil
}

createAndInitKubelet

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// createAndInitKubelet builds the kubelet through kubelet.NewMainKubelet, then
// calls BirthCry and StartGarbageCollection on it before returning it.
// The parameter list is elided in this excerpt; the arguments are forwarded
// unchanged to NewMainKubelet. On construction failure it returns (nil, err).
func createAndInitKubelet(...) (k kubelet.Bootstrap, err error) {
	// NewMainKubelet instantiates a kubelet object and initializes the
	// kubelet's internal modules.
	k, err = kubelet.NewMainKubelet(kubeCfg,
		kubeDeps,
		crOptions,
		containerRuntime,
		runtimeCgroups,
		hostnameOverride,
		nodeIP,
		providerID,
		cloudProvider,
		certDirectory,
		rootDirectory,
		registerNode,
		registerWithTaints,
		allowedUnsafeSysctls,
		remoteRuntimeEndpoint,
		remoteImageEndpoint,
		experimentalMounterPath,
		experimentalKernelMemcgNotification,
		experimentalCheckNodeCapabilitiesBeforeMount,
		experimentalNodeAllocatableIgnoreEvictionThreshold,
		minimumGCAge,
		maxPerPodContainerCount,
		maxContainerCount,
		masterServiceNamespace,
		registerSchedulable,
		nonMasqueradeCIDR,
		keepTerminatedPodVolumes,
		nodeLabels,
		seccompProfileRoot,
		bootstrapCheckpointPath,
		nodeStatusMaxImages)
	if err != nil {
		return nil, err
	}
	// Announce, by sending an event, that the kubelet has started.
	k.BirthCry()
	// Start the garbage collection service (per the author: container GC and
	// image GC).
	k.StartGarbageCollection()

	return k, nil
}

NewMainKubelet

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
// NewMainKubelet instantiates a Kubelet and wires up its internal managers
// (excerpt; the parameter list and large parts of the body are elided with
// "..."). The visible parts build the GC/eviction policies, start reflectors
// for services and the local node, and create the secret/configmap, pod,
// status, probe, volume and eviction managers.
func NewMainKubelet(...) (*Kubelet, error) {
	...
	
	if kubeDeps.PodConfig == nil {
		var err error
		// Build the pod source config. It watches the sources of pod metadata
		// (file, URL, api-server) and merges the pod configuration from the
		// different sources into a single structure.
		kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath)
		if err != nil {
			return nil, err
		}
	}
	// Container GC policy.
	containerGCPolicy := kubecontainer.ContainerGCPolicy{
		MinAge:             minimumGCAge.Duration,
		MaxPerPodContainer: int(maxPerPodContainerCount),
		MaxContainers:      int(maxContainerCount),
	}
	// Kubelet server port, taken from kubeCfg.Port (10250 by default according
	// to the author; the default is not visible in this excerpt).
	daemonEndpoints := &v1.NodeDaemonEndpoints{
		KubeletEndpoint: v1.DaemonEndpoint{Port: kubeCfg.Port},
	}
	// Image GC policy.
	imageGCPolicy := images.ImageGCPolicy{
		MinAge:               kubeCfg.ImageMinimumGCAge.Duration,
		HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent),
		LowThresholdPercent:  int(kubeCfg.ImageGCLowThresholdPercent),
	}
	
	// Reserved-resource enforcement settings.
	enforceNodeAllocatable := kubeCfg.EnforceNodeAllocatable
	if experimentalNodeAllocatableIgnoreEvictionThreshold {
		// Do not provide kubeCfg.EnforceNodeAllocatable to eviction threshold parsing if we are not enforcing Evictions
		enforceNodeAllocatable = []string{}
	}
	
	// Eviction policy applied when the node runs short of resources.
	thresholds, err := eviction.ParseThresholdConfig(enforceNodeAllocatable, kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim)
	if err != nil {
		return nil, err
	}
	evictionConfig := eviction.Config{
		PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration,
		MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod),
		Thresholds:               thresholds,
		KernelMemcgNotification:  experimentalKernelMemcgNotification,
		PodCgroupRoot:            kubeDeps.ContainerManager.GetPodCgroupRoot(),
	}
	
	// Use a reflector to keep serviceIndexer in sync with the services obtained
	// via ListWatch. Skipped when there is no kube client.
	serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
	if kubeDeps.KubeClient != nil {
		serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "services", metav1.NamespaceAll, fields.Everything())
		r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0)
		go r.Run(wait.NeverStop)
	}
	serviceLister := corelisters.NewServiceLister(serviceIndexer)

	// Use a reflector to keep nodeIndexer in sync with the node obtained via
	// ListWatch; the field selector restricts it to this node's name.
	nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
	if kubeDeps.KubeClient != nil {
		fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
		nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector)
		r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0)
		go r.Run(wait.NeverStop)
	}
	nodeInfo := &CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)}


	// Object reference to this node; the node name is used as the UID.
	nodeRef := &v1.ObjectReference{
		Kind:      "Node",
		Name:      string(nodeName),
		UID:       types.UID(nodeName),
		Namespace: "",
	}

	// Manager for container references.
	containerRefManager := kubecontainer.NewRefManager()
	// OOM watcher.
	oomWatcher := oomwatcher.NewWatcher(kubeDeps.Recorder)

	...
	// Create the Kubelet instance from the configuration and the objects above.
	klet := &Kubelet{
		hostname:                                hostname,
		hostnameOverridden:                      len(hostnameOverride) > 0,
		nodeName:                                nodeName,
		...
	}

	...
	// Create the secretManager and configMapManager; the implementation is
	// chosen by ConfigMapAndSecretChangeDetectionStrategy (watch, TTL cache, or
	// plain get). An unknown strategy is an error.
	var secretManager secret.Manager
	var configMapManager configmap.Manager
	switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy {
	case kubeletconfiginternal.WatchChangeDetectionStrategy:
		secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient)
		configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient)
	case kubeletconfiginternal.TTLCacheChangeDetectionStrategy:
		secretManager = secret.NewCachingSecretManager(
			kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
		configMapManager = configmap.NewCachingConfigMapManager(
			kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode))
	case kubeletconfiginternal.GetChangeDetectionStrategy:
		secretManager = secret.NewSimpleSecretManager(kubeDeps.KubeClient)
		configMapManager = configmap.NewSimpleConfigMapManager(kubeDeps.KubeClient)
	default:
		return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy)
	}

	klet.secretManager = secretManager
	klet.configMapManager = configMapManager

	if klet.experimentalHostUserNamespaceDefaulting {
		klog.Infof("Experimental host user namespace defaulting is enabled.")
	}
	// Fetch information about the current machine from cAdvisor.
	machineInfo, err := klet.cadvisor.MachineInfo()
	if err != nil {
		return nil, err
	}
	klet.machineInfo = machineInfo

	imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)
	// Create the livenessManager.
	klet.livenessManager = proberesults.NewManager()
	// Create the pod cache.
	klet.podCache = kubecontainer.NewCache()
	// Create a checkpointManager, only when a bootstrap checkpoint path is set.
	var checkpointManager checkpointmanager.CheckpointManager
	if bootstrapCheckpointPath != "" {
		checkpointManager, err = checkpointmanager.NewCheckpointManager(bootstrapCheckpointPath)
		if err != nil {
			return nil, fmt.Errorf("failed to initialize checkpoint manager: %+v", err)
		}
	}
	// Create the podManager. Per the author it manages pods (add/update/delete)
	// and keeps the secretManager, configMapManager and checkpointManager
	// contents up to date.
	klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager, checkpointManager)
	// statusManager: per the author it tracks the status of pods on the node and
	// updates the corresponding pods in the apiserver.
	klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)

	...
	// Create a KubeGenericRuntimeManager (container runtime management).
	runtime, err := kuberuntime.NewKubeGenericRuntimeManager(...)
	...
	// PLEG (pod lifecycle event generator).
	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
	...
	// Create the containerGC object, which performs periodic container cleanup.
	containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady)
	...

	// Create the imageManager, which manages images.
	imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage)
	...

	// Probe manager.
	klet.probeManager = prober.NewManager(
		klet.statusManager,
		klet.livenessManager,
		klet.runner,
		containerRefManager,
		kubeDeps.Recorder)
	// Token manager.
	tokenManager := token.NewManager(kubeDeps.KubeClient)

	

	...
	// Volume manager.
	klet.volumeManager = volumemanager.NewVolumeManager(...)
	...
	// Eviction manager.
	evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock)

	klet.evictionManager = evictionManager
	...

	return klet, nil
}

RunKubelet 最后会调用 startKubelet() 进行后续的操作。

启动kubelet内部的模块及服务(cmd/kubelet/app/server.go)

startKubelet() 的主要功能:

  • 以 goroutine 方式启动 kubelet 中的各个模块。
  • 启动 kubelet http server。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
// startKubelet launches the kubelet main loop and, depending on the
// configuration, its HTTP servers. Every component is started in its own
// goroutine, so the function returns immediately.
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) {
	// Main loop: k.Run starts the kubelet's internal modules. wait.Until with a
	// zero period re-invokes it should it ever return.
	runLoop := func() {
		k.Run(podCfg.Updates())
	}
	go wait.Until(runLoop, 0, wait.NeverStop)

	// Both HTTP servers bind to the same configured address.
	bindIP := net.ParseIP(kubeCfg.Address)

	// Kubelet HTTP server.
	if enableServer {
		go k.ListenAndServe(
			bindIP,
			uint(kubeCfg.Port),
			kubeDeps.TLSOptions,
			kubeDeps.Auth,
			enableCAdvisorJSONEndpoints,
			kubeCfg.EnableDebuggingHandlers,
			kubeCfg.EnableContentionProfiling,
		)
	}

	// Read-only server, only when a read-only port is configured.
	if kubeCfg.ReadOnlyPort > 0 {
		go k.ListenAndServeReadOnly(bindIP, uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints)
	}

	// Pod resources endpoint, behind the KubeletPodResources feature gate.
	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) {
		go k.ListenAndServePodResources()
	}
}

Run

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// Run starts the kubelet reacting to config updates. It starts the kubelet's
// background components as goroutines and then enters syncLoop with the given
// pod update channel (excerpt; elided parts are marked with "...").
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	...
   // Start the cloud resource sync manager if one is configured.
   if kl.cloudResourceSyncManager != nil {
      go kl.cloudResourceSyncManager.Run(wait.NeverStop)
   }

	// Initialize the internal modules. Per the author this covers the prometheus
	// metrics, filesystem directories, the container log directory, imageManager,
	// serverCertificateManager, oomWatcher and the resource analyzer. On failure
	// a warning event is recorded and the process exits via klog.Fatal.
   if err := kl.initializeModules(); err != nil {
      kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
      klog.Fatal(err)
   }

   // Start the volume manager.
   go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

   if kl.kubeClient != nil {
      // Run syncNodeStatus once every nodeStatusUpdateFrequency.
      go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
      go kl.fastStatusUpdateOnce()

      // Start NodeLease reporting (a newer way of reporting node heartbeats),
      // if the NodeLease feature gate is enabled.
      if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
         go kl.nodeLeaseController.Run(wait.NeverStop)
      }
   }
   // Check the container runtime status every 5 seconds.
   go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

   // Start syncing iptables rules (once a minute).
   if kl.makeIPTablesUtilChains {
      go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
   }


   // Run podKiller once a second.
   go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

   // Start the statusManager and probeManager.
   kl.statusManager.Start()
   kl.probeManager.Start()

   // Start syncing RuntimeClasses if enabled.
   if kl.runtimeClassManager != nil {
      kl.runtimeClassManager.Start(wait.NeverStop)
   }

   // Start the pod lifecycle event generator (PLEG), then enter the main sync
   // loop.
   kl.pleg.Start()
   kl.syncLoop(updates, kl)
}

大体上执行了以下几件事:

(1)启动kubelet的重要组件,如volumemanager、probemanager、runtimeclassmanager等。

(2)持续向API Server注册自身状态(通过go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop) 这一段代码实现),向API Server通知节点没有丢失。

(3)持续监测容器运行时状态(通过go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop) 这一行代码实现)。

(4)运行kubelet主循环,也是最关键的一步。正如注释中所说,syncLoop的作用就是通过API Server、file、http三条途径获取pod的理想状态,并不断地将pod的当前状态向理想状态同步。这个方法的核心,在于for循环中调用的syncLoopIteration方法。

到这里kubelet启动流程就结束了。

syncLoop 是 kubelet 的主循环方法,它从不同的管道(FILE,URL, API-SERVER)监听 pod 的变化,并把它们汇聚起来。当有新的变化发生时,它会调用对应的函数,保证 Pod 处于期望的状态。