# kubelet创建pod流程 ## kubelet工作原理 ![kubelet 工作原理](https://xieys.club/images/posts/kubelet-1.png) kubelet 的工作核心就是在围绕着不同的生产者生产出来的不同的有关 pod 的消息来调用相应的消费者(不同的子模块)完成不同的行为(创建和删除 pod 等),即图中的控制循环(SyncLoop),通过不同的事件驱动这个控制循环运行。 本文仅分析新建 pod 的流程,当一个 pod 完成调度,与一个 node 绑定起来之后,这个 pod 就会触发 kubelet 在循环控制里注册的 handler,上图中的 HandlePods 部分。此时,通过检查 pod 在 kubelet 内存中的状态,kubelet 就能判断出这是一个新调度过来的 pod,从而触发 Handler 里的 ADD 事件对应的逻辑处理。然后 kubelet 会为这个 pod 生成对应的 podStatus,接着检查 pod 所声明的 volume 是不是准备好了,然后调用下层的容器运行时。如果是 update 事件的话,kubelet 就会根据 pod 对象具体的变更情况,调用下层的容器运行时进行容器的重建。 ## kubelet创建pod的流程 ![kubelet 创建 pod 的流程](https://xieys.club/images/posts/kubelet-2.png) ### kubelet的控制循环(syncLoop) ``` func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) { klog.Info("Starting kubelet main sync loop.") //syncTicker 每秒检测一次是否有需要同步的 pod workers syncTicker := time.NewTicker(time.Second) defer syncTicker.Stop() // 每两秒检测一次是否需要清理的pod housekeepingTicker := time.NewTicker(housekeepingPeriod) defer housekeepingTicker.Stop() // pod的生命周期变化 plegCh := kl.pleg.Watch() const ( base = 100 * time.Millisecond max = 5 * time.Second factor = 2 ) duration := base for { if err := kl.runtimeState.runtimeErrors(); err != nil { klog.Infof("skipping pod synchronization - %v", err) // exponential backoff time.Sleep(duration) duration = time.Duration(math.Min(float64(max), factor*float64(duration))) continue } // reset backoff if we have a success duration = base kl.syncLoopMonitor.Store(kl.clock.Now()) //第二个参数为SyncHandler类型,SyncHandler是一个interface,在该文件开头处定义,这里由Run函数将kubelet实例传递给SyncHandler if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) { break } kl.syncLoopMonitor.Store(kl.clock.Now()) } } ``` ### 监听pod变化(syncLoopIteration) syncLoopIteration 这个方法就会对多个管道进行遍历,发现任何一个管道有消息就交给 handler 去处理。它会从以下管道中获取消息: - configCh:该信息源由 kubeDeps 对象中的 PodConfig 子模块提供,该模块将同时 watch 3 个不同来源的 pod 信息的变化(file,http,apiserver),一旦某个来源的 pod 信息发生了更新(创建/更新/删除),这个 channel 中就会出现被更新的 pod 信息和更新的具体操作。 - syncCh:定时器管道,每隔一秒去同步最新保存的 pod 状态 - houseKeepingCh:housekeeping 事件的管道,做 pod 清理工作 - plegCh:该信息源由 kubelet 对象中的 pleg 子模块提供,该模块主要用于周期性地向 container runtime 查询当前所有容器的状态,如果状态发生变化(因为某些情况被杀死,被暂停等),则这个 channel 产生事件。 - livenessManager.Updates():健康检查发现某个 pod 不可用,kubelet 将根据 Pod 的restartPolicy 自动执行正确的操作 ``` func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler, syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool { select { case u, open := <-configCh: if !open { klog.Errorf("Update channel is closed. Exiting the sync loop.") return false } switch u.Op { case kubetypes.ADD: klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods)) handler.HandlePodAdditions(u.Pods) case kubetypes.UPDATE: klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods)) handler.HandlePodUpdates(u.Pods) case kubetypes.REMOVE: klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods)) handler.HandlePodRemoves(u.Pods) case kubetypes.RECONCILE: klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods)) handler.HandlePodReconcile(u.Pods) case kubetypes.DELETE: klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods)) // DELETE is treated as a UPDATE because of graceful deletion. handler.HandlePodUpdates(u.Pods) case kubetypes.RESTORE: klog.V(2).Infof("SyncLoop (RESTORE, %q): %q", u.Source, format.Pods(u.Pods)) // These are pods restored from the checkpoint. Treat them as new // pods. handler.HandlePodAdditions(u.Pods) case kubetypes.SET: // TODO: Do we want to support this? klog.Errorf("Kubelet does not support snapshot update") } if u.Op != kubetypes.RESTORE { kl.sourcesReady.AddSource(u.Source) } case e := <-plegCh: if isSyncPodWorthy(e) { // PLEG event for a pod; sync it. if pod, ok := kl.podManager.GetPodByUID(e.ID); ok { klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e) handler.HandlePodSyncs([]*v1.Pod{pod}) } else { // If the pod no longer exists, ignore the event. klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e) } } if e.Type == pleg.ContainerDied { if containerID, ok := e.Data.(string); ok { kl.cleanUpContainersInPod(e.ID, containerID) } } case <-syncCh: // Sync pods waiting for sync podsToSync := kl.getPodsToSync() if len(podsToSync) == 0 { break } klog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync)) handler.HandlePodSyncs(podsToSync) case update := <-kl.livenessManager.Updates(): if update.Result == proberesults.Failure { pod, ok := kl.podManager.GetPodByUID(update.PodUID) if !ok { // If the pod no longer exists, ignore the update. klog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update) break } klog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod)) handler.HandlePodSyncs([]*v1.Pod{pod}) } case <-housekeepingCh: if !kl.sourcesReady.AllReady() { klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.") } else { klog.V(4).Infof("SyncLoop (housekeeping)") if err := handler.HandlePodCleanups(); err != nil { klog.Errorf("Failed cleaning pods: %v", err) } } } return true } ``` ### 处理新增pod(HandlePodAddtions) 对于事件中的每个 pod,执行以下操作: - 把所有的 pod 按照创建日期进行排序,保证最先创建的 pod 会最先被处理 - 把它加入到 podManager 中,podManager 子模块负责管理这台机器上的 pod 的信息,pod 和 mirrorPod 之间的对应关系等等。所有被管理的 pod 都要出现在里面,如果 podManager 中找不到某个 pod,就认为这个 pod 被删除了 ``` mirror pod的意思是,对于apiserver之外的途径(即file或http)途径生成的pod(称为静态pod),apiserver不能感知到它的存在。因此kubelet会创建一个name和namespace与这个静态pod一样的pod,以便apiserver可以感知到这个pod的存在。这个由kubelet自动创建的pod就被称为mirror pod。静态pod的使用较少,可以不用太关注。 ``` - 如果是 mirror pod 调用其单独的方法 - 验证 pod 是否能在该节点运行,如果不可以直接拒绝 - 通过 dispatchWork 把创建 pod 的工作下发给 podWorkers 子模块做异步处理 - 在 probeManager 中添加 pod,如果 pod 中定义了 readiness 和 liveness 健康检查,启动 goroutine 定期进行检测 ``` func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) { start := kl.clock.Now() // 对所有 pod 按照日期排序,保证最先创建的 pod 优先被处理 sort.Sort(sliceutils.PodsByCreationTime(pods)) // Responsible for checking limits in resolv.conf // The limits do not have anything to do with individual pods if kl.dnsConfigurer != nil && kl.dnsConfigurer.ResolverConfig != "" { kl.dnsConfigurer.CheckLimitsForResolvConf() } for _, pod := range pods { //获取podManager里已存在的pod existingPods := kl.podManager.GetPods() // 把 pod 加入到 podManager 中 kl.podManager.AddPod(pod) // 判断是否是 mirror pod(即 static pod) if kubepod.IsMirrorPod(pod) { kl.handleMirrorPod(pod, start) continue } //当前pod状态不是Terminate(不是“Failed”或“Succeeded”状态) if !kl.podIsTerminated(pod) { // 通过过滤podManager已存在的pod中的不是“Failed”或“Succeeded”状态的pod,也就是activePods activePods := kl.filterOutTerminatedPods(existingPods) // 通过 canAdmitPod 方法校验Pod能否在该计算节点创建(如:磁盘空间、内存等) if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok { kl.rejectPod(pod, reason, message) continue } } mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) // 通过 dispatchWork 分发 pod 做异步处理,dispatchWork 主要工作就是把接收到的参数封装成 UpdatePodOptions,调用 UpdatePod 方法. kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start) // 在 probeManager 中添加 pod,如果 pod 中定义了 readiness 和 liveness 健康检查,启动 goroutine 定期进行检测 kl.probeManager.AddPod(pod) } } ``` **static pod 是由 kubelet 直接管理的,k8s apiserver 并不会感知到 static pod 的存在,当然也不会和任何一个 rs 关联上,完全是由 kubelet 进程来监管,并在它异常时负责重启。Kubelet 会通过 apiserver 为每一个 static pod 创建一个对应的 mirror pod,如此以来就可以可以通过 kubectl 命令查看对应的 pod,并且可以通过 kubectl logs 命令直接查看到static pod 的日志信息。** ### 下发任务(dispatchWork) dispatchWorker 的主要作用是把某个对 Pod 的操作(创建/更新/删除)下发给 podWorkers。 ``` func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) { if kl.podIsTerminated(pod) { if pod.DeletionTimestamp != nil { // If the pod is in a terminated state, there is no pod worker to // handle the work item. Check if the DeletionTimestamp has been // set, and force a status update to trigger a pod deletion request // to the apiserver. kl.statusManager.TerminatePod(pod) } return } // 落实到podWorks中 kl.podWorkers.UpdatePod(&UpdatePodOptions{ Pod: pod, MirrorPod: mirrorPod, UpdateType: syncType, OnCompleteFunc: func(err error) { if err != nil { metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start)) metrics.DeprecatedPodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start)) } }, }) // Note the number of containers for new pods. if syncType == kubetypes.SyncPodCreate { metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers))) } } ``` ### 更新时间 channel(UpdatePod) podWorkers 子模块主要的作用就是处理针对每一个的 Pod 的更新事件,比如 Pod 的创建,删除,更新。而 podWorkers 采取的基本思路是:为每一个 Pod 都单独创建一个 goroutine 和更新事件的 channel,goroutine 会阻塞式的等待 channel 中的事件,并且对获取的事件进行处理。而 podWorkers 对象自身则主要负责对更新事件进行下发。 ``` func (p *podWorkers) UpdatePod(options *UpdatePodOptions) { pod := options.Pod uid := pod.UID var podUpdates chan UpdatePodOptions var exists bool p.podLock.Lock() defer p.podLock.Unlock() //如果当前pod还没有启动过goroutine,则启动goroutine,并创建无缓冲channel(会阻塞) if podUpdates, exists = p.podUpdates[uid]; !exists { //创建channel podUpdates = make(chan UpdatePodOptions, 1) p.podUpdates[uid] = podUpdates // 启动goroutine go func() { defer runtime.HandleCrash() p.managePodLoop(podUpdates) }() } //下发更新事件 if !p.isWorking[pod.UID] { p.isWorking[pod.UID] = true podUpdates <- *options } else { // 如果一个终止pod的请求正在等待,我们不让任何东西覆盖该请求。 update, found := p.lastUndeliveredWorkUpdate[pod.UID] if !found || update.UpdateType != kubetypes.SyncPodKill { p.lastUndeliveredWorkUpdate[pod.UID] = *options } } } ``` ### 调用syncPodFn方法同步pod(managePodLoop) managePodLoop 调用 syncPodFn 方法去同步 pod,syncPodFn 实际上就是kubelet.SyncPod,podWorkers在初始化的时候传递的参数就是kubelet.SyncPod。在完成这次 sync 动作之后,会调用 wrapUp 函数,这个函数将会做几件事情: ```. klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache) ... func newPodWorkers(syncPodFn syncPodFnType, recorder record.EventRecorder, workQueue queue.WorkQueue, resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers { return &podWorkers{ podUpdates: map[types.UID]chan UpdatePodOptions{}, isWorking: map[types.UID]bool{}, lastUndeliveredWorkUpdate: map[types.UID]UpdatePodOptions{}, syncPodFn: syncPodFn, recorder: recorder, workQueue: workQueue, resyncInterval: resyncInterval, backOffPeriod: backOffPeriod, podCache: podCache, } } ``` - 将这个 pod 信息插入 kubelet 的 workQueue 队列中,等待下一次周期性的对这个 pod 的状态进行 sync - 根据pod uid,将在这次 sync 期间堆积的没有能够来得及处理的最近一次 update 操作加入 goroutine 的事件 channel 中,立即处理。 ``` func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) { var lastSyncTime time.Time for update := range podUpdates { err := func() error { podUID := update.Pod.UID status, err := p.podCache.GetNewerThan(podUID, lastSyncTime) if err != nil { p.recorder.Eventf(update.Pod, v1.EventTypeWarning, events.FailedSync, "error determining status: %v", err) return err } err = p.syncPodFn(syncPodOptions{ mirrorPod: update.MirrorPod, pod: update.Pod, podStatus: status, killPodOptions: update.KillPodOptions, updateType: update.UpdateType, }) lastSyncTime = time.Now() return err }() if update.OnCompleteFunc != nil { update.OnCompleteFunc(err) } if err != nil { klog.Errorf("Error syncing pod %s (%q), skipping: %v", update.Pod.UID, format.Pod(update.Pod), err) } p.wrapUp(update.Pod.UID, err) } } ``` ### 完成创建容器前的准备工作(SyncPod) 在这个方法中,主要完成以下几件事情: - 如果是删除 pod,立即执行并返回 - 同步 podStatus 到 kubelet.statusManager - 检查 pod 是否能运行在本节点,主要是权限检查(是否能使用主机网络模式,是否可以以 privileged 权限运行等)。如果没有权限,就删除本地旧的 pod 并返回错误信息 - 创建 containerManagar 对象,并且创建 pod level cgroup,更新 Qos level cgroup - 如果是 static Pod,就创建或者更新对应的 mirrorPod - 创建 pod 的数据目录,存放 volume 和 plugin 信息,如果定义了 pv,等待所有的 volume mount 完成(volumeManager 会在后台做这些事情),如果有 image secrets,去 apiserver 获取对应的 secrets 数据 - 然后调用 kubelet.volumeManager 组件,等待它将 pod 所需要的所有外挂的 volume 都准备好。 - 调用 container runtime 的 SyncPod 方法,去实现真正的容器创建逻辑 这里所有的事情都和具体的容器没有关系,可以看到该方法是创建 pod 实体(即容器)之前需要完成的准备工作。 ``` func (kl *Kubelet) syncPod(o syncPodOptions) error { // pull out the required options pod := o.pod mirrorPod := o.mirrorPod podStatus := o.podStatus updateType := o.updateType // 是非为删除pod if updateType == kubetypes.SyncPodKill { ... } ... //检查pod是否能运行在本节点 runnable := kl.canRunPod(pod) if !runnable.Admit { ... } // 更新pod状态 kl.statusManager.SetPodStatus(pod, apiPodStatus) // 如果 pod 非 running 状态则直接 kill 掉 if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed { ... } // 加载网络插件 if err := kl.runtimeState.networkErrors(); err != nil && !kubecontainer.IsHostNetworkPod(pod) { ... } pcm := kl.containerManager.NewPodContainerManager() if !kl.podIsTerminated(pod) { ... //创建并更新pod的cgroups if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) { if !pcm.Exists(pod) { ... } } } // 为static pod 创建对应的mirror pod if kubetypes.IsStaticPod(pod) { ... } // 为pod创建数据目录 if err := kl.makePodDataDirs(pod); err != nil { ... } // 挂载volume if !kl.podIsTerminated(pod) { // Wait for volumes to attach/mount if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil { ... } } // 获取secret信息 pullSecrets := kl.getPullSecretsForPod(pod) // 调用 containerRuntime 的 SyncPod 方法开始创建容器 result := kl.containerRuntime.SyncPod(pod, podStatus, pullSecrets, kl.backOff) kl.reasonCache.Update(pod.UID, result) if err := result.Error(); err != nil { ... } return nil } ``` ### 创建容器 containerRuntime(pkg/kubelet/kuberuntime)子模块的 SyncPod 函数才是真正完成 pod 内容器实体的创建。 syncPod 主要执行以下几个操作: - 计算 sandbox 和 container 是否发生变化 - kill 掉 sandbox 已经改变的 pod - kill 掉非 running 状态的 containers - 如果有必要,创建 sandbox 容器 - 创建临时容器 - 启动 init 容器 - 启动业务容器 **initContainers 可以有多个,多个 container 严格按照顺序启动,只有当前一个 container 退出了以后,才开始启动下一个 container。** ``` func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) { // Step 1: Compute sandbox and container changes.计算 sandbox 和 container 是否发生变化 podContainerChanges := m.computePodActions(pod, podStatus) klog.V(3).Infof("computePodActions got %+v for pod %q", podContainerChanges, format.Pod(pod)) if podContainerChanges.CreateSandbox { ref, err := ref.GetReference(legacyscheme.Scheme, pod) if err != nil { klog.Errorf("Couldn't make a ref to pod %q: '%v'", format.Pod(pod), err) } ... } // Step 2: Kill the pod if the sandbox has changed.kill 掉 sandbox 已经改变的 pod if podContainerChanges.KillPod { ... } else { // Step 3: kill any running containers in this pod which are not to keep.kill 掉非 running 状态的 containers for containerID, containerInfo := range podContainerChanges.ContainersToKill { ... if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil { ... } } } m.pruneInitContainersBeforeStart(pod, podStatus) var podIPs []string if podStatus != nil { podIPs = podStatus.IPs } // Step 4: Create a sandbox for the pod if necessary.创建 sandbox podSandboxID := podContainerChanges.SandboxID if podContainerChanges.CreateSandbox { ... createSandboxResult := kubecontainer.NewSyncResult(kubecontainer.CreatePodSandbox, format.Pod(pod)) result.AddSyncResult(createSandboxResult) podSandboxID, msg, err = m.createPodSandbox(pod, podContainerChanges.Attempt) if err != nil { ... } klog.V(4).Infof("Created PodSandbox %q for pod %q", podSandboxID, format.Pod(pod)) podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID) if err != nil { ... } // 如果 pod 网络是 host 模式,容器也相同;其他情况下,容器会使用 None 网络模式,让 kubelet 的网络插件自己进行网络配置 if !kubecontainer.IsHostNetworkPod(pod) { // Overwrite the podIPs passed in the pod status, since we just started the pod sandbox. podIPs = m.determinePodSandboxIPs(pod.Namespace, pod.Name, podSandboxStatus) klog.V(4).Infof("Determined the ip %v for pod %q after sandbox changed", podIPs, format.Pod(pod)) } } podIP := "" if len(podIPs) != 0 { podIP = podIPs[0] } // Get podSandboxConfig for containers to start. configPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.ConfigPodSandbox, podSandboxID) result.AddSyncResult(configPodSandboxResult) // 获取 PodSandbox 的配置(如:metadata,clusterDNS,容器的端口映射等) podSandboxConfig, err := m.generatePodSandboxConfig(pod, podContainerChanges.Attempt) if err != nil { message := fmt.Sprintf("GeneratePodSandboxConfig for pod %q failed: %v", format.Pod(pod), err) klog.Error(message) configPodSandboxResult.Fail(kubecontainer.ErrConfigPodSandbox, message) return } // Helper containing boilerplate common to starting all types of containers. // typeName is a label used to describe this type of container in log messages, // currently: "container", "init container" or "ephemeral container" // 启动容器的临时函数 start := func(typeName string, container *v1.Container) error { startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name) result.AddSyncResult(startContainerResult) isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff) if isInBackOff { startContainerResult.Fail(err, msg) klog.V(4).Infof("Backing Off restarting %v %+v in pod %v", typeName, container, format.Pod(pod)) return err } klog.V(4).Infof("Creating %v %+v in pod %v", typeName, container, format.Pod(pod)) if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP); err != nil { startContainerResult.Fail(err, msg) // known errors that are logged in other places are logged at higher levels here to avoid // repetitive log spam switch { case err == images.ErrImagePullBackOff: klog.V(3).Infof("%v start failed: %v: %s", typeName, err, msg) default: utilruntime.HandleError(fmt.Errorf("%v start failed: %v: %s", typeName, err, msg)) } return err } return nil } // Step 5: start ephemeral containers 启动临时容器,它们在初始化容器之前启动,允许运行临时容器,即使存在临时容器,启动初始化容器时出错。实际上,init容器将首先启动,因为临时容器不能在创建pod时指定。 if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) { for _, idx := range podContainerChanges.EphemeralContainersToStart { c := (*v1.Container)(&pod.Spec.EphemeralContainers[idx].EphemeralContainerCommon) start("ephemeral container", c) } } // Step 6: start the init container. 启动init container if container := podContainerChanges.NextInitContainerToStart; container != nil { // Start the next init container. if err := start("init container", container); err != nil { return } // Successfully started the container; clear the entry in the failure klog.V(4).Infof("Completed init container %q for pod %q", container.Name, format.Pod(pod)) } // Step 7: start containers in podContainerChanges.ContainersToStart. 启动业务容器,注意,根据computePodActions,若NextInitContainerToStart不为空,则不存在ContainersToStart ,即这个循环在当前这个SyncPod中不会被执行 for _, idx := range podContainerChanges.ContainersToStart { start("container", &pod.Spec.Containers[idx]) } return } ``` SyncPod中需要特别注意的是:在init containers启动过程中,SyncPod每次只会运行一个init container(next),之后就返回了。后续的流程,包括其余init containers的运行以及工作container的启动,会交给Kubelet.syncLoopIteration下次触发pod 同步时在继续 > computePodActions ``` func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions { klog.V(5).Infof("Syncing Pod %q: %+v", format.Pod(pod), pod) //对比sandbox状态,计算是否需要创建sandbox,以及当前sandbox id createPodSandbox, attempt, sandboxID := m.podSandboxChanged(pod, podStatus) changes := podActions{ KillPod: createPodSandbox, CreateSandbox: createPodSandbox, SandboxID: sandboxID, Attempt: attempt, ContainersToStart: []int{}, ContainersToKill: make(map[kubecontainer.ContainerID]containerToKillInfo), } //需要新建sandbox,一旦进入该分支就必定return,之后代码不再执行 if createPodSandbox { if !shouldRestartOnFailure(pod) && attempt != 0 && len(podStatus.ContainerStatuses) != 0 { changes.CreateSandbox = false return changes } //在新建sandbox分支中,若存在init容器,则取第一个,返回 if len(pod.Spec.InitContainers) != 0 { // Pod has init containers, return the first one. changes.NextInitContainerToStart = &pod.Spec.InitContainers[0] return changes } // 不存在init容器,直接跑工作containers,注意如果有init容器,这里不会被执行 for idx, c := range pod.Spec.Containers { if containerSucceeded(&c, podStatus) && pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure { continue } changes.ContainersToStart = append(changes.ContainersToStart, idx) } return changes } // Ephemeral containers may be started even if initialization is not yet complete. if utilfeature.DefaultFeatureGate.Enabled(features.EphemeralContainers) { for i := range pod.Spec.EphemeralContainers { c := (*v1.Container)(&pod.Spec.EphemeralContainers[i].EphemeralContainerCommon) // Ephemeral Containers are never restarted if podStatus.FindContainerStatusByName(c.Name) == nil { changes.EphemeralContainersToStart = append(changes.EphemeralContainersToStart, i) } } } // sandbox已运行,启动init容器。寻找下一个需要执行的init容器 initLastStatus, next, done := findNextInitContainerToRun(pod, podStatus) if !done { if next != nil { initFailed := initLastStatus != nil && isInitContainerFailed(initLastStatus) if initFailed && !shouldRestartOnFailure(pod) { changes.KillPod = true } else { // Always try to stop containers in unknown state first. if initLastStatus != nil && initLastStatus.State == kubecontainer.ContainerStateUnknown { changes.ContainersToKill[initLastStatus.ID] = containerToKillInfo{ name: next.Name, container: next, message: fmt.Sprintf("Init container is in %q state, try killing it before restart", initLastStatus.State), } } changes.NextInitContainerToStart = next } } // 若init未完成,直接返回 return changes } // init已完成,计算需要kill&start的工作container keepCount := 0 // check the status of containers. for idx, container := range pod.Spec.Containers { ... } //是否需要kill pod if keepCount == 0 && len(changes.ContainersToStart) == 0 { changes.KillPod = true } return changes } ```