
How the kubelet Reports Node Information: NodeStatus

Introduction

In a distributed system, the server uses a heartbeat mechanism to confirm that its clients are still alive. In Kubernetes, the kubelet likewise reports heartbeats to the apiserver on a schedule, and the apiserver uses them to decide whether a node is alive: if a node goes without a heartbeat for too long, its status is set to NotReady, and the pods on that host are marked NodeLost or Unknown. The kubelet updates its status to the apiserver periodically; the interval is controlled by the --node-status-update-frequency flag and defaults to once every 10s. Besides the heartbeat itself, the kubelet also reports a set of data about the node.
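
To make the heartbeat concrete, here is a minimal client-go sketch (the kubeconfig path and the node name node-1 are placeholders, and the Get signature matches the pre-1.18 client-go used by this Kubernetes version) that fetches a Node and prints the lastHeartbeatTime of its Ready condition, which is the timestamp the kubelet refreshes on every status report:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Placeholder kubeconfig path and node name; adjust for your environment.
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	node, err := clientset.CoreV1().Nodes().Get("node-1", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	for _, cond := range node.Status.Conditions {
		if cond.Type == corev1.NodeReady {
			// lastHeartbeatTime advances every time the kubelet reports status.
			fmt.Printf("Ready=%s lastHeartbeatTime=%s\n", cond.Status, cond.LastHeartbeatTime)
		}
	}
}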

What status the kubelet reports

In Kubernetes, a node's status contains the following information:

  • Addresses
  • Condition
  • Capacity
  • nodeInfo
  • allocatable
  • daemonEndpoints
  • images

Addresses

addresses <[]Object>

It mainly contains the following fields (a small helper for reading them is sketched after the list):

  • address: the node's address
  • type: the address type; there are three of them
    • HostName: the hostname, which can be overridden with the kubelet --hostname-override flag
    • ExternalIP: typically the node IP address that is routable from outside the cluster
    • InternalIP: typically the node IP address that is only routable within the cluster
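
A tiny Go sketch of how these addresses are consumed (the sample node and its values are made up for illustration) picks the first address of a given type out of node.Status.Addresses:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

// addressOfType returns the first address of the given type (e.g. InternalIP)
// from a node's status, or "" if none is present.
func addressOfType(node *v1.Node, t v1.NodeAddressType) string {
	for _, addr := range node.Status.Addresses {
		if addr.Type == t {
			return addr.Address
		}
	}
	return ""
}

func main() {
	// A hand-built node just for illustration.
	node := &v1.Node{}
	node.Status.Addresses = []v1.NodeAddress{
		{Type: v1.NodeHostName, Address: "node-1"},
		{Type: v1.NodeInternalIP, Address: "10.0.0.11"},
	}
	fmt.Println(addressOfType(node, v1.NodeInternalIP)) // 10.0.0.11
}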

Condition

The conditions field describes the status of all Running nodes.

  conditions:
  - lastHeartbeatTime: "2021-07-14T01:33:06Z"
    lastTransitionTime: "2021-07-14T01:33:06Z"
    message: Flannel is running on this node
    reason: FlannelIsUp
    status: "False"
    type: NetworkUnavailable
  - lastHeartbeatTime: "2021-08-20T06:57:28Z"
    lastTransitionTime: "2020-06-02T02:44:22Z"
    message: kubelet has sufficient memory available
    reason: KubeletHasSufficientMemory
    status: "False"
    type: MemoryPressure
  - lastHeartbeatTime: "2021-08-20T06:57:28Z"
    lastTransitionTime: "2020-06-02T02:44:22Z"
    message: kubelet has no disk pressure
    reason: KubeletHasNoDiskPressure
    status: "False"
    type: DiskPressure
  - lastHeartbeatTime: "2021-08-20T06:57:28Z"
    lastTransitionTime: "2020-06-02T02:44:22Z"
    message: kubelet has sufficient PID available
    reason: KubeletHasSufficientPID
    status: "False"
    type: PIDPressure
  - lastHeartbeatTime: "2021-08-20T06:57:28Z"
    lastTransitionTime: "2020-06-02T03:04:02Z"
    message: kubelet is posting ready status
    reason: KubeletReady
    status: "True"
    type: Ready

Capacity

Describes the resources available on the node: CPU, memory, and the maximum number of pods that can be scheduled onto it.

  capacity:
    cpu: "6"
    ephemeral-storage: 204371460Ki
    hugepages-1Gi: "0"
    hugepages-2Mi: "0"
    memory: 12137728Ki
    pods: "110"

nodeInfo

Describes general information about the node, such as the kernel version, the Kubernetes version (kubelet and kube-proxy versions), the Docker version, and the OS version. The kubelet collects this information from the node.

  nodeInfo:
    architecture: amd64
    bootID: e2776b16-ab3d-41bf-82b3-b0a928b1de29
    containerRuntimeVersion: docker://18.9.5
    kernelVersion: 3.10.0-957.10.1.el7.x86_64
    kubeProxyVersion: v1.14.1
    kubeletVersion: v1.14.1
    machineID: 8114067df8e947f8aa4c5dd6e4fe6414
    operatingSystem: linux
    osImage: CentOS Linux 7 (Core)
    systemUUID: 564DE144-A298-6501-E0C0-7BB0483AD792

allocatable

Describes the resources on the node that are available for allocation (the allocatable limits).

  allocatable:
    cpu: "6"
    ephemeral-storage: "188348737225"
    hugepages-1Gi: "0"
    hugepages-2Mi: "0"
    memory: 12035328Ki
    pods: "110"

daemonEndpoints

Endpoint information for the kubelet on the node.

  daemonEndpoints:
    kubeletEndpoint:
      Port: 10250

images

Information about the images currently on the node.

  - names:
    - rancher/rancher-agent@sha256:e6aa36cca0d3ce9fea180add5b620baabd823fb34f9d100b7a8d3eb734392c37
    - rancher/rancher-agent:v2.3.5
    sizeBytes: 282225373
  - names:
    - images.lingcb.net/sit-netcore/printingui-lingcb-net@sha256:1b4d9cde7e18a096212d7a2d45c1dbc5c86249f379a893e88c9b0f361e2a3760
    - images.lingcb.net/sit-netcore/printingui-lingcb-net:v3.0.0-20200622162958
    sizeBytes: 264286427
...

Running kubectl get node xxx -o yaml shows all of a node's status information. Everything under status is reported by the kubelet, so the kubelet reports not only heartbeats but also, for example, the network plugin status carried in the Condition field, node information, the node's out-of-disk status, memory and disk pressure, node health, and whether the node is schedulable.

https://xieys.club/images/posts/image-20210820151349246.png

Impact of an abnormal kubelet status

If a node stays in a non-Ready state for longer than pod-eviction-timeout (5 minutes by default, defined in kube-controller-manager), then in versions before v1.5 the kube-controller-manager would force delete the pods and reschedule the pods from that host onto other hosts. Note that from v1.5 on, kube-controller-manager no longer force deletes pods: the pods stay in the Terminating or Unknown state until the node is removed from the master or the kubelet becomes Ready again.

While the node is NotReady:

  • DaemonSet pods change to the NodeLost state
  • Pods of Deployments and StatefulSets, as well as static pods, first change to NodeLost and then immediately to Unknown. Deployment pods are recreated, while static pods and StatefulSet pods stay in the Unknown state

When the kubelet becomes Ready again:

  • DaemonSet pods are not recreated; the old pods go straight back to Running
  • Deployment pods that were on the node whose kubelet stopped are deleted (their status changed in the cluster, but the Deployment already runs the desired number of replicas elsewhere, so keeping the old pods would exceed the expected count; the old pods are therefore removed)
  • StatefulSet pods are recreated
  • Static pods are not restarted, but their pod run time is reset to 0 when the kubelet comes back up

Implementation of kubelet status reporting

The kubelet has two ways of reporting status. The first is to periodically send heartbeat messages to the apiserver: put simply, it starts a goroutine and sends a message to the APIServer on a fixed schedule.

The second is called NodeLease. Before v1.13, NodeStatus was the node's only heartbeat. Starting with v1.13, the NodeLease feature was introduced as an alpha feature. With NodeLease enabled, every node has an associated Lease object in the kube-node-lease namespace that the node renews periodically; both NodeStatus and NodeLease are treated as heartbeats from the node. NodeLease is renewed frequently, while NodeStatus is only reported to the master when it changes or when a certain interval has elapsed (1 minute by default; node-monitor-grace-period defaults to 40s). Because NodeLease is much more lightweight than NodeStatus, this feature brings a clear improvement in cluster scalability and performance.
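
To see this other heartbeat, the Lease object itself can be read much like the Node; the sketch below (again with placeholder kubeconfig path and node name, using the pre-1.18 client-go signatures) fetches the node's Lease from the kube-node-lease namespace and prints its renewTime, which the kubelet bumps far more often than it patches NodeStatus:

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") // placeholder path
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// The Lease shares the node's name and lives in the kube-node-lease namespace.
	lease, err := clientset.CoordinationV1().Leases("kube-node-lease").Get("node-1", metav1.GetOptions{})
	if err != nil {
		panic(err)
	}
	// renewTime is refreshed by the kubelet on every lease renewal,
	// independently of the (heavier) NodeStatus updates.
	if lease.Spec.RenewTime != nil {
		fmt.Printf("lease %s renewTime=%s\n", lease.Name, lease.Spec.RenewTime.Time)
	}
}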

Kubernetes version: v1.16.6

kubelet startup flow

main -> NewKubeletCommand -> Run -> run -> RunKubelet: RunKubelet calls createAndInitKubelet, which calls NewMainKubelet (this builds the Kubelet struct, and the struct implements every method of the kubelet.Bootstrap interface) to create a kubelet.Bootstrap. It then calls startKubelet, and inside startKubelet the Bootstrap.Run method is invoked, which is in fact the Kubelet's Run method.

Most of the kubelet status-reporting code is implemented in kubernetes/pkg/kubelet/kubelet_node_status.go. Status reporting is started as a goroutine from the kubernetes/pkg/kubelet/kubelet.go#Run method, which is also where many of the kubelet's other important features are launched.

kubernetes/pkg/kubelet/kubelet.go#Run

// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
	...
	if kl.kubeClient != nil {
		// run syncNodeStatus once every nodeStatusUpdateFrequency
		go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
		go kl.fastStatusUpdateOnce()
		// a newer reporting mechanism (NodeLease)
		// start syncing lease
		if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
			go kl.nodeLeaseController.Run(wait.NeverStop)
		}
	}
	...
}
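
The wait.Until call above is the standard periodic-goroutine pattern from k8s.io/apimachinery; a self-contained sketch of the same mechanism, with a dummy function standing in for kl.syncNodeStatus, looks like this:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})

	// Stand-in for kl.syncNodeStatus: wait.Until calls it, waits the period,
	// then calls it again, until stopCh is closed.
	report := func() {
		fmt.Println("reporting node status at", time.Now().Format(time.RFC3339))
	}

	// 10 * time.Second mirrors the default --node-status-update-frequency.
	go wait.Until(report, 10*time.Second, stopCh)

	// Let the reporter run for a while, then stop it.
	time.Sleep(35 * time.Second)
	close(stopCh)
}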

kl.syncNodeStatus is the entry point for status reporting; the chain of functions it calls is implemented in the same file.

kubernetes/pkg/kubelet/kubelet_node_status.go#syncNodeStatus

func (kl *Kubelet) syncNodeStatus() {
	kl.syncNodeStatusMux.Lock()
	defer kl.syncNodeStatusMux.Unlock()

	if kl.kubeClient == nil || kl.heartbeatClient == nil {
		return
	}
	
	// whether this kubelet should register the node with the apiserver
	if kl.registerNode {
		// This will exit immediately if it doesn't need to do anything.
		kl.registerWithAPIServer()
	}
	if err := kl.updateNodeStatus(); err != nil {
		klog.Errorf("Unable to update node status: %v", err)
	}
}

syncNodeStatus calls updateNodeStatus, which in turn calls tryUpdateNodeStatus to do the actual reporting; the status itself is filled in by setNodeStatus. There is also a registration check here: if registerNode is set, registerWithAPIServer runs first (it returns immediately if there is nothing to do), and then updateNodeStatus is called.

updateNodeStatus mainly calls tryUpdateNodeStatus to carry on with the work; it also applies the retry count for status reporting, nodeStatusUpdateRetry, which defaults to 5.

kubernetes/pkg/kubelet/kubelet_node_status.go#updateNodeStatus

func (kl *Kubelet) updateNodeStatus() error {
	klog.V(5).Infof("Updating node status")
	for i := 0; i < nodeStatusUpdateRetry; i++ {
		if err := kl.tryUpdateNodeStatus(i); err != nil {
			if i > 0 && kl.onRepeatedHeartbeatFailure != nil {
				kl.onRepeatedHeartbeatFailure()
			}
			klog.Errorf("Error updating node status, will retry: %v", err)
		} else {
			return nil
		}
	}
	return fmt.Errorf("update node status exceeds retry count")
}

tryUpdateNodeStatus holds the main reporting logic: it first sets the node's status and then reports that status to the apiserver.

kubernetes/pkg/kubelet/kubelet_node_status.go#tryUpdateNodeStatus

func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {

	opts := metav1.GetOptions{}
	if tryNumber == 0 {
		util.FromApiserverCache(&opts)
	}
	
	// first fetch the node object from the apiserver
	node, err := kl.heartbeatClient.CoreV1().Nodes().Get(string(kl.nodeName), opts)
	if err != nil {
		return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
	}

	// make a deep copy of the node object
	originalNode := node.DeepCopy()
	if originalNode == nil {
		return fmt.Errorf("nil %q node object", kl.nodeName)
	}

	podCIDRChanged := false
	if len(node.Spec.PodCIDRs) != 0 {
		// Pod CIDR could have been updated before, so we cannot rely on
		// node.Spec.PodCIDR being non-empty. We also need to know if pod CIDR is
		// actually changed.
		podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
		if podCIDRChanged, err = kl.updatePodCIDR(podCIDRs); err != nil {
			klog.Errorf(err.Error())
		}
	}

	// set the node status
	kl.setNodeStatus(node)

	now := kl.clock.Now()
	if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) && now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
		if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) {
			kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
			return nil
		}
	}

	// patch the updated node status to the apiserver
	updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
	if err != nil {
		return err
	}
	kl.lastStatusReportTime = now
	kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
	// If update finishes successfully, mark the volumeInUse as reportedInUse to indicate
	// those volumes are already updated in the node's status
	kl.volumeManager.MarkVolumesAsReportedInUse(updatedNode.Status.VolumesInUse)
	return nil
}

tryUpdateNodeStatus calls setNodeStatus to set the node's status. setNodeStatus gathers the node's entire current status and overwrites everything the kubelet has stored with the latest values, i.e. it resets every field of the node status.

func (kl *Kubelet) setNodeStatus(node *v1.Node) {
	for i, f := range kl.setNodeStatusFuncs {
		klog.V(5).Infof("Setting node status at position %v", i)
		if err := f(node); err != nil {
			klog.Warningf("Failed to set some node status fields: %s", err)
		}
	}
}

setNodeStatus overwrites every field of the node struct through the setNodeStatusFuncs, which are initialized in NewMainKubelet (pkg/kubelet/kubelet.go).

kubernetes/pkg/kubelet/kubelet.go#NewMainKubelet

func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
	// ...
	// Generating the status funcs should be the last thing we do,
	klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()

	return klet, nil
}

defaultNodeStatusFuncs is the factory that generates the status-setting functions: it gathers all of the node's status sources and produces a setter for each of them.

kubernetes/pkg/kubelet/kubelet_node_status.go#defaultNodeStatusFuncs

// defaultNodeStatusFuncs is a factory that generates the default set of
// setNodeStatus funcs
func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
	// if cloud is not nil, we expect the cloud resource sync manager to exist
	var nodeAddressesFunc func() ([]v1.NodeAddress, error)
	if kl.cloud != nil {
		nodeAddressesFunc = kl.cloudResourceSyncManager.NodeAddresses
	}
	var validateHostFunc func() error
	if kl.appArmorValidator != nil {
		validateHostFunc = kl.appArmorValidator.ValidateHost
	}
	var setters []func(n *v1.Node) error
	setters = append(setters,
		nodestatus.NodeAddress(kl.nodeIP, kl.nodeIPValidator, kl.hostname, kl.hostnameOverridden, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
		nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore, kl.GetCachedMachineInfo, kl.containerManager.GetCapacity,
			kl.containerManager.GetDevicePluginResourceCapacity, kl.containerManager.GetNodeAllocatableReservation, kl.recordEvent),
		nodestatus.VersionInfo(kl.cadvisor.VersionInfo, kl.containerRuntime.Type, kl.containerRuntime.Version),
		nodestatus.DaemonEndpoints(kl.daemonEndpoints),
		nodestatus.Images(kl.nodeStatusMaxImages, kl.imageManager.GetImageList),
		nodestatus.GoRuntime(),
	)
	if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
		setters = append(setters, nodestatus.VolumeLimits(kl.volumePluginMgr.ListVolumePluginWithLimits))
	}
	setters = append(setters,
		nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent),
		nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
		nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
		nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, kl.runtimeState.storageErrors, validateHostFunc, kl.containerManager.Status, kl.recordNodeStatusEvent),
		nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse),
		// TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event
		// and record state back to the Kubelet runtime object. In the future, I'd like to isolate
		// these side-effects by decoupling the decisions to send events and partial status recording
		// from the Node setters.
		kl.recordNodeSchedulableEvent,
	)
	return setters
}

defaultNodeStatusFuncs shows all of the information the node reports, mainly MemoryPressureCondition, DiskPressureCondition, PIDPressureCondition, ReadyCondition, and so on. Each nodestatus constructor returns a setter, and all of the setters are defined in pkg/kubelet/nodestatus/setters.go.

For secondary development, if you need the APIServer to know more about the node, you can add a custom setter here, for example one that reports disk information, as sketched below.
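
The following is purely a hypothetical sketch (the extended-resource name example.com/root-disk-free and the statfs-based probing are made up for illustration); what matters is that it has the same func(*v1.Node) error shape as the built-in setters:

package main

import (
	"fmt"
	"syscall"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// diskInfoSetter is a hypothetical custom setter with the same shape as the
// built-in ones: it returns a func(*v1.Node) error that fills in extra status.
// The resource name "example.com/root-disk-free" is made up for illustration.
func diskInfoSetter() func(node *v1.Node) error {
	return func(node *v1.Node) error {
		var stat syscall.Statfs_t
		if err := syscall.Statfs("/", &stat); err != nil {
			return fmt.Errorf("statfs /: %v", err)
		}
		free := *resource.NewQuantity(int64(stat.Bavail)*stat.Bsize, resource.BinarySI)

		if node.Status.Capacity == nil {
			node.Status.Capacity = v1.ResourceList{}
		}
		if node.Status.Allocatable == nil {
			node.Status.Allocatable = v1.ResourceList{}
		}
		node.Status.Capacity["example.com/root-disk-free"] = free
		node.Status.Allocatable["example.com/root-disk-free"] = free
		return nil
	}
}

func main() {
	node := &v1.Node{}
	if err := diskInfoSetter()(node); err != nil {
		panic(err)
	}
	fmt.Println(node.Status.Capacity)
}

In the kubelet itself, registering it would simply mean appending diskInfoSetter() to the setters slice at the end of defaultNodeStatusFuncs.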

At the end of tryUpdateNodeStatus, PatchNodeStatus is called to report the node's status to the master.

kubernetes/pkg/util/node/node.go#PatchNodeStatus

// PatchNodeStatus patches node status.
func PatchNodeStatus(c v1core.CoreV1Interface, nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) (*v1.Node, []byte, error) {
	// compute the patch by diffing the node fetched from the apiserver against the freshly reset node
	patchBytes, err := preparePatchBytesforNodeStatus(nodeName, oldNode, newNode)
	if err != nil {
		return nil, nil, err
	}

	updatedNode, err := c.Nodes().Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status")
	if err != nil {
		return nil, nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err)
	}
	return updatedNode, patchBytes, nil
}

PatchNodeStatus takes the status produced by the registered setters and sends it to the APIServer as a patch against the node's status subresource.
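
Conceptually, preparePatchBytesforNodeStatus diffs the original node against the modified one and produces a strategic merge patch; a rough sketch of that idea using apimachinery's strategicpatch package (not the exact kubelet code) is:

package main

import (
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
	oldNode := &v1.Node{}
	newNode := oldNode.DeepCopy()
	newNode.Status.NodeInfo.KubeletVersion = "v1.16.6" // pretend a setter changed something

	oldData, err := json.Marshal(oldNode)
	if err != nil {
		panic(err)
	}
	newData, err := json.Marshal(newNode)
	if err != nil {
		panic(err)
	}

	// The resulting patch contains only the fields that differ, which is what
	// gets sent to the node's "status" subresource as a StrategicMergePatch.
	patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(patch))
}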

Summary

The above describes how the kubelet reports node status and how that reporting is implemented. There are currently two reporting mechanisms, and only the first one was analyzed here. In large clusters with many nodes, having every node report its full status frequently puts noticeable pressure on etcd, and heartbeat failures caused by network problems between a node and the apiserver also affect the node's state. The second mechanism, NodeLease, was introduced to mitigate exactly these problems.