# kubelet启动流程 ## kubelet启动流程 ![kubelet 启动流程时序图](https://xieys.club/images/posts/kubelet-3.png) > kubelet入口函数main(cmd/kubelet/kubelet.go) ``` func main() { rand.Seed(time.Now().UnixNano()) command := app.NewKubeletCommand() logs.InitLogs() defer logs.FlushLogs() if err := command.Execute(); err != nil { os.Exit(1) } } ``` > 初始化kubelet配置(cmd/kubelet/app/server.go) ``` // NewKubeletCommand creates a *cobra.Command object with default parameters func NewKubeletCommand() *cobra.Command { cleanFlagSet := pflag.NewFlagSet(componentKubelet, pflag.ContinueOnError) cleanFlagSet.SetNormalizeFunc(cliflag.WordSepNormalizeFunc) //kubelet配置分2部分: //kubeletFlag:指那些不允许在kubelet运行时进行修改的配置集,或者不能在集群中各个Nodes之间共享的配置集。 kubeletFlags := options.NewKubeletFlags() //KubeletConfiguration:指可以在集群中各个nodes之间共享的配置集,可以进行动态配置。 kubeletConfig, err := options.NewKubeletConfiguration() ... cmd := &cobra.Command{ ... Run: func(cmd *cobra.Command, args []string) { ... // 如果有提供配置文件的话,加载kubelet配置文件 if configFile := kubeletFlags.KubeletConfigFile; len(configFile) > 0 { kubeletConfig, err = loadConfigFile(configFile) if err != nil { klog.Fatal(err) } //如果命令行有提供参数,命令行参数会覆盖加载出来的kubeletConfig配置的字段 if err := kubeletConfigFlagPrecedence(kubeletConfig, args); err != nil { klog.Fatal(err) } // 基于新的配置更新feature gates if err := utilfeature.DefaultMutableFeatureGate.SetFromMap(kubeletConfig.FeatureGates); err != nil { klog.Fatal(err) } } //校验kubelet参数 if err := kubeletconfigvalidation.ValidateKubeletConfiguration(kubeletConfig); err != nil { klog.Fatal(err) } // 如果开启动态配置,则有下面这段逻辑 var kubeletConfigController *dynamickubeletconfig.Controller if dynamicConfigDir := kubeletFlags.DynamicConfigDir.Value(); len(dynamicConfigDir) > 0 { ... } // 用kubeletFlags和kubeletConfig构造一个KubeletServer kubeletServer := &options.KubeletServer{ KubeletFlags: *kubeletFlags, KubeletConfiguration: *kubeletConfig, } // 使用kubeletServer构造默认的KubeletDeps kubeletDeps, err := UnsecuredDependencies(kubeletServer) if err != nil { klog.Fatal(err) } ... 
// 运行kubelet if err := Run(kubeletServer, kubeletDeps, stopCh); err != nil { klog.Fatal(err) } }, } ... return cmd } ``` kubeletDeps 包含 kubelet 运行所必须的配置,是为了实现 dependency injection(依赖注入),其目的是为了把 kubelet 依赖的组件对象作为参数传进来,这样可以控制 kubelet 的行为。主要包括监控功能(cadvisor),cgroup 管理功能(containerManager)等。 NewKubeletCommand() 会调用 Run() 函数,Run() 中主要调用 run() 函数进行一些准备事项。 > 创建和apiserver通信的对象(cmd/kubelet/app/server.go) run() 函数的主要功能: - 创建 kubeClient,eventClient 用来和 apiserver 通信。创建 heartbeatClient 向 apiserver 上报心跳状态。 - 为 kubeDeps 设定一些默认值。 - 启动监听 Healthz 端口的 http server,默认端口是 10248。 ``` func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan struct{}) (err error) { ... // 注册当前配置到/configz api err = initConfigz(&s.KubeletConfiguration) if err != nil { klog.Errorf("unable to register KubeletConfiguration with configz, error: %v", err) } ... // 判断kubelet的启动模式,如果处于独立模式,则通过将所有客户端设置为nil来表示 switch { case standaloneMode: kubeDeps.KubeClient = nil kubeDeps.EventClient = nil kubeDeps.HeartbeatClient = nil klog.Warningf("standalone mode, no API client") case kubeDeps.KubeClient == nil, kubeDeps.EventClient == nil, kubeDeps.HeartbeatClient == nil: ... //创建kubeClient客户端 kubeDeps.KubeClient, err = clientset.NewForConfig(clientConfig) ... //创建eventClient 用于上报事件 kubeDeps.EventClient, err = v1core.NewForConfig(&eventClientConfig) ... //创建heartbeatClient 用于上报状态 kubeDeps.HeartbeatClient, err = clientset.NewForConfig(&heartbeatClientConfig) ... } //为kubeDeps.Auth设定默认值,BuildAuth创建一个验证器,一个授权器,以及一个与kubelet需求相匹配的授权器属性getter if kubeDeps.Auth == nil { auth, err := BuildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration) if err != nil { return err } kubeDeps.Auth = auth } ... 
//为kubeDeps.CAdvisorInterface设定默认值 if kubeDeps.CAdvisorInterface == nil { imageFsInfoProvider := cadvisor.NewImageFsInfoProvider(s.ContainerRuntime, s.RemoteRuntimeEndpoint) kubeDeps.CAdvisorInterface, err = cadvisor.New(imageFsInfoProvider, s.RootDirectory, cgroupRoots, cadvisor.UsingLegacyCadvisorStats(s.ContainerRuntime, s.RemoteRuntimeEndpoint)) if err != nil { return err } } // 创建事件记录器 makeEventRecorder(kubeDeps, nodeName) //为kubeDeps.ContainerManager设定默认值 if kubeDeps.ContainerManager == nil { ... kubeDeps.ContainerManager, err = cm.NewContainerManager( kubeDeps.Mounter, kubeDeps.CAdvisorInterface, cm.NodeConfig{ RuntimeCgroupsName: s.RuntimeCgroups, SystemCgroupsName: s.SystemCgroups, KubeletCgroupsName: s.KubeletCgroups, ContainerRuntime: s.ContainerRuntime, CgroupsPerQOS: s.CgroupsPerQOS, CgroupRoot: s.CgroupRoot, CgroupDriver: s.CgroupDriver, KubeletRootDir: s.RootDirectory, ProtectKernelDefaults: s.ProtectKernelDefaults, NodeAllocatableConfig: cm.NodeAllocatableConfig{ KubeReservedCgroupName: s.KubeReservedCgroup, SystemReservedCgroupName: s.SystemReservedCgroup, EnforceNodeAllocatable: sets.NewString(s.EnforceNodeAllocatable...), KubeReserved: kubeReserved, SystemReserved: systemReserved, HardEvictionThresholds: hardEvictionThresholds, }, QOSReserved: *experimentalQOSReserved, ExperimentalCPUManagerPolicy: s.CPUManagerPolicy, ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration, ExperimentalPodPidsLimit: s.PodPidsLimit, EnforceCPULimits: s.CPUCFSQuota, CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration, ExperimentalTopologyManagerPolicy: s.TopologyManagerPolicy, }, s.FailSwapOn, devicePluginEnabled, kubeDeps.Recorder) ... } ... 
//调用RunKubelet函数进行后续操作 if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil { return err } // 如果开启了动态配置,StartSync函数启动了几个协程去监听以及更新配置 if utilfeature.DefaultFeatureGate.Enabled(features.DynamicKubeletConfig) && len(s.DynamicConfigDir.Value()) > 0 && kubeDeps.KubeletConfigController != nil && !standaloneMode && !s.RunOnce { if err := kubeDeps.KubeletConfigController.StartSync(kubeDeps.KubeClient, kubeDeps.EventClient, string(nodeName)); err != nil { return err } } //启动监听healthz端口的http server if s.HealthzPort > 0 { mux := http.NewServeMux() healthz.InstallHandler(mux) go wait.Until(func() { err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), mux) if err != nil { klog.Errorf("Starting healthz server failed: %v", err) } }, 5*time.Second, wait.NeverStop) } ... } ``` kubelet 对 pod 资源的获取方式有三种:第一种是通过文件获得,文件一般放在 /etc/kubernetes/manifests 目录下面;第二种也是通过文件获得,只不过文件是通过 URL 获取的;第三种是通过 watch kube-apiserver 获取。其中前两种模式下,我们称 kubelet 运行在 standalone 模式下(如:kubeadm安装的kube-apiserver、kube-scheduler、kube-controller-manager等),运行在 standalone 模式下的 kubelet 一般用于调试某些功能。 run() 中调用 RunKubelet() 函数进行后续操作。 > 初始化kubelet组件内部的模块(cmd/kubelet/app/server.go) RunKubelet() 主要功能: - 初始化 kubelet 组件中的各个模块,创建出 kubelet 对象。 - 启动垃圾回收服务。 ``` func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error { ... 
//初始化kubelet内部模块 k, err := createAndInitKubelet(&kubeServer.KubeletConfiguration, kubeDeps, &kubeServer.ContainerRuntimeOptions, kubeServer.ContainerRuntime, kubeServer.RuntimeCgroups, kubeServer.HostnameOverride, kubeServer.NodeIP, kubeServer.ProviderID, kubeServer.CloudProvider, kubeServer.CertDirectory, kubeServer.RootDirectory, kubeServer.RegisterNode, kubeServer.RegisterWithTaints, kubeServer.AllowedUnsafeSysctls, kubeServer.RemoteRuntimeEndpoint, kubeServer.RemoteImageEndpoint, kubeServer.ExperimentalMounterPath, kubeServer.ExperimentalKernelMemcgNotification, kubeServer.ExperimentalCheckNodeCapabilitiesBeforeMount, kubeServer.ExperimentalNodeAllocatableIgnoreEvictionThreshold, kubeServer.MinimumGCAge, kubeServer.MaxPerPodContainerCount, kubeServer.MaxContainerCount, kubeServer.MasterServiceNamespace, kubeServer.RegisterSchedulable, kubeServer.NonMasqueradeCIDR, kubeServer.KeepTerminatedPodVolumes, kubeServer.NodeLabels, kubeServer.SeccompProfileRoot, kubeServer.BootstrapCheckpointPath, kubeServer.NodeStatusMaxImages) if err != nil { return fmt.Errorf("failed to create kubelet: %v", err) } ... // process pods and exit. if runOnce { if _, err := k.RunOnce(podCfg.Updates()); err != nil { return fmt.Errorf("runonce failed: %v", err) } klog.Info("Started kubelet as runonce") } else { startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableCAdvisorJSONEndpoints, kubeServer.EnableServer) klog.Info("Started kubelet") } return nil } ``` createAndInitKubelet ``` func createAndInitKubelet(...) 
(k kubelet.Bootstrap, err error) { //NewMainKubelet 实例化一个 kubelet 对象,并对 kubelet 内部各个模块进行初始化 k, err = kubelet.NewMainKubelet(kubeCfg, kubeDeps, crOptions, containerRuntime, runtimeCgroups, hostnameOverride, nodeIP, providerID, cloudProvider, certDirectory, rootDirectory, registerNode, registerWithTaints, allowedUnsafeSysctls, remoteRuntimeEndpoint, remoteImageEndpoint, experimentalMounterPath, experimentalKernelMemcgNotification, experimentalCheckNodeCapabilitiesBeforeMount, experimentalNodeAllocatableIgnoreEvictionThreshold, minimumGCAge, maxPerPodContainerCount, maxContainerCount, masterServiceNamespace, registerSchedulable, nonMasqueradeCIDR, keepTerminatedPodVolumes, nodeLabels, seccompProfileRoot, bootstrapCheckpointPath, nodeStatusMaxImages) if err != nil { return nil, err } // 通知apiserver kubelet启动了,通过事件发送 k.BirthCry() // 启动垃圾回收服务,主要是containerGC以及ImageGC k.StartGarbageCollection() return k, nil } ``` NewMainKubelet ``` func NewMainKubelet(...) (*Kubelet, error) { ... if kubeDeps.PodConfig == nil { var err error //// 初始化 makePodSourceConfig,监听 pod 元数据的来源(FILE, URL, api-server),将不同 source 的 pod configuration 合并到一个结构中 kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, bootstrapCheckpointPath) if err != nil { return nil, err } } //containerGc策略设置 containerGCPolicy := kubecontainer.ContainerGCPolicy{ MinAge: minimumGCAge.Duration, MaxPerPodContainer: int(maxPerPodContainerCount), MaxContainers: int(maxContainerCount), } // kubelet 服务端口,默认 10250 daemonEndpoints := &v1.NodeDaemonEndpoints{ KubeletEndpoint: v1.DaemonEndpoint{Port: kubeCfg.Port}, } //imageGC策略设置 imageGCPolicy := images.ImageGCPolicy{ MinAge: kubeCfg.ImageMinimumGCAge.Duration, HighThresholdPercent: int(kubeCfg.ImageGCHighThresholdPercent), LowThresholdPercent: int(kubeCfg.ImageGCLowThresholdPercent), } //设置预留资源 enforceNodeAllocatable := kubeCfg.EnforceNodeAllocatable if experimentalNodeAllocatableIgnoreEvictionThreshold { // Do not provide kubeCfg.EnforceNodeAllocatable to 
eviction threshold parsing if we are not enforcing Evictions enforceNodeAllocatable = []string{} } //node资源不足时的驱逐策略设置 thresholds, err := eviction.ParseThresholdConfig(enforceNodeAllocatable, kubeCfg.EvictionHard, kubeCfg.EvictionSoft, kubeCfg.EvictionSoftGracePeriod, kubeCfg.EvictionMinimumReclaim) if err != nil { return nil, err } evictionConfig := eviction.Config{ PressureTransitionPeriod: kubeCfg.EvictionPressureTransitionPeriod.Duration, MaxPodGracePeriodSeconds: int64(kubeCfg.EvictionMaxPodGracePeriod), Thresholds: thresholds, KernelMemcgNotification: experimentalKernelMemcgNotification, PodCgroupRoot: kubeDeps.ContainerManager.GetPodCgroupRoot(), } // 使用 reflector 把 ListWatch 得到的服务信息实时同步到 serviceStore 对象中 serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) if kubeDeps.KubeClient != nil { serviceLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "services", metav1.NamespaceAll, fields.Everything()) r := cache.NewReflector(serviceLW, &v1.Service{}, serviceIndexer, 0) go r.Run(wait.NeverStop) } serviceLister := corelisters.NewServiceLister(serviceIndexer) // 使用 reflector 把 ListWatch 得到的节点信息实时同步到 nodeStore 对象中 nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) if kubeDeps.KubeClient != nil { fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector() nodeLW := cache.NewListWatchFromClient(kubeDeps.KubeClient.CoreV1().RESTClient(), "nodes", metav1.NamespaceAll, fieldSelector) r := cache.NewReflector(nodeLW, &v1.Node{}, nodeIndexer, 0) go r.Run(wait.NeverStop) } nodeInfo := &CachedNodeInfo{NodeLister: corelisters.NewNodeLister(nodeIndexer)} nodeRef := &v1.ObjectReference{ Kind: "Node", Name: string(nodeName), UID: types.UID(nodeName), Namespace: "", } // 容器引用的管理 containerRefManager := kubecontainer.NewRefManager() // oom监控 oomWatcher := oomwatcher.NewWatcher(kubeDeps.Recorder) ... 
//根据配置信息和各种对象创建kubelet实例 klet := &Kubelet{ hostname: hostname, hostnameOverridden: len(hostnameOverride) > 0, nodeName: nodeName, ... } ... //创建secretManager和configMapManager var secretManager secret.Manager var configMapManager configmap.Manager switch kubeCfg.ConfigMapAndSecretChangeDetectionStrategy { case kubeletconfiginternal.WatchChangeDetectionStrategy: secretManager = secret.NewWatchingSecretManager(kubeDeps.KubeClient) configMapManager = configmap.NewWatchingConfigMapManager(kubeDeps.KubeClient) case kubeletconfiginternal.TTLCacheChangeDetectionStrategy: secretManager = secret.NewCachingSecretManager( kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode)) configMapManager = configmap.NewCachingConfigMapManager( kubeDeps.KubeClient, manager.GetObjectTTLFromNodeFunc(klet.GetNode)) case kubeletconfiginternal.GetChangeDetectionStrategy: secretManager = secret.NewSimpleSecretManager(kubeDeps.KubeClient) configMapManager = configmap.NewSimpleConfigMapManager(kubeDeps.KubeClient) default: return nil, fmt.Errorf("unknown configmap and secret manager mode: %v", kubeCfg.ConfigMapAndSecretChangeDetectionStrategy) } klet.secretManager = secretManager klet.configMapManager = configMapManager if klet.experimentalHostUserNamespaceDefaulting { klog.Infof("Experimental host user namespace defaulting is enabled.") } // 从cAdvisor 中获取当前机器的信息 machineInfo, err := klet.cadvisor.MachineInfo() if err != nil { return nil, err } klet.machineInfo = machineInfo imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff) //创建livenessManager klet.livenessManager = proberesults.NewManager() //创建一个pod缓存 klet.podCache = kubecontainer.NewCache() //创建一个checkpointManager var checkpointManager checkpointmanager.CheckpointManager if bootstrapCheckpointPath != "" { checkpointManager, err = checkpointmanager.NewCheckpointManager(bootstrapCheckpointPath) if err != nil { return nil, fmt.Errorf("failed to initialize checkpoint manager: %+v", err) } } // 
创建podManager,对pod的管理(如:增删改),还负责保持secretManager和configMapManager以及checkpointManager内容的更新。 klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager, checkpointManager) //statusManager 实时检测节点上 pod 的状态,并更新到 apiserver 对应的 pod klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet) ... //创建一个KubeGenericRuntimeManager(容器运行时管理) runtime, err := kuberuntime.NewKubeGenericRuntimeManager(...) ... //pleg klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{}) ... // 创建containerGC对象,进行周期性的容器清理工作 containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy, klet.sourcesReady) ... // 创建imageManager管理镜像 imageManager, err := images.NewImageGCManager(klet.containerRuntime, klet.StatsProvider, kubeDeps.Recorder, nodeRef, imageGCPolicy, crOptions.PodSandboxImage) ... //探针管理 klet.probeManager = prober.NewManager( klet.statusManager, klet.livenessManager, klet.runner, containerRefManager, kubeDeps.Recorder) //token管理 tokenManager := token.NewManager(kubeDeps.KubeClient) ... //磁盘管理 klet.volumeManager = volumemanager.NewVolumeManager(...) ... // 容器驱逐策略管理 evictionManager, evictionAdmitHandler := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers, kubeDeps.Recorder), klet.podManager.GetMirrorPodByPod, klet.imageManager, klet.containerGC, kubeDeps.Recorder, nodeRef, klet.clock) klet.evictionManager = evictionManager ... 
return klet, nil } ``` RunKubelet 最后会调用 startKubelet() 进行后续的操作。 > 启动kubelet内部的模块及服务(cmd/kubelet/app/server.go) startKubelet() 的主要功能: - 以 goroutine 方式启动 kubelet 中的各个模块。 - 启动 kubelet http server。 ``` func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableCAdvisorJSONEndpoints, enableServer bool) { // start the kubelet go wait.Until(func() { //以goroutine方式启动kubelet中的各个模块 k.Run(podCfg.Updates()) }, 0, wait.NeverStop) // 启动 kubelet http server if enableServer { go k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, enableCAdvisorJSONEndpoints, kubeCfg.EnableDebuggingHandlers, kubeCfg.EnableContentionProfiling) } if kubeCfg.ReadOnlyPort > 0 { go k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort), enableCAdvisorJSONEndpoints) } if utilfeature.DefaultFeatureGate.Enabled(features.KubeletPodResources) { go k.ListenAndServePodResources() } } ``` Run ``` // Run starts the kubelet reacting to config updates,即启动kubelet并对配置更新作出反应 func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { ... 
// 如果有设置云提供商同步管理器则启动 if kl.cloudResourceSyncManager != nil { go kl.cloudResourceSyncManager.Run(wait.NeverStop) } //初始化模块(提供了prometheus接口、设置文件系统目录、创建容器日志目录、启动imageManager、启动serverCertificateManager、启动oomWatcher、启动资源分析器),如果出现了错误就把错误事件上报 if err := kl.initializeModules(); err != nil { kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error()) klog.Fatal(err) } // 启动 volume manager go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop) if kl.kubeClient != nil { //这里表示每隔nodeStatusUpdateFrequency时间就执行一次syncNodeStatus go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop) go kl.fastStatusUpdateOnce() // 启动 NodeLease上报,一种新的上报方式 if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) { go kl.nodeLeaseController.Run(wait.NeverStop) } } //每5s检测一次containerRuntime状态 go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop) // 开始同步iptables规则 if kl.makeIPTablesUtilChains { go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop) } go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop) // 启动statusManager、probeManager kl.statusManager.Start() kl.probeManager.Start() // Start syncing RuntimeClasses if enabled. if kl.runtimeClassManager != nil { kl.runtimeClassManager.Start(wait.NeverStop) } // 启动pod生命周期管理 kl.pleg.Start() kl.syncLoop(updates, kl) } ``` 大体上执行了以下几件事: (1)启动kubelet的重要组件,如volumemanager、probemanager、runtimeclassmanager等。 (2)持续向API Server注册自身状态(通过go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop) 这一段代码实现),向API Server通知节点没有丢失。 (3)持续监测容器运行时状态(通过go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop) 这一行代码实现)。 (4)运行kubelet主循环,也是最关键的一步。正如注释中所说,syncLoop的作用就是通过API Server、file、http三条途径获取pod的理想状态,并不断地将pod的当前状态向理想状态同步。这个方法的核心,在于for循环中调用的syncLoopIteration方法。 到这里kubelet启动流程就结束了。 syncLoop 是 kubelet 的主循环方法,它从不同的管道(FILE,URL, API-SERVER)监听 pod 的变化,并把它们汇聚起来。当有新的变化发生时,它会调用对应的函数,保证 Pod 处于期望的状态。