# How the kubelet Reports Node Information: NodeStatus

## Introduction

In a distributed system, the server side usually relies on a heartbeat mechanism to confirm that clients are alive. In Kubernetes, the kubelet periodically reports a heartbeat to the apiserver, which uses it to decide whether the node is still alive. If a node fails to report a heartbeat for longer than a configured period, its status is set to `NotReady`, and the pods on that host are set to `NodeLost` or `Unknown`. The kubelet updates its own status to the apiserver at a frequency controlled by the `--node-status-update-frequency` flag, which defaults to once every 10s. Besides the heartbeat itself, the kubelet also reports a set of data about the node.

## What status does the kubelet report?

In Kubernetes, a node's status contains the following information:

- Addresses
- Condition
- Capacity
- nodeInfo
- allocatable
- daemonEndpoints
- images

### Addresses

> addresses <[]Object>

This mainly contains the following fields:

- address: the node's address
- type: the address type, one of three:
  - HostName: the hostname, which can be overridden with the kubelet's `--hostname-override` flag
  - ExternalIP: typically a node IP address that is externally routable (reachable from outside the cluster)
  - InternalIP: typically a node IP address that is only routable within the cluster

### Condition

> The conditions field describes the status of all running nodes.

```
conditions:
- lastHeartbeatTime: "2021-07-14T01:33:06Z"
  lastTransitionTime: "2021-07-14T01:33:06Z"
  message: Flannel is running on this node
  reason: FlannelIsUp
  status: "False"
  type: NetworkUnavailable
- lastHeartbeatTime: "2021-08-20T06:57:28Z"
  lastTransitionTime: "2020-06-02T02:44:22Z"
  message: kubelet has sufficient memory available
  reason: KubeletHasSufficientMemory
  status: "False"
  type: MemoryPressure
- lastHeartbeatTime: "2021-08-20T06:57:28Z"
  lastTransitionTime: "2020-06-02T02:44:22Z"
  message: kubelet has no disk pressure
  reason: KubeletHasNoDiskPressure
  status: "False"
  type: DiskPressure
- lastHeartbeatTime: "2021-08-20T06:57:28Z"
  lastTransitionTime: "2020-06-02T02:44:22Z"
  message: kubelet has sufficient PID available
  reason: KubeletHasSufficientPID
  status: "False"
  type: PIDPressure
- lastHeartbeatTime: "2021-08-20T06:57:28Z"
  lastTransitionTime: "2020-06-02T03:04:02Z"
  message: kubelet is posting ready status
  reason: KubeletReady
  status: "True"
  type: Ready
```

### Capacity

> Describes the resources available on the node: CPU, memory, and the maximum number of pods that can be scheduled onto it.

```
capacity:
  cpu: "6"
  ephemeral-storage: 204371460Ki
  hugepages-1Gi: "0"
  hugepages-2Mi: "0"
  memory: 12137728Ki
  pods: "110"
```

### nodeInfo

Describes general information about the node, such as the kernel version, Kubernetes version (kubelet and kube-proxy versions), Docker version, and OS image. The kubelet collects this information from the node.

```
nodeInfo:
  architecture: amd64
  bootID: e2776b16-ab3d-41bf-82b3-b0a928b1de29
  containerRuntimeVersion: docker://18.9.5
  kernelVersion: 3.10.0-957.10.1.el7.x86_64
  kubeProxyVersion: v1.14.1
  kubeletVersion: v1.14.1
  machineID: 8114067df8e947f8aa4c5dd6e4fe6414
  operatingSystem: linux
  osImage: CentOS Linux 7 (Core)
  systemUUID: 564DE144-A298-6501-E0C0-7BB0483AD792
```

### allocatable

> Describes the resources on the node that are available for allocation.

```
allocatable:
  cpu: "6"
  ephemeral-storage: "188348737225"
  hugepages-1Gi: "0"
  hugepages-2Mi: "0"
  memory: 12035328Ki
  pods: "110"
```

### daemonEndpoints

> The endpoints of the kubelet daemon on the node.

```
daemonEndpoints:
  kubeletEndpoint:
    Port: 10250
```

### images

> Images currently present on the node.

```
- names:
  - rancher/rancher-agent@sha256:e6aa36cca0d3ce9fea180add5b620baabd823fb34f9d100b7a8d3eb734392c37
  - rancher/rancher-agent:v2.3.5
  sizeBytes: 282225373
- names:
  - images.lingcb.net/sit-netcore/printingui-lingcb-net@sha256:1b4d9cde7e18a096212d7a2d45c1dbc5c86249f379a893e88c9b0f361e2a3760
  - images.lingcb.net/sit-netcore/printingui-lingcb-net:v3.0.0-20200622162958
  sizeBytes: 264286427
...
```

Running `kubectl get node xxx -o yaml` shows all of a node's status information. Everything under `status` is reported by the kubelet, so the kubelet reports not only the heartbeat but also data such as the network plugin condition in `conditions`, node info, memory/disk/PID pressure status, node health, and schedulability.

![image-20210820151349246](https://xieys.club/images/posts/image-20210820151349246.png)
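Since all of these fields live under `status` on the Node object, any Kubernetes client can read them back. The sketch below is a minimal client-go example (not from the kubelet source) that prints a node's conditions, capacity, and allocatable resources; the node name `node-1` and the default kubeconfig path are placeholders, and the `context` argument matches recent client-go versions (older releases omit it).

```
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from the local kubeconfig; the path is an assumption.
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// "node-1" is a placeholder node name.
	node, err := clientset.CoreV1().Nodes().Get(context.TODO(), "node-1", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}

	// Print exactly the status fields discussed above.
	for _, c := range node.Status.Conditions {
		fmt.Printf("%-20s %-6s %s\n", c.Type, c.Status, c.Reason)
	}
	fmt.Println("capacity:", node.Status.Capacity)
	fmt.Println("allocatable:", node.Status.Allocatable)
}
```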
## Impact of an abnormal kubelet status

If a node stays in a non-Ready state for longer than `pod-eviction-timeout` (defined in kube-controller-manager, default 5 minutes), then in versions before v1.5 the kube-controller-manager would `force delete pod` and reschedule the pods on that host elsewhere. **Note that since v1.5 the kube-controller-manager does not `force delete pod`; pods stay in `Terminating` or `Unknown` state until the node is removed from the master or the kubelet returns to Ready.**

While a node is NotReady:

- DaemonSet pods change to `NodeLost`
- Deployment, StatefulSet, and static pods first change to `NodeLost` and then immediately to `Unknown`. Deployment pods are recreated, while static pods and StatefulSet pods stay in `Unknown`

When the kubelet returns to Ready:

- DaemonSet pods are not recreated; the old pods go straight back to `Running`
- For Deployments, the pods that were on the node while the kubelet was down are deleted (replacement pods were already created to keep the replica count at the desired value, so once the old pods report back the cluster would hold more Deployment pod instances than expected; the old pods therefore have to be removed)
- StatefulSet pods are recreated
- Static pods are not restarted, but their uptime is reset to 0 when the kubelet comes back up

## How kubelet status reporting is implemented

The kubelet has two ways to report status. The first is to periodically send heartbeat messages to the apiserver; put simply, it starts a goroutine and posts to the apiserver on a timer.

The second is called NodeLease. Before v1.13, NodeStatus was the node's only heartbeat; starting with v1.13, the NodeLease feature was introduced as alpha. When NodeLease is enabled, every node has an associated `Lease` object in the `kube-node-lease` namespace that the node renews periodically, and both NodeStatus and NodeLease are treated as heartbeats from the node. NodeLease is renewed frequently, while NodeStatus is only reported to the master when it changes or when a timeout elapses (default 1 minute; by comparison, `node-monitor-grace-period` defaults to 40s). Because NodeLease is much lighter-weight than NodeStatus, this feature brings a clear improvement in cluster scalability and performance.

> Kubernetes version: v1.16.6

The kubelet startup flow is:

main -> NewKubeletCommand -> Run -> run -> RunKubelet

**`createAndInitKubelet` calls `NewMainKubelet`, which builds the kubelet struct (the struct implements every method of the `kubelet.Bootstrap` interface) and returns it as a `kubelet.Bootstrap`.** `startKubelet` is then called, which invokes `Bootstrap.Run`, i.e. the kubelet's `Run` method.

Most of the status-reporting code is implemented in `kubernetes/pkg/kubelet/kubelet_node_status.go`. Reporting is started as a goroutine in `kubernetes/pkg/kubelet/kubelet.go#Run`, the method from which many of the kubelet's major features are launched.

> kubernetes/pkg/kubelet/kubelet.go#Run

```
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	...
	if kl.kubeClient != nil {
		// run syncNodeStatus once every nodeStatusUpdateFrequency
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
		go kl.fastStatusUpdateOnce() // a newer reporting path
		// start syncing lease
		if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
			go kl.nodeLeaseController.Run(wait.NeverStop)
		}
	}
	...
}
```
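The `wait.Until` call above is what turns `syncNodeStatus` into a periodic heartbeat. As a standalone illustration (my sketch, not kubelet code), the same primitive from `k8s.io/apimachinery` can be used like this:

```
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// A stand-in for kl.syncNodeStatus.
	syncNodeStatus := func() {
		fmt.Println("posting node status at", time.Now().Format(time.RFC3339))
	}
	// wait.Until runs the function, sleeps for the period, and repeats until
	// the stop channel closes; wait.NeverStop is a channel that never closes.
	// This mirrors how the kubelet schedules syncNodeStatus every
	// nodeStatusUpdateFrequency (10s by default).
	wait.Until(syncNodeStatus, 10*time.Second, wait.NeverStop)
}
```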
`kl.syncNodeStatus` is the entry point of status reporting; the functions it calls are all implemented in the same file.

> kubernetes/pkg/kubelet/kubelet_node_status.go#syncNodeStatus

```
func (kl *Kubelet) syncNodeStatus() {
	kl.syncNodeStatusMux.Lock()
	defer kl.syncNodeStatusMux.Unlock()

	if kl.kubeClient == nil || kl.heartbeatClient == nil {
		return
	}
	// is this a self-registering node?
	if kl.registerNode {
		// This will exit immediately if it doesn't need to do anything.
		kl.registerWithAPIServer()
	}
	if err := kl.updateNodeStatus(); err != nil {
		klog.Errorf("Unable to update node status: %v", err)
	}
}
```

`syncNodeStatus` first checks `registerNode`: if self-registration is enabled it calls `registerWithAPIServer` (which returns immediately once registration is done), and then, in either case, calls `updateNodeStatus`. `updateNodeStatus` in turn calls `tryUpdateNodeStatus` to perform the actual reporting, and setting the status ultimately happens in `setNodeStatus`.

`updateNodeStatus` mainly delegates to `tryUpdateNodeStatus`; it also applies the retry count for status reporting, `nodeStatusUpdateRetry`, which defaults to 5.

> kubernetes/pkg/kubelet/kubelet_node_status.go#updateNodeStatus

```
func (kl *Kubelet) updateNodeStatus() error {
	klog.V(5).Infof("Updating node status")
	for i := 0; i < nodeStatusUpdateRetry; i++ {
		if err := kl.tryUpdateNodeStatus(i); err != nil {
			if i > 0 && kl.onRepeatedHeartbeatFailure != nil {
				kl.onRepeatedHeartbeatFailure()
			}
			klog.Errorf("Error updating node status, will retry: %v", err)
		} else {
			return nil
		}
	}
	return fmt.Errorf("update node status exceeds retry count")
}
```

`tryUpdateNodeStatus` holds the main reporting logic: it first sets the status on the node object, then reports it to the apiserver.

> kubernetes/pkg/kubelet/kubelet_node_status.go#tryUpdateNodeStatus

```
func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
	opts := metav1.GetOptions{}
	if tryNumber == 0 {
		util.FromApiserverCache(&opts)
	}
	// fetch the node object from the apiserver first
	node, err := kl.heartbeatClient.CoreV1().Nodes().Get(string(kl.nodeName), opts)
	if err != nil {
		return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
	}

	// deep-copy the node object
	originalNode := node.DeepCopy()
	if originalNode == nil {
		return fmt.Errorf("nil %q node object", kl.nodeName)
	}

	podCIDRChanged := false
	if len(node.Spec.PodCIDRs) != 0 {
		// Pod CIDR could have been updated before, so we cannot rely on
		// node.Spec.PodCIDR being non-empty. We also need to know if pod CIDR is
		// actually changed.
		podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
		if podCIDRChanged, err = kl.updatePodCIDR(podCIDRs); err != nil {
			klog.Errorf(err.Error())
		}
	}
	// set the node status
	kl.setNodeStatus(node)

	now := kl.clock.Now()
	if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) && now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
		if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) {
			kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
			return nil
		}
	}

	// patch the node status to the apiserver
	updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
	if err != nil {
		return err
	}
	kl.lastStatusReportTime = now
	kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
	// If update finishes successfully, mark the volumeInUse as reportedInUse to indicate
	// those volumes are already updated in the node's status
	kl.volumeManager.MarkVolumesAsReportedInUse(updatedNode.Status.VolumesInUse)
	return nil
}
```

`tryUpdateNodeStatus` calls `setNodeStatus` to set the node's status. `setNodeStatus` gathers all of the node's current state and overwrites the stored values with the latest ones, i.e. it resets every field of the node status.

```
func (kl *Kubelet) setNodeStatus(node *v1.Node) {
	for i, f := range kl.setNodeStatusFuncs {
		klog.V(5).Infof("Setting node status at position %v", i)
		if err := f(node); err != nil {
			klog.Warningf("Failed to set some node status fields: %s", err)
		}
	}
}
```

`setNodeStatus` overwrites every field of the node struct via `setNodeStatusFuncs`, which is initialized in `NewMainKubelet` (pkg/kubelet/kubelet.go).

> kubernetes/pkg/kubelet/kubelet.go#NewMainKubelet

```
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
	// ...

	// Generating the status funcs should be the last thing we do,
	klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()

	return klet, nil
```

`defaultNodeStatusFuncs` is the factory that generates the status setters: it collects all of the node's status metrics and wraps each in a setter function.

> kubernetes/pkg/kubelet/kubelet_node_status.go#defaultNodeStatusFuncs

```
// defaultNodeStatusFuncs is a factory that generates the default set of
// setNodeStatus funcs
func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
	// if cloud is not nil, we expect the cloud resource sync manager to exist
	var nodeAddressesFunc func() ([]v1.NodeAddress, error)
	if kl.cloud != nil {
		nodeAddressesFunc = kl.cloudResourceSyncManager.NodeAddresses
	}
	var validateHostFunc func() error
	if kl.appArmorValidator != nil {
		validateHostFunc = kl.appArmorValidator.ValidateHost
	}
	var setters []func(n *v1.Node) error
	setters = append(setters,
		nodestatus.NodeAddress(kl.nodeIP, kl.nodeIPValidator, kl.hostname, kl.hostnameOverridden, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
		nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore, kl.GetCachedMachineInfo, kl.containerManager.GetCapacity,
			kl.containerManager.GetDevicePluginResourceCapacity, kl.containerManager.GetNodeAllocatableReservation, kl.recordEvent),
		nodestatus.VersionInfo(kl.cadvisor.VersionInfo, kl.containerRuntime.Type, kl.containerRuntime.Version),
		nodestatus.DaemonEndpoints(kl.daemonEndpoints),
		nodestatus.Images(kl.nodeStatusMaxImages, kl.imageManager.GetImageList),
		nodestatus.GoRuntime(),
	)
	if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
		setters = append(setters, nodestatus.VolumeLimits(kl.volumePluginMgr.ListVolumePluginWithLimits))
	}
	setters = append(setters,
		nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent),
		nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
		nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
		nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, kl.runtimeState.storageErrors, validateHostFunc, kl.containerManager.Status, kl.recordNodeStatusEvent),
		nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse),
		// TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event
		// and record state back to the Kubelet runtime object. In the future, I'd like to isolate
		// these side-effects by decoupling the decisions to send events and partial status recording
		// from the Node setters.
		kl.recordNodeSchedulableEvent,
	)
	return setters
}
```

`defaultNodeStatusFuncs` shows everything the node reports, notably `MemoryPressureCondition`, `DiskPressureCondition`, `PIDPressureCondition`, and `ReadyCondition`. Each nodestatus constructor returns a setter, and all setters are defined in pkg/kubelet/nodestatus/setters.go.

For downstream development, if the apiserver needs to know more about a node, custom setter functions can be appended here, for example to report disk details; see the sketch below.
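To illustrate that extension point, here is a hypothetical custom setter (my sketch, not kubelet code). It has the same `func(*v1.Node) error` shape as the built-in setters, so it could be appended to the slice returned by `defaultNodeStatusFuncs`; the package name, the disk-probing helper, and the `example.com/scratch-disk` resource name are all invented for the example.

```
package nodestatusx

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// DiskInfo returns a hypothetical custom node-status setter in the same
// func(*v1.Node) error shape produced by defaultNodeStatusFuncs.
func DiskInfo(getScratchDiskBytes func() (int64, error)) func(*v1.Node) error {
	return func(node *v1.Node) error {
		size, err := getScratchDiskBytes() // assumed helper that probes the local scratch disk
		if err != nil {
			return err
		}
		if node.Status.Capacity == nil {
			node.Status.Capacity = v1.ResourceList{}
		}
		// advertise the disk as a made-up extended resource in node status,
		// so it is carried along by the same status patch as the built-in fields
		node.Status.Capacity["example.com/scratch-disk"] = *resource.NewQuantity(size, resource.BinarySI)
		return nil
	}
}
```

In a real fork, `defaultNodeStatusFuncs` would append `DiskInfo(...)` to `setters`, so it runs on every `setNodeStatus` pass.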
Finally, `tryUpdateNodeStatus` calls `PatchNodeStatus` to report the node status to the master.

> kubernetes/pkg/util/node/node.go#PatchNodeStatus

```
// PatchNodeStatus patches node status.
func PatchNodeStatus(c v1core.CoreV1Interface, nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) (*v1.Node, []byte, error) {
	// diff the node fetched from the apiserver against the freshly rebuilt
	// node and compute the patch
	patchBytes, err := preparePatchBytesforNodeStatus(nodeName, oldNode, newNode)
	if err != nil {
		return nil, nil, err
	}

	updatedNode, err := c.Nodes().Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status")
	if err != nil {
		return nil, nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err)
	}
	return updatedNode, patchBytes, nil
}
```

`PatchNodeStatus` sends the computed patch to the `status` subresource on the apiserver, completing one round of status reporting.
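The patch itself is a strategic merge patch computed from the old and new Node objects. Below is a standalone sketch of the same idea (my example, calling `k8s.io/apimachinery/pkg/util/strategicpatch` directly instead of kubelet's unexported `preparePatchBytesforNodeStatus`):

```
package main

import (
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
	oldNode := v1.Node{}
	newNode := v1.Node{}
	// pretend setNodeStatus flipped one condition on the rebuilt node
	newNode.Status.Conditions = []v1.NodeCondition{
		{Type: v1.NodeReady, Status: v1.ConditionTrue, Reason: "KubeletReady"},
	}

	oldJSON, _ := json.Marshal(oldNode) // marshaling a v1.Node cannot fail here
	newJSON, _ := json.Marshal(newNode)

	// CreateTwoWayMergePatch produces a strategic merge patch that, applied
	// to oldNode, yields newNode -- only the changed fields are included.
	patch, err := strategicpatch.CreateTwoWayMergePatch(oldJSON, newJSON, v1.Node{})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(patch)) // a patch touching only status.conditions
}
```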
## Summary

This article described how the kubelet reports node status and how that reporting is implemented. There are currently two reporting mechanisms, and only the first, NodeStatus, was analyzed above. In a large cluster with many nodes, having every node frequently report its full status puts real pressure on etcd, and network problems between a node and the apiserver can fail the heartbeat and disturb the node's status; the second mechanism, NodeLease, was introduced precisely to mitigate these problems.
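For completeness, the NodeLease heartbeat can be observed directly: each node renews a `Lease` object named after it in the `kube-node-lease` namespace. A minimal client-go sketch (my example; the node name and kubeconfig path are placeholders) that prints when a node last renewed its lease:

```
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// the Lease is named after the node and lives in kube-node-lease
	lease, err := clientset.CoordinationV1().Leases("kube-node-lease").Get(context.TODO(), "node-1", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	if lease.Spec.HolderIdentity != nil {
		fmt.Println("holder:", *lease.Spec.HolderIdentity)
	}
	if lease.Spec.RenewTime != nil {
		// the kubelet renews this far more often than it posts a full NodeStatus
		fmt.Println("last renewed:", lease.Spec.RenewTime.Time)
	}
}
```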