# kubernetes中的事件处理机制 ## 简述 当集群中node或pod异常时,我们可以使用kubectl查看对应的events,实际上再k8s中各个组件会将运行时产生的各种事件汇报到apiserver,对于k8s中的可描述资源,使用`kubectl describe `都可以看到其相关的events。基本各个组件(controller-manage、kube-proxy、kube-scheduler、kubelet)都会使用`EventRecorder` ## Events的定义 events 在 `k8s.io/api/core/v1/types.go` 中进行定义,结构体如下所示: ``` type Event struct { //类型元数据 metav1.TypeMeta `json:",inline"` //标准对象元数据 metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"` // 该事件所属的对象 InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"` // 对象当前状态 Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"` //消息描述 Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"` //报告此事件的组件 Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"` //这一事件首次被记录下来的时间。(服务器接收时间为TypeMeta) FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"` //这一事件最近一次发生的记录时间。 LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"` //此事件发生的次数。 Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"` //此事件的类型(Normal, Warning),后面的版本可能会添加新的类型 Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"` // 这个事件第一次被观察到的时间。 EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"` //关于该事件表示的事件系列的数据,如果它是一个单例事件,则为nil。 Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"` Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"` //用于更复杂操作的可选次要对象。 Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"` //触发此事件的控制器的名称,例如。“kubernetes.io/kubelet”。 ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"` //控制器实例的ID。“kubelet-xyzf”。 ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"` } ``` 其中 InvolvedObject 代表和事件关联的对象,source 代表事件源,使用 kubectl 看到的事件一般包含 Type、Reason、Age、From、Message 几个字段。 ``` Events: Type Reason Age From Message ---- ------ ---- ---- ------- Normal NodeReady 19m (x518 over 42d) kubelet, k8s-node010 Node k8s-node010 status is now: NodeReady Normal NodeNotReady 4m35s (x521 over 42d) kubelet, k8s-node010 Node k8s-node010 status is now: NodeNotReady ``` k8s 中 events 目前只有两种类型:”Normal” 和 “Warning”: ``` const ( // Information only and will not cause any problems EventTypeNormal string = "Normal" // These events are to warn that something might go wrong EventTypeWarning string = "Warning" ) ``` ## EventBroadcaster events的整个生命周期都与EventBroadcaster有关 ### EventBroadcaster的初始化 > kubelet中对EventBroadcaster的初始化在kubernetes/cmd/kubelet/app/server.go中 ``` func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error { ... // event初始化 makeEventRecorder(kubeDeps, nodeName) ... } func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) { if kubeDeps.Recorder != nil { return } //初始化EventBroadcaster eventBroadcaster := record.NewBroadcaster() //初始化 EventRecorder kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)}) // 记录 events 到本地日志,可以再系统日志/var/log/message上看到 eventBroadcaster.StartLogging(klog.V(3).Infof) if kubeDeps.EventClient != nil { klog.V(4).Infof("Sending events to api server.") // 上报 events 到 apiserver eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")}) } else { klog.Warning("No api server defined - no events will be sent to API server.") } } ``` **Kubelet 在启动的时候会初始化一个 EventBroadcaster,它主要是对接收到的 events 做一些后续的处理(保存、上报等)**,EventBroadcaster 也会被 kubelet 中的其他模块使用,以下是相关的定义,对 events 生成和处理的函数都定义在 `k8s.io/client-go/tools/record/event.go` 中: > k8s.io/client-go/tools/record/event.go ``` type eventBroadcasterImpl struct { *watch.Broadcaster sleepDuration time.Duration options CorrelatorOptions } type EventBroadcaster interface { StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface StartRecordingToSink(sink EventSink) watch.Interface StartLogging(logf func(format string, args ...interface{})) watch.Interface NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder } ``` EventBroadcaster 是个接口类型,该接口有以下四个方法: - StartEventWatcher() : EventBroadcaster 中的核心方法,接收各模块产生的 events,参数为一个处理 events 的函数,用户可以使用 StartEventWatcher() 接收 events 然后使用自定义的 handle 进行处理 - StartRecordingToSink() : 调用 StartEventWatcher() 接收 events,并将收到的 events 发送到 apiserver - StartLogging() :也是调用 StartEventWatcher() 接收 events,然后保存 events 到日志 - NewRecorder() :会创建一个指定 EventSource 的 EventRecorder,EventSource 指明了哪个节点的哪个组件 eventBroadcasterImpl 是 eventBroadcaster 实际的对象,初始化 EventBroadcaster 对象的时候会初始化一个 Broadcaster,Broadcaster 会启动一个 goroutine 接收各组件产生的 events 并广播到每一个 watcher。 ``` func NewBroadcaster() EventBroadcaster { return &eventBroadcasterImpl{ Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull), sleepDuration: defaultSleepDuration, } } ``` 可以看到,kubelet 在初始化完 EventBroadcaster 后会调用 StartRecordingToSink() 和 StartLogging() 两个方法,StartRecordingToSink() 处理函数会将收到的 events 进行缓存、过滤、聚合而后发送到 apiserver,StartLogging() 仅将 events 保存到 kubelet 的日志中。 ## Events的生成 从初始化 EventBroadcaster 的代码中可以看到 kubelet 在初始化完 EventBroadcaster 后紧接着初始化了 EventRecorder,并将已经初始化的 Broadcaster 对象作为参数传给了 EventRecorder,至此,EventBroadcaster、EventRecorder、Broadcaster 三个对象产生了关联。EventRecorder 的主要功能是生成指定格式的 events,以下是相关的定义: ``` func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder { return &recorderImpl{scheme, source, eventBroadcaster.Broadcaster, clock.RealClock{}} } type recorderImpl struct { scheme *runtime.Scheme source v1.EventSource *watch.Broadcaster clock clock.Clock } type EventRecorder interface { Event(object runtime.Object, eventtype, reason, message string) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{}) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{}) } ``` EventRecorder是个接口, 接口中包含的几个方法都是产生指定格式的 events,Event() 和 Eventf() 的功能类似 fmt.Println() 和 fmt.Printf(),kubelet 中的各个模块会调用 EventRecorder 生成 events。recorderImpl 是 EventRecorder 实际的对象。EventRecorder 的每个方法会调用 generateEvent,在 generateEvent 中初始化 events 。 以下是生成 events 的函数: ``` unc (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) { ref, err := ref.GetReference(recorder.scheme, object) if err != nil { klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message) return } if !util.ValidateEventType(eventtype) { klog.Errorf("Unsupported event type: '%v'", eventtype) return } event := recorder.makeEvent(ref, annotations, eventtype, reason, message) event.Source = recorder.source go func() { // NOTE: events should be a non-blocking operation defer utilruntime.HandleCrash() //发送事件 recorder.Action(watch.Added, event) }() } func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event { t := metav1.Time{Time: recorder.clock.Now()} namespace := ref.Namespace if namespace == "" { namespace = metav1.NamespaceDefault } return &v1.Event{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()), Namespace: namespace, Annotations: annotations, }, InvolvedObject: *ref, Reason: reason, Message: message, FirstTimestamp: t, LastTimestamp: t, Count: 1, Type: eventtype, } } ``` 初始化完 events 后会调用 recorder.Action() 将 events 发送到 Broadcaster 的事件接收队列中, Action() 是 Broadcaster 中的方法。 以下是 Action() 方法的实现: ``` func (m *Broadcaster) Action(action EventType, obj runtime.Object) { m.incoming <- Event{action, obj} } ``` ## Events的广播 上面已经说了,EventBroadcaster 初始化时会初始化一个 Broadcaster,Broadcaster 的作用就是接收所有的 events 并进行广播,Broadcaster 的实现在 `k8s.io/apimachinery/pkg/watch/mux.go` 中,Broadcaster 初始化完成后会在后台启动一个 goroutine,然后接收所有从 EventRecorder 发送过来的 events,Broadcaster 中有一个 map 会保存每一个注册的 watcher, 接着将 events 广播给所有的 watcher,每个 watcher 都有一个接收消息的 channel,watcher 可以通过它的 ResultChan() 方法从 channel 中读取数据进行消费。 以下是 Broadcaster 广播 events 的实现: ``` func (m *Broadcaster) loop() { // Deliberately not catching crashes here. Yes, bring down the process if there's a // bug in watch.Broadcaster. for event := range m.incoming { if event.Type == internalRunFunctionMarker { event.Object.(functionFakeRuntimeObject)() continue } m.distribute(event) } m.closeAll() m.distributing.Done() } // distribute sends event to all watchers. Blocking. func (m *Broadcaster) distribute(event Event) { m.lock.Lock() defer m.lock.Unlock() if m.fullChannelBehavior == DropIfChannelFull { for _, w := range m.watchers { select { case w.result <- event: case <-w.stopped: default: // Don't block if the event can't be queued. } } } else { for _, w := range m.watchers { select { case w.result <- event: case <-w.stopped: } } } } ``` ## Events的处理 那么 watcher 是从何而来呢?每一个要处理 events 的 client 都需要初始化一个 watcher,处理 events 的方法是在 EventBroadcaster 中定义的,以下是 EventBroadcaster 中对 events 处理的三个函数: ``` func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface { watcher := eventBroadcaster.Watch() go func() { defer utilruntime.HandleCrash() for watchEvent := range watcher.ResultChan() { event, ok := watchEvent.Object.(*v1.Event) if !ok { // This is all local, so there's no reason this should // ever happen. continue } eventHandler(event) } }() return watcher } ``` StartEventWatcher() 首先实例化一个 watcher,每个 watcher 都会被塞入到 Broadcaster 的 watcher 列表中,watcher 从 Broadcaster 提供的 channel 中读取 events,然后再调用 eventHandler 进行处理,StartLogging() 和 StartRecordingToSink() 都是对 StartEventWatcher() 的封装,都会传入自己的处理函数。 ``` func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface { return eventBroadcaster.StartEventWatcher( func(e *v1.Event) { logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message) }) } ``` StartLogging() 传入的 eventHandler 仅将 events 保存到日志中。 ``` func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface { eventCorrelator := NewEventCorrelatorWithOptions(eventBroadcaster.options) return eventBroadcaster.StartEventWatcher( func(event *v1.Event) { recordToSink(sink, event, eventCorrelator, eventBroadcaster.sleepDuration) }) } func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) { // Make a copy before modification, because there could be multiple listeners. // Events are safe to copy like this. eventCopy := *event event = &eventCopy result, err := eventCorrelator.EventCorrelate(event) if err != nil { utilruntime.HandleError(err) } if result.Skip { return } tries := 0 for { if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) { break } tries++ if tries >= maxTriesPerEvent { klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event) break } // 第一次重试增加随机性,防止 apiserver 重启的时候所有的事件都在同一时间发送事件 if tries == 1 { time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64())) } else { time.Sleep(sleepDuration) } } } ``` StartRecordingToSink() 方法先根据当前时间生成一个随机数发生器 randGen,增加随机数是为了在重试时增加随机性,防止 apiserver 重启的时候所有的事件都在同一时间发送事件,接着实例化一个EventCorrelator,EventCorrelator 会对事件做一些预处理的工作,其中包括过滤、聚合、缓存等操作,具体代码不做详细分析,最后将 recordToSink() 函数作为处理函数,recordToSink() 会将处理后的 events 发送到 apiserver,这是 StartEventWatcher() 的整个工作流程。 ## Events的简单实现 了解完 events 的整个处理流程后,可以参考其实现方式写一个 demo,要实现一个完整的 events 需要包含以下几个功能: - 1、事件的产生 - 2、事件的发送 - 3、事件广播 - 4、事件缓存 - 5、事件过滤和聚合 ``` package main import ( "fmt" "sync" "time" ) const queueLength = int64(1) type Events struct { Reason string Message string Source string Type string Count int64 Timestamp time.Time } type EventBroadcaster interface { Event(etype,reason,message string) StartLogging() Interface Stop() } func NewEventBroadcaster() EventBroadcaster { return &eventBroadcasterImpl{NewBroadcaster(queueLength)} } func (eventBroadcaster *eventBroadcasterImpl) Stop() { eventBroadcaster.Shutdown() } // generate event func (eventBroadcaster *eventBroadcasterImpl) Event(etype, reason, message string) { events := &Events{Type: etype, Reason: reason, Message: message} // send event to broadcast eventBroadcaster.Action(events) } // 仅实现 StartLogging() 的功能,将日志打印 func (eventBroadcaster *eventBroadcasterImpl) StartLogging() Interface { // register a watcher watcher := eventBroadcaster.Watch() go func() { for watchEvent := range watcher.ResultChan() { fmt.Printf("%v\n", watchEvent) } }() go func() { time.Sleep(time.Second * 4) watcher.Stop() }() return watcher } //Broadcaster定义与实现 //接受events channel的长度 const incomingQueueLength = 80 type broadcasterWatcher struct { result chan Events stopped chan struct{} stop sync.Once id int64 m *Broadcaster } // 每个 watcher 通过该方法读取 channel 中广播的 events func (b *broadcasterWatcher) ResultChan() <-chan Events { return b.result } type Broadcaster struct { lock sync.Mutex incoming chan Events watchers map[int64]*broadcasterWatcher watchersQueue int64 watchQueueLength int64 distributing sync.WaitGroup } func NewBroadcaster(queueLength int64) *Broadcaster { m := &Broadcaster{ incoming: make(chan Events, incomingQueueLength), watchers: map[int64]*broadcasterWatcher{}, watchQueueLength: queueLength, } m.distributing.Add(1) // 后台启动一个 goroutine 广播 events go m.loop() return m } // 广播 events 到每个 watcher func (m *Broadcaster) loop() { // 从 incoming channel 中读取所接收到的 events for event := range m.incoming { // 发送 events 到每一个 watcher for _, w := range m.watchers { select { case w.result <- event: case <-w.stopped: default: } } } m.closeAll() m.distributing.Done() } func (m *Broadcaster) Shutdown() { close(m.incoming) m.distributing.Wait() } func (m *Broadcaster) closeAll() { // TODO m.lock.Lock() defer m.lock.Unlock() for _, w := range m.watchers { close(w.result) } m.watchers = map[int64]*broadcasterWatcher{} } func (m *Broadcaster) stopWatching(id int64) { m.lock.Lock() defer m.lock.Unlock() w, ok := m.watchers[id] if !ok { return } delete(m.watchers, id) close(w.result) } // 调用 Watch()方法注册一个 watcher func (m *Broadcaster) Watch() Interface { watcher := &broadcasterWatcher{ result: make(chan Events, incomingQueueLength), stopped: make(chan struct{}), id: m.watchQueueLength, m: m, } m.watchers[m.watchersQueue] = watcher m.watchQueueLength++ return watcher } // watcher 实现 type Interface interface { Stop() ResultChan() <-chan Events } func (b *broadcasterWatcher) Stop() { b.stop.Do(func() { close(b.stopped) b.m.stopWatching(b.id) }) } // Broadcaster 接收所产生的 events func (m *Broadcaster) Action(event *Events) { m.incoming <- *event } type eventBroadcasterImpl struct { *Broadcaster } func main() { eventBroadcast := NewEventBroadcaster() var wg sync.WaitGroup wg.Add(1) // producer event go func() { defer wg.Done() time.Sleep(time.Second) eventBroadcast.Event("add", "test", "1") time.Sleep(time.Second * 2) eventBroadcast.Event("add", "test", "2") time.Sleep(time.Second * 3) eventBroadcast.Event("add", "test", "3") //eventBroadcast.Stop() }() eventBroadcast.StartLogging() wg.Wait() } ``` 此处仅简单实现,将 EventRecorder 处理 events 的功能直接放在了 EventBroadcaster 中实现,对 events 的处理方法仅实现了 StartLogging(),Broadcaster 中的部分功能是直接复制 k8s 中的代码,有一定的精简,其实现值得学习,此处对 EventCorrelator 并没有进行实现。 ## 总结 本文讲述了 k8s 中 events 从产生到展示的一个完整过程,最后也实现了一个简单的 demo,在此将 kubelet 对 events 的整个处理过程再梳理下,其中主要有三个对象 EventBroadcaster、EventRecorder、Broadcaster: - 1、kubelet 首先会初始化 EventBroadcaster 对象,同时会初始化一个 Broadcaster 对象。 - 2、kubelet 通过 EventBroadcaster 对象的 NewRecorder() 方法初始化 EventRecorder 对象,EventRecorder 对象提供的几个方法会生成 events 并通过 Action() 方法发送 events 到 Broadcaster 的 channel 队列中。 - 3、Broadcaster 的作用就是接收所有的 events 并进行广播,Broadcaster 初始化后会在后台启动一个 goroutine,然后接收所有从 EventRecorder 发来的 events。 - 4、EventBroadcaster 对 events 有三个处理方法:StartEventWatcher()、StartRecordingToSink()、StartLogging(),StartEventWatcher() 是其中的核心方法,会初始化一个 watcher 注册到 Broadcaster,其余两个处理函数对 StartEventWatcher() 进行了封装,并实现了自己的处理函数。 - 5、 Broadcaster 中有一个 map 会保存每一个注册的 watcher,其会将所有的 events 广播给每一个 watcher,每个 watcher 通过它的 ResultChan() 方法从 channel 接收 events。 - 6、kubelet 会使用 StartRecordingToSink() 和 StartLogging() 对 events 进行处理,StartRecordingToSink() 处理函数收到 events 后会进行缓存、过滤、聚合而后发送到 apiserver,apiserver 会将 events 保存到 etcd 中,使用 kubectl 或其他客户端可以查看。StartLogging() 仅将 events 保存到 kubelet 的日志中。