目录

kubernetes中的事件处理机制

简述

当集群中node或pod异常时,我们可以使用kubectl查看对应的events,实际上再k8s中各个组件会将运行时产生的各种事件汇报到apiserver,对于k8s中的可描述资源,使用kubectl describe 都可以看到其相关的events。基本各个组件(controller-manage、kube-proxy、kube-scheduler、kubelet)都会使用EventRecorder

Events的定义

events 在 k8s.io/api/core/v1/types.go 中进行定义,结构体如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
type Event struct {
	//类型元数据
	metav1.TypeMeta `json:",inline"`
	//标准对象元数据
	metav1.ObjectMeta `json:"metadata" protobuf:"bytes,1,opt,name=metadata"`

	// 该事件所属的对象
	InvolvedObject ObjectReference `json:"involvedObject" protobuf:"bytes,2,opt,name=involvedObject"`

	// 对象当前状态
	Reason string `json:"reason,omitempty" protobuf:"bytes,3,opt,name=reason"`

	//消息描述
	Message string `json:"message,omitempty" protobuf:"bytes,4,opt,name=message"`

	//报告此事件的组件
	Source EventSource `json:"source,omitempty" protobuf:"bytes,5,opt,name=source"`

	//这一事件首次被记录下来的时间。(服务器接收时间为TypeMeta)
	FirstTimestamp metav1.Time `json:"firstTimestamp,omitempty" protobuf:"bytes,6,opt,name=firstTimestamp"`

	//这一事件最近一次发生的记录时间。
	LastTimestamp metav1.Time `json:"lastTimestamp,omitempty" protobuf:"bytes,7,opt,name=lastTimestamp"`

	//此事件发生的次数。
	Count int32 `json:"count,omitempty" protobuf:"varint,8,opt,name=count"`

	//此事件的类型(Normal, Warning),后面的版本可能会添加新的类型
	Type string `json:"type,omitempty" protobuf:"bytes,9,opt,name=type"`

	// 这个事件第一次被观察到的时间。
	EventTime metav1.MicroTime `json:"eventTime,omitempty" protobuf:"bytes,10,opt,name=eventTime"`

	//关于该事件表示的事件系列的数据,如果它是一个单例事件,则为nil。
	Series *EventSeries `json:"series,omitempty" protobuf:"bytes,11,opt,name=series"`

	Action string `json:"action,omitempty" protobuf:"bytes,12,opt,name=action"`

	//用于更复杂操作的可选次要对象。
	Related *ObjectReference `json:"related,omitempty" protobuf:"bytes,13,opt,name=related"`

	//触发此事件的控制器的名称,例如。“kubernetes.io/kubelet”。
	ReportingController string `json:"reportingComponent" protobuf:"bytes,14,opt,name=reportingComponent"`

	//控制器实例的ID。“kubelet-xyzf”。
	ReportingInstance string `json:"reportingInstance" protobuf:"bytes,15,opt,name=reportingInstance"`
}

其中 InvolvedObject 代表和事件关联的对象,source 代表事件源,使用 kubectl 看到的事件一般包含 Type、Reason、Age、From、Message 几个字段。

1
2
3
4
5
Events:
  Type    Reason        Age                    From                  Message
  ----    ------        ----                   ----                  -------
  Normal  NodeReady     19m (x518 over 42d)    kubelet, k8s-node010  Node k8s-node010 status is now: NodeReady
  Normal  NodeNotReady  4m35s (x521 over 42d)  kubelet, k8s-node010  Node k8s-node010 status is now: NodeNotReady

k8s 中 events 目前只有两种类型:”Normal” 和 “Warning”:

1
2
3
4
5
6
const (
	// Information only and will not cause any problems
	EventTypeNormal string = "Normal"
	// These events are to warn that something might go wrong
	EventTypeWarning string = "Warning"
)

EventBroadcaster

events的整个生命周期都与EventBroadcaster有关

EventBroadcaster的初始化

kubelet中对EventBroadcaster的初始化在kubernetes/cmd/kubelet/app/server.go中

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
	...
	// event初始化
	makeEventRecorder(kubeDeps, nodeName)
	...
}

func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
	if kubeDeps.Recorder != nil {
		return
	}
	//初始化EventBroadcaster
	eventBroadcaster := record.NewBroadcaster()
	//初始化 EventRecorder
	kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
	// 记录 events 到本地日志,可以再系统日志/var/log/message上看到
	eventBroadcaster.StartLogging(klog.V(3).Infof)
	if kubeDeps.EventClient != nil {
		klog.V(4).Infof("Sending events to api server.")
		// 上报 events 到 apiserver
		eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
	} else {
		klog.Warning("No api server defined - no events will be sent to API server.")
	}
}

Kubelet 在启动的时候会初始化一个 EventBroadcaster,它主要是对接收到的 events 做一些后续的处理(保存、上报等),EventBroadcaster 也会被 kubelet 中的其他模块使用,以下是相关的定义,对 events 生成和处理的函数都定义在 k8s.io/client-go/tools/record/event.go 中:

k8s.io/client-go/tools/record/event.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
type eventBroadcasterImpl struct {
	*watch.Broadcaster
	sleepDuration time.Duration
	options       CorrelatorOptions
}

type EventBroadcaster interface {

	StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface

	StartRecordingToSink(sink EventSink) watch.Interface

	StartLogging(logf func(format string, args ...interface{})) watch.Interface

	NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder
}

EventBroadcaster 是个接口类型,该接口有以下四个方法:

  • StartEventWatcher() : EventBroadcaster 中的核心方法,接收各模块产生的 events,参数为一个处理 events 的函数,用户可以使用 StartEventWatcher() 接收 events 然后使用自定义的 handle 进行处理

  • StartRecordingToSink() : 调用 StartEventWatcher() 接收 events,并将收到的 events 发送到 apiserver

  • StartLogging() :也是调用 StartEventWatcher() 接收 events,然后保存 events 到日志

  • NewRecorder() :会创建一个指定 EventSource 的 EventRecorder,EventSource 指明了哪个节点的哪个组件

eventBroadcasterImpl 是 eventBroadcaster 实际的对象,初始化 EventBroadcaster 对象的时候会初始化一个 Broadcaster,Broadcaster 会启动一个 goroutine 接收各组件产生的 events 并广播到每一个 watcher。

1
2
3
4
5
6
func NewBroadcaster() EventBroadcaster {
	return &eventBroadcasterImpl{
		Broadcaster:   watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
		sleepDuration: defaultSleepDuration,
	}
}

可以看到,kubelet 在初始化完 EventBroadcaster 后会调用 StartRecordingToSink() 和 StartLogging() 两个方法,StartRecordingToSink() 处理函数会将收到的 events 进行缓存、过滤、聚合而后发送到 apiserver,StartLogging() 仅将 events 保存到 kubelet 的日志中。

Events的生成

从初始化 EventBroadcaster 的代码中可以看到 kubelet 在初始化完 EventBroadcaster 后紧接着初始化了 EventRecorder,并将已经初始化的 Broadcaster 对象作为参数传给了 EventRecorder,至此,EventBroadcaster、EventRecorder、Broadcaster 三个对象产生了关联。EventRecorder 的主要功能是生成指定格式的 events,以下是相关的定义:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
func (eventBroadcaster *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, source v1.EventSource) EventRecorder {
	return &recorderImpl{scheme, source, eventBroadcaster.Broadcaster, clock.RealClock{}}
}

type recorderImpl struct {
	scheme *runtime.Scheme
	source v1.EventSource
	*watch.Broadcaster
	clock clock.Clock
}

type EventRecorder interface {

	Event(object runtime.Object, eventtype, reason, message string)

	Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{})

	PastEventf(object runtime.Object, timestamp metav1.Time, eventtype, reason, messageFmt string, args ...interface{})

	AnnotatedEventf(object runtime.Object, annotations map[string]string, eventtype, reason, messageFmt string, args ...interface{})
}

EventRecorder是个接口, 接口中包含的几个方法都是产生指定格式的 events,Event() 和 Eventf() 的功能类似 fmt.Println() 和 fmt.Printf(),kubelet 中的各个模块会调用 EventRecorder 生成 events。recorderImpl 是 EventRecorder 实际的对象。EventRecorder 的每个方法会调用 generateEvent,在 generateEvent 中初始化 events 。

以下是生成 events 的函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
unc (recorder *recorderImpl) generateEvent(object runtime.Object, annotations map[string]string, timestamp metav1.Time, eventtype, reason, message string) {
	ref, err := ref.GetReference(recorder.scheme, object)
	if err != nil {
		klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", object, err, eventtype, reason, message)
		return
	}

	if !util.ValidateEventType(eventtype) {
		klog.Errorf("Unsupported event type: '%v'", eventtype)
		return
	}

	event := recorder.makeEvent(ref, annotations, eventtype, reason, message)
	event.Source = recorder.source

	go func() {
		// NOTE: events should be a non-blocking operation
		defer utilruntime.HandleCrash()
		//发送事件
		recorder.Action(watch.Added, event)
	}()
}

func (recorder *recorderImpl) makeEvent(ref *v1.ObjectReference, annotations map[string]string, eventtype, reason, message string) *v1.Event {
	t := metav1.Time{Time: recorder.clock.Now()}
	namespace := ref.Namespace
	if namespace == "" {
		namespace = metav1.NamespaceDefault
	}
	return &v1.Event{
		ObjectMeta: metav1.ObjectMeta{
			Name:        fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
			Namespace:   namespace,
			Annotations: annotations,
		},
		InvolvedObject: *ref,
		Reason:         reason,
		Message:        message,
		FirstTimestamp: t,
		LastTimestamp:  t,
		Count:          1,
		Type:           eventtype,
	}
}

初始化完 events 后会调用 recorder.Action() 将 events 发送到 Broadcaster 的事件接收队列中, Action() 是 Broadcaster 中的方法。

以下是 Action() 方法的实现:

1
2
3
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
   m.incoming <- Event{action, obj}
}

Events的广播

上面已经说了,EventBroadcaster 初始化时会初始化一个 Broadcaster,Broadcaster 的作用就是接收所有的 events 并进行广播,Broadcaster 的实现在 k8s.io/apimachinery/pkg/watch/mux.go 中,Broadcaster 初始化完成后会在后台启动一个 goroutine,然后接收所有从 EventRecorder 发送过来的 events,Broadcaster 中有一个 map 会保存每一个注册的 watcher, 接着将 events 广播给所有的 watcher,每个 watcher 都有一个接收消息的 channel,watcher 可以通过它的 ResultChan() 方法从 channel 中读取数据进行消费。

以下是 Broadcaster 广播 events 的实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
func (m *Broadcaster) loop() {
	// Deliberately not catching crashes here. Yes, bring down the process if there's a
	// bug in watch.Broadcaster.
	for event := range m.incoming {
		if event.Type == internalRunFunctionMarker {
			event.Object.(functionFakeRuntimeObject)()
			continue
		}
		m.distribute(event)
	}
	m.closeAll()
	m.distributing.Done()
}

// distribute sends event to all watchers. Blocking.
func (m *Broadcaster) distribute(event Event) {
	m.lock.Lock()
	defer m.lock.Unlock()
	if m.fullChannelBehavior == DropIfChannelFull {
		for _, w := range m.watchers {
			select {
			case w.result <- event:
			case <-w.stopped:
			default: // Don't block if the event can't be queued.
			}
		}
	} else {
		for _, w := range m.watchers {
			select {
			case w.result <- event:
			case <-w.stopped:
			}
		}
	}
}

Events的处理

那么 watcher 是从何而来呢?每一个要处理 events 的 client 都需要初始化一个 watcher,处理 events 的方法是在 EventBroadcaster 中定义的,以下是 EventBroadcaster 中对 events 处理的三个函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
func (eventBroadcaster *eventBroadcasterImpl) StartEventWatcher(eventHandler func(*v1.Event)) watch.Interface {
	watcher := eventBroadcaster.Watch()
	go func() {
		defer utilruntime.HandleCrash()
		for watchEvent := range watcher.ResultChan() {
			event, ok := watchEvent.Object.(*v1.Event)
			if !ok {
				// This is all local, so there's no reason this should
				// ever happen.
				continue
			}
			eventHandler(event)
		}
	}()
	return watcher
}

StartEventWatcher() 首先实例化一个 watcher,每个 watcher 都会被塞入到 Broadcaster 的 watcher 列表中,watcher 从 Broadcaster 提供的 channel 中读取 events,然后再调用 eventHandler 进行处理,StartLogging() 和 StartRecordingToSink() 都是对 StartEventWatcher() 的封装,都会传入自己的处理函数。

1
2
3
4
5
6
func (eventBroadcaster *eventBroadcasterImpl) StartLogging(logf func(format string, args ...interface{})) watch.Interface {
	return eventBroadcaster.StartEventWatcher(
		func(e *v1.Event) {
			logf("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)
		})
}

StartLogging() 传入的 eventHandler 仅将 events 保存到日志中。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
func (eventBroadcaster *eventBroadcasterImpl) StartRecordingToSink(sink EventSink) watch.Interface {
	eventCorrelator := NewEventCorrelatorWithOptions(eventBroadcaster.options)
	return eventBroadcaster.StartEventWatcher(
		func(event *v1.Event) {
			recordToSink(sink, event, eventCorrelator, eventBroadcaster.sleepDuration)
		})
}

func recordToSink(sink EventSink, event *v1.Event, eventCorrelator *EventCorrelator, sleepDuration time.Duration) {
	// Make a copy before modification, because there could be multiple listeners.
	// Events are safe to copy like this.
	eventCopy := *event
	event = &eventCopy
	result, err := eventCorrelator.EventCorrelate(event)
	if err != nil {
		utilruntime.HandleError(err)
	}
	if result.Skip {
		return
	}
	tries := 0
	for {
		if recordEvent(sink, result.Event, result.Patch, result.Event.Count > 1, eventCorrelator) {
			break
		}
		tries++
		if tries >= maxTriesPerEvent {
			klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
			break
		}
		// 第一次重试增加随机性,防止 apiserver 重启的时候所有的事件都在同一时间发送事件
		if tries == 1 {
			time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64()))
		} else {
			time.Sleep(sleepDuration)
		}
	}
}

StartRecordingToSink() 方法先根据当前时间生成一个随机数发生器 randGen,增加随机数是为了在重试时增加随机性,防止 apiserver 重启的时候所有的事件都在同一时间发送事件,接着实例化一个EventCorrelator,EventCorrelator 会对事件做一些预处理的工作,其中包括过滤、聚合、缓存等操作,具体代码不做详细分析,最后将 recordToSink() 函数作为处理函数,recordToSink() 会将处理后的 events 发送到 apiserver,这是 StartEventWatcher() 的整个工作流程。

Events的简单实现

了解完 events 的整个处理流程后,可以参考其实现方式写一个 demo,要实现一个完整的 events 需要包含以下几个功能:

  • 1、事件的产生
  • 2、事件的发送
  • 3、事件广播
  • 4、事件缓存
  • 5、事件过滤和聚合
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
package main

import (
	"fmt"
	"sync"
	"time"
)

const queueLength = int64(1)

type Events struct {
	Reason string
	Message string
	Source string
	Type string
	Count int64
	Timestamp time.Time
}

type EventBroadcaster interface {
	Event(etype,reason,message string)
	StartLogging() Interface
	Stop()
}

func NewEventBroadcaster() EventBroadcaster {
	return &eventBroadcasterImpl{NewBroadcaster(queueLength)}
}

func (eventBroadcaster *eventBroadcasterImpl) Stop() {
	eventBroadcaster.Shutdown()
}

// generate event
func (eventBroadcaster *eventBroadcasterImpl) Event(etype, reason, message string) {
	events := &Events{Type: etype, Reason: reason, Message: message}
	// send event to broadcast
	eventBroadcaster.Action(events)
}

// 仅实现 StartLogging() 的功能,将日志打印
func (eventBroadcaster *eventBroadcasterImpl) StartLogging() Interface {
	// register a watcher
	watcher := eventBroadcaster.Watch()
	go func() {
		for watchEvent := range watcher.ResultChan() {
			fmt.Printf("%v\n", watchEvent)
		}
	}()

	go func() {
		time.Sleep(time.Second * 4)
		watcher.Stop()
	}()

	return watcher
}

//Broadcaster定义与实现
//接受events channel的长度
const  incomingQueueLength = 80

type broadcasterWatcher struct {
	result  chan Events
	stopped chan struct{}
	stop    sync.Once
	id      int64
	m       *Broadcaster
}

// 每个 watcher 通过该方法读取 channel 中广播的 events
func (b *broadcasterWatcher) ResultChan() <-chan Events {
	return b.result
}


type  Broadcaster struct {
	lock sync.Mutex
	incoming chan Events
	watchers map[int64]*broadcasterWatcher
	watchersQueue    int64
	watchQueueLength int64
	distributing     sync.WaitGroup
}

func NewBroadcaster(queueLength int64) *Broadcaster {
	m := &Broadcaster{
		incoming:         make(chan Events, incomingQueueLength),
		watchers:         map[int64]*broadcasterWatcher{},
		watchQueueLength: queueLength,
	}
	m.distributing.Add(1)
	// 后台启动一个 goroutine 广播 events
	go m.loop()
	return m
}

// 广播 events 到每个 watcher
func (m *Broadcaster) loop() {
	// 从 incoming channel 中读取所接收到的 events
	for event := range m.incoming {
		// 发送 events 到每一个 watcher
		for _, w := range m.watchers {
			select {
			case w.result <- event:
			case <-w.stopped:
			default:
			}
		}
	}
	m.closeAll()
	m.distributing.Done()
}

func (m *Broadcaster) Shutdown() {
	close(m.incoming)
	m.distributing.Wait()
}

func (m *Broadcaster) closeAll() {
	// TODO
	m.lock.Lock()
	defer m.lock.Unlock()
	for _, w := range m.watchers {
		close(w.result)
	}
	m.watchers = map[int64]*broadcasterWatcher{}
}

func (m *Broadcaster) stopWatching(id int64) {
	m.lock.Lock()
	defer m.lock.Unlock()
	w, ok := m.watchers[id]
	if !ok {
		return
	}
	delete(m.watchers, id)
	close(w.result)
}

// 调用 Watch()方法注册一个 watcher
func (m *Broadcaster) Watch() Interface {
	watcher := &broadcasterWatcher{
		result:  make(chan Events, incomingQueueLength),
		stopped: make(chan struct{}),
		id:      m.watchQueueLength,
		m:       m,
	}
	m.watchers[m.watchersQueue] = watcher
	m.watchQueueLength++
	return watcher
}

// watcher 实现
type Interface interface {
	Stop()
	ResultChan() <-chan Events
}




func (b *broadcasterWatcher) Stop() {
	b.stop.Do(func() {
		close(b.stopped)
		b.m.stopWatching(b.id)
	})
}


// Broadcaster 接收所产生的 events
func (m *Broadcaster) Action(event *Events) {
	m.incoming <- *event
}


type  eventBroadcasterImpl struct {
	*Broadcaster
}


func main() {
	eventBroadcast := NewEventBroadcaster()

	var wg sync.WaitGroup
	wg.Add(1)
	// producer event
	go func() {
		defer wg.Done()
		time.Sleep(time.Second)
		eventBroadcast.Event("add", "test", "1")
		time.Sleep(time.Second * 2)
		eventBroadcast.Event("add", "test", "2")
		time.Sleep(time.Second * 3)
		eventBroadcast.Event("add", "test", "3")
		//eventBroadcast.Stop()
	}()

	eventBroadcast.StartLogging()
	wg.Wait()
}

此处仅简单实现,将 EventRecorder 处理 events 的功能直接放在了 EventBroadcaster 中实现,对 events 的处理方法仅实现了 StartLogging(),Broadcaster 中的部分功能是直接复制 k8s 中的代码,有一定的精简,其实现值得学习,此处对 EventCorrelator 并没有进行实现。

总结

本文讲述了 k8s 中 events 从产生到展示的一个完整过程,最后也实现了一个简单的 demo,在此将 kubelet 对 events 的整个处理过程再梳理下,其中主要有三个对象 EventBroadcaster、EventRecorder、Broadcaster:

  • 1、kubelet 首先会初始化 EventBroadcaster 对象,同时会初始化一个 Broadcaster 对象。
  • 2、kubelet 通过 EventBroadcaster 对象的 NewRecorder() 方法初始化 EventRecorder 对象,EventRecorder 对象提供的几个方法会生成 events 并通过 Action() 方法发送 events 到 Broadcaster 的 channel 队列中。
  • 3、Broadcaster 的作用就是接收所有的 events 并进行广播,Broadcaster 初始化后会在后台启动一个 goroutine,然后接收所有从 EventRecorder 发来的 events。
  • 4、EventBroadcaster 对 events 有三个处理方法:StartEventWatcher()、StartRecordingToSink()、StartLogging(),StartEventWatcher() 是其中的核心方法,会初始化一个 watcher 注册到 Broadcaster,其余两个处理函数对 StartEventWatcher() 进行了封装,并实现了自己的处理函数。
  • 5、 Broadcaster 中有一个 map 会保存每一个注册的 watcher,其会将所有的 events 广播给每一个 watcher,每个 watcher 通过它的 ResultChan() 方法从 channel 接收 events。
  • 6、kubelet 会使用 StartRecordingToSink() 和 StartLogging() 对 events 进行处理,StartRecordingToSink() 处理函数收到 events 后会进行缓存、过滤、聚合而后发送到 apiserver,apiserver 会将 events 保存到 etcd 中,使用 kubectl 或其他客户端可以查看。StartLogging() 仅将 events 保存到 kubelet 的日志中。