简介
分布式系统中服务端会通过心跳机制确认客户端是否存活。在kubernetes中,kubelet也会定时上报心跳到apiserver,以此来判断该node是否存活,若node超过一定时间没有上报心跳,其状态会被设置为NotReady
,宿主上容器的状态也会被设置为Nodelost
或者Unknown
状态。kubelet自身会定期更新状态到apiserver,通过参数--node-status-update-frequency
指定上报频率,默认是10s上报以此,kubelet不止会上报心跳信息同时还会上报自身的一些数据信息。
kubelet 上报了哪些状态
在kubernetes中,一个node的状态包含以下几个信息:
- Addresses
- Condition
- Capacity
- nodeInfo
- allocatable
- daemonEndpoints
- images
Adresses
addresses <[]Object>
主要包含以下几个字段:
- address: 节点的地址
- type: 节点的地址类型,有3种
- HostName: Hostname可以通过kubelet的 –hostname-override参数进行覆盖
- ExternalIP: 通常是可以外部路由的node IP地址(从集群外可访问)
- InternalIP: 通常是仅可在集群内部路由的node IP地址
Condition
conditions字段描述了所有Running nodes的状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
conditions:
- lastHeartbeatTime: "2021-07-14T01:33:06Z"
lastTransitionTime: "2021-07-14T01:33:06Z"
message: Flannel is running on this node
reason: FlannelIsUp
status: "False"
type: NetworkUnavailable
- lastHeartbeatTime: "2021-08-20T06:57:28Z"
lastTransitionTime: "2020-06-02T02:44:22Z"
message: kubelet has sufficient memory available
reason: KubeletHasSufficientMemory
status: "False"
type: MemoryPressure
- lastHeartbeatTime: "2021-08-20T06:57:28Z"
lastTransitionTime: "2020-06-02T02:44:22Z"
message: kubelet has no disk pressure
reason: KubeletHasNoDiskPressure
status: "False"
type: DiskPressure
- lastHeartbeatTime: "2021-08-20T06:57:28Z"
lastTransitionTime: "2020-06-02T02:44:22Z"
message: kubelet has sufficient PID available
reason: KubeletHasSufficientPID
status: "False"
type: PIDPressure
- lastHeartbeatTime: "2021-08-20T06:57:28Z"
lastTransitionTime: "2020-06-02T03:04:02Z"
message: kubelet is posting ready status
reason: KubeletReady
status: "True"
type: Ready
|
Capacity
描述node上的可用资源:CPU、内存和可以调度到该node上的最大pod数量
1
2
3
4
5
6
7
8
|
capacity:
cpu: "6"
ephemeral-storage: 204371460Ki
hugepages-1Gi: "0"
hugepages-2Mi: "0"
memory: 12137728Ki
pods: "110"
|
nodeInfo
描述node的一些通用信息,例如内核版本、kubernetes版本(kubelet和kube-proxy版本)、Docker版本和系统版本,这些信息由kubelet从node上获取到。
1
2
3
4
5
6
7
8
9
10
11
12
|
nodeInfo:
architecture: amd64
bootID: e2776b16-ab3d-41bf-82b3-b0a928b1de29
containerRuntimeVersion: docker://18.9.5
kernelVersion: 3.10.0-957.10.1.el7.x86_64
kubeProxyVersion: v1.14.1
kubeletVersion: v1.14.1
machineID: 8114067df8e947f8aa4c5dd6e4fe6414
operatingSystem: linux
osImage: CentOS Linux 7 (Core)
systemUUID: 564DE144-A298-6501-E0C0-7BB0483AD792
|
allocatable
描述节点可分配的资源限制信息
1
2
3
4
5
6
7
8
|
allocatable:
cpu: "6"
ephemeral-storage: "188348737225"
hugepages-1Gi: "0"
hugepages-2Mi: "0"
memory: 12035328Ki
pods: "110"
|
daemonEndpoints
节点kubelet 的endpoints信息
1
2
3
4
|
daemonEndpoints:
kubeletEndpoint:
Port: 10250
|
images
当前节点上的镜像信息
1
2
3
4
5
6
7
8
9
|
- names:
- rancher/rancher-agent@sha256:e6aa36cca0d3ce9fea180add5b620baabd823fb34f9d100b7a8d3eb734392c37
- rancher/rancher-agent:v2.3.5
sizeBytes: 282225373
- names:
- images.lingcb.net/sit-netcore/printingui-lingcb-net@sha256:1b4d9cde7e18a096212d7a2d45c1dbc5c86249f379a893e88c9b0f361e2a3760
- images.lingcb.net/sit-netcore/printingui-lingcb-net:v3.0.0-20200622162958
sizeBytes: 264286427
...
|
使用kubectl get node xxx -o yaml
可以看到node所有的状态信息,其中status中的信息都是kubelet需要上报的,所以kubelet不止上报心跳信息还上报如Condition字段中的网络插件信息、节点信息、节点OOD信息、内存磁盘压力状态、节点监控状态、是否调度等。
kubelet状态异常时的影响
如果一个node处于非Ready状态超过pod-eviction-timeout
的值(默认为5分钟,在kube-controller-manager中定义),在v1.5之前的版本中kube-controller-manager会force delete pod
然后调度该宿主机上的pods到其他宿主机上,注意在v1.5之后的版本,kube-controller-manager不会force delete pod
,pod会一直处于Terminating
或Unknown
状态直到node被从master中删除或者kubelet状态变为Ready。
在node NotReady期间,其中
- Daemonset的Pod状态会变为
Nodelost
- Deployment、Statefulset和static pod的状态先变为
NodeLost
,然后立马变为Unknown
。其中Deployment的pod会recreate,static pod和Statefulset的pod会一直处于Unknown状态
当kubelet恢复Ready状态时,其中
- Daemonset的pod不会recreate,旧pod状态直接变为Running
- Deployment的则是将kubelet进程停止的Pod删除(因为旧pod状态在集群中有变化,但是pod状态在变化时发现集群中Deployment的pod实例数量符合我们的期望值的话,那就不合理了,所以需要对旧的Pod做删除处理)
- Statefulset的pod会重新recreate
- static pod 没有重启,但是Pod的运行时间会在kubelet起来的时候重置为0
kubelet状态上报的实现
kubelet 有两种上报状态的方式,第一种定期向 apiserver 发送心跳消息,简单理解就是启动一个 goroutine 然后定期向 APIServer 发送消息。
第二中被称为 NodeLease,在 v1.13 之前的版本中,节点的心跳只有 NodeStatus,从 v1.13 开始,NodeLease feature 作为 alpha 特性引入。当启用 NodeLease feature 时,每个节点在“kube-node-lease”名称空间中都有一个关联的“Lease”对象,该对象由节点定期更新,NodeStatus 和 NodeLease 都被视为来自节点的心跳。NodeLease 会频繁更新,而只有在 NodeStatus 发生改变或者超过了一定时间(默认值为1分钟,node-monitor-grace-period 的默认值为 40s),才会将 NodeStatus 上报给 master。由于 NodeLease 比 NodeStatus 更轻量级,该特性在集群规模扩展性和性能上有明显提升。
kubernetes 版本: v1.16.6
kubelet启动流程
main -> NewKubeletCommand -> Run -> run -> RunKubelet 通过 createAndInitKubelet 调用NewMainKubelet(通过这个创建kubelet结构体,kubelet结构体实现了kubelet.Bootstrap接口的所有方法)创建 kubelet.Bootstrap接口 然后调用startKubelet方法,在startkubelet中调用的Bootstrap.run方法,实际就是kubelet的Run方法
kubelet 上报状态的代码大部分在kubernetes/pkg/kubelet/kubelet_node_status.go
中实现。状态上报的功能是在 kubernetes/pkg/kubelet/kubelet.go#Run
方法以 goroutine 形式中启动的,kubelet 中多个重要的功能都是在该方法中启动的。
kubernetes/pkg/kubelet/kubelet.go#Run
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
...
if kl.kubeClient != nil {
//这里表示每个nodeStatusUpdateFrequency秒就执行一次syncNodeStatus
go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
go kl.fastStatusUpdateOnce()
//一种新的上报方式
// start syncing lease
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) {
go kl.nodeLeaseController.Run(wait.NeverStop)
}
}
...
}
|
kl.syncNodeStatus
便是状态上报的入口函数,其后所调用的多个函数都是在同一个文件中实现
kubernetes/pkg/kubelet/kubelet_node_status.go#syncNodeStatus
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
func (kl *Kubelet) syncNodeStatus() {
kl.syncNodeStatusMux.Lock()
defer kl.syncNodeStatusMux.Unlock()
if kl.kubeClient == nil || kl.heartbeatClient == nil {
return
}
//是否为注册节点
if kl.registerNode {
// This will exit immediately if it doesn't need to do anything.
kl.registerWithAPIServer()
}
if err := kl.updateNodeStatus(); err != nil {
klog.Errorf("Unable to update node status: %v", err)
}
}
|
syncNodeStatus
调用updateNodeStatus
,然后又调用tryUpdateNodeStatus
来进行上报操作,而最终调用的是setNodeStatus。这里还进行了同步状态判断,如果是注册节点,则执行registerWithAPIServer
,否则,执行updateNodeStatus
。
updateNodeStatus
主要是调用tryUpdateNodeStatus
进行后续的操作,该函数中定义了状态上报重试的次数,nodeStatusUpdateRetry
默认定义为5次。
kubernetes/pkg/kubelet/kubelet_node_status.go#updateNodeStatus
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
func (kl *Kubelet) updateNodeStatus() error {
klog.V(5).Infof("Updating node status")
for i := 0; i < nodeStatusUpdateRetry; i++ {
if err := kl.tryUpdateNodeStatus(i); err != nil {
if i > 0 && kl.onRepeatedHeartbeatFailure != nil {
kl.onRepeatedHeartbeatFailure()
}
klog.Errorf("Error updating node status, will retry: %v", err)
} else {
return nil
}
}
return fmt.Errorf("update node status exceeds retry count")
}
|
tryUpdateNodeStatus
是主要的上报逻辑,先给node设置状态,然后上报node的状态到apiserver
kubernetes/pkg/kubelet/kubelet_node_status.go#tryUpdateNodeStatus
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
opts := metav1.GetOptions{}
if tryNumber == 0 {
util.FromApiserverCache(&opts)
}
//先从apiserver获得node信息
node, err := kl.heartbeatClient.CoreV1().Nodes().Get(string(kl.nodeName), opts)
if err != nil {
return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
}
//将node信息深拷贝
originalNode := node.DeepCopy()
if originalNode == nil {
return fmt.Errorf("nil %q node object", kl.nodeName)
}
podCIDRChanged := false
if len(node.Spec.PodCIDRs) != 0 {
// Pod CIDR could have been updated before, so we cannot rely on
// node.Spec.PodCIDR being non-empty. We also need to know if pod CIDR is
// actually changed.
podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
if podCIDRChanged, err = kl.updatePodCIDR(podCIDRs); err != nil {
klog.Errorf(err.Error())
}
}
//设置node状态
kl.setNodeStatus(node)
now := kl.clock.Now()
if utilfeature.DefaultFeatureGate.Enabled(features.NodeLease) && now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) {
kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
return nil
}
}
// 更新node信息到apiserver
updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
if err != nil {
return err
}
kl.lastStatusReportTime = now
kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
// If update finishes successfully, mark the volumeInUse as reportedInUse to indicate
// those volumes are already updated in the node's status
kl.volumeManager.MarkVolumesAsReportedInUse(updatedNode.Status.VolumesInUse)
return nil
}
|
tryUpdateNodeStatus
中调用setNodeStatus
设置node的状态,setNodeStatus
会获取一次node的所有状态,然后会将kubelet中保存的所有状态改为最新的值,也就是会重置node status所有字段
1
2
3
4
5
6
7
8
|
func (kl *Kubelet) setNodeStatus(node *v1.Node) {
for i, f := range kl.setNodeStatusFuncs {
klog.V(5).Infof("Setting node status at position %v", i)
if err := f(node); err != nil {
klog.Warningf("Failed to set some node status fields: %s", err)
}
}
}
|
setNodeStatus
通过 setNodeStatusFuncs
方法覆盖 node 结构体中所有的字段,setNodeStatusFuncs
是在NewMainKubelet(pkg/kubelet/kubelet.go)
中初始化的。
kubernetes/pkg/kubelet/kubelet.go#NewMainKubelet
1
2
3
4
5
6
|
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
// ...
// Generating the status funcs should be the last thing we do,
klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs()
return klet, nil
|
defaultNodeStatusFuncs
是生成状态的函数,通过获取 node 的所有状态指标后使用工厂函数生成状态
kubernetes/pkg/kubelet/kubelet_node_status.go#defaultNodeStatusFuncs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
// defaultNodeStatusFuncs is a factory that generates the default set of
// setNodeStatus funcs
func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {
// if cloud is not nil, we expect the cloud resource sync manager to exist
var nodeAddressesFunc func() ([]v1.NodeAddress, error)
if kl.cloud != nil {
nodeAddressesFunc = kl.cloudResourceSyncManager.NodeAddresses
}
var validateHostFunc func() error
if kl.appArmorValidator != nil {
validateHostFunc = kl.appArmorValidator.ValidateHost
}
var setters []func(n *v1.Node) error
setters = append(setters,
nodestatus.NodeAddress(kl.nodeIP, kl.nodeIPValidator, kl.hostname, kl.hostnameOverridden, kl.externalCloudProvider, kl.cloud, nodeAddressesFunc),
nodestatus.MachineInfo(string(kl.nodeName), kl.maxPods, kl.podsPerCore, kl.GetCachedMachineInfo, kl.containerManager.GetCapacity,
kl.containerManager.GetDevicePluginResourceCapacity, kl.containerManager.GetNodeAllocatableReservation, kl.recordEvent),
nodestatus.VersionInfo(kl.cadvisor.VersionInfo, kl.containerRuntime.Type, kl.containerRuntime.Version),
nodestatus.DaemonEndpoints(kl.daemonEndpoints),
nodestatus.Images(kl.nodeStatusMaxImages, kl.imageManager.GetImageList),
nodestatus.GoRuntime(),
)
if utilfeature.DefaultFeatureGate.Enabled(features.AttachVolumeLimit) {
setters = append(setters, nodestatus.VolumeLimits(kl.volumePluginMgr.ListVolumePluginWithLimits))
}
setters = append(setters,
nodestatus.MemoryPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderMemoryPressure, kl.recordNodeStatusEvent),
nodestatus.DiskPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderDiskPressure, kl.recordNodeStatusEvent),
nodestatus.PIDPressureCondition(kl.clock.Now, kl.evictionManager.IsUnderPIDPressure, kl.recordNodeStatusEvent),
nodestatus.ReadyCondition(kl.clock.Now, kl.runtimeState.runtimeErrors, kl.runtimeState.networkErrors, kl.runtimeState.storageErrors, validateHostFunc, kl.containerManager.Status, kl.recordNodeStatusEvent),
nodestatus.VolumesInUse(kl.volumeManager.ReconcilerStatesHasBeenSynced, kl.volumeManager.GetVolumesInUse),
// TODO(mtaufen): I decided not to move this setter for now, since all it does is send an event
// and record state back to the Kubelet runtime object. In the future, I'd like to isolate
// these side-effects by decoupling the decisions to send events and partial status recording
// from the Node setters.
kl.recordNodeSchedulableEvent,
)
return setters
}
|
defaultNodeStatusFuncs
可以看到 node 上报的所有信息,主要有 MemoryPressureCondition
、DiskPressureCondition
、PIDPressureCondition
、ReadyCondition
等。每一种 nodestatus 都返回一个 setters,所有 setters 的定义在 pkg/kubelet/nodestatus/setters.go 文件中。
对于二次开发而言,如果需要APIServer掌握更多的Node信息,可以再此处添加自定义函数。例如:上报磁盘信息等。
tryUpdateNodeStatus
中最后调用 PatchNodeStatus
上报 node 的状态到 master。
kubernetes/pkg/util/node/node.go#PatchNodeStatus
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
// PatchNodeStatus patches node status.
func PatchNodeStatus(c v1core.CoreV1Interface, nodeName types.NodeName, oldNode *v1.Node, newNode *v1.Node) (*v1.Node, []byte, error) {
//对比从apiserver中获取到的node信息以及重置后的node信息,计算 patch
patchBytes, err := preparePatchBytesforNodeStatus(nodeName, oldNode, newNode)
if err != nil {
return nil, nil, err
}
updatedNode, err := c.Nodes().Patch(string(nodeName), types.StrategicMergePatchType, patchBytes, "status")
if err != nil {
return nil, nil, fmt.Errorf("failed to patch status %q for node %q: %v", patchBytes, nodeName, err)
}
return updatedNode, patchBytes, nil
}
|
在 PatchNodeStatus 会调用已注册的那些方法将状态把状态发给 APIServer。
总结
上述主要讲述了kubelet上报node状态的方式以及实现,node状态上报的方式目前有2种,上面仅仅分析了第一种状态上报的方式。在大规模集群中由于节点数量比较多,所有node都频繁上报状态会对etcd造成一定的压力,当node和apiserver通信时由于网络导致心跳上报失败也会影响node的状态,为了避免此类问题的出现才有了第二种上报方式NodeLease。