# informer源码分析 ## 结构图 ![image](https://xieys.club/images/posts/116676748-efbbed00-a9d9-11eb-88ff-2b25120b59b3.png) ## 源码分析 当我们需要利用client-go来实现自定义控制器时,通常会使用informerFactory来管理控制器需要的多个资源对象的informer实例 ``` // 创建一个informer factory kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30) // factory已经为所有k8s的内置资源对象提供了创建对应informer实例的方法,调用具体informer实例的Lister或Informer方法 // 就完成了将informer注册到factory的过程 deploymentLister := kubeInformerFactory.Apps().V1().Deployments().Lister() // 启动注册到factory的所有informer kubeInformerFactory.Start(stopCh) // 等待首次缓存同步 kubeInformerFactory.WaitForCacheSync(StopCh) ``` ### SharedInformerFactory结构 使用sharedInformerFactory可以统一管理控制器中需要的各资源对象的informer实例,避免同一个资源创建多个实例,这里的informer实现是shareIndexInformer NewSharedInformerFactory调用了NewSharedInformerFactoryWithOptions,将返回一个sharedInformerFactory对象 ``` type sharedInformerFactory struct { client kubernetes.Interface //client namespace string // 关注的namespace,可以通过WithNamespace Option配置 tweakListOptions internalinterfaces.TweakListOptionsFunc lock sync.Mutex defaultResync time.Duration //调用者传过来的时间 customResync map[reflect.Type]time.Duration //自定义rsync时间 informers map[reflect.Type]cache.SharedIndexInformer //针对每种类型资源存储一个informer,informer的类型是ShareIndexInformer startedInformers map[reflect.Type]bool // 每个informer是否都启动了 } ``` - client: clientset,支持直接请求api中各内置资源对象的restful客户端集合 - namespace: factory关注的namespace(默认All Namespace),informer中的reflector将只会listAndWatch指定namespace的资源 - defaultResync: 用于初始化持有的shareIndexInformer的resyncCheckPeriod和defaultEventHandlerResyncPeriod字段,用于定时的将local store同步到deltaFIFO - customResync:支持针对每一个informer来配置resync时间,通过WithCustomResyncConfig这个Option配置,否则就用指定的defaultResync - informers:factory管理的informer集合 - startedInformers:记录已经启动的informer集合 ### sharedInformerFactory对象的关键方法: #### 创建一个sharedInformerFactory ``` // NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options. func NewSharedInformerFactoryWithOptions(client kubernetes.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory { factory := &sharedInformerFactory{ client: client, //clientset,对原生资源来说,这里可以直接使用kube clientset namespace: v1.NamespaceAll, //可以看到默认是监听所有ns下的指定资源 defaultResync: defaultResync, //30s //以下初始化map结构 informers: make(map[reflect.Type]cache.SharedIndexInformer), startedInformers: make(map[reflect.Type]bool), customResync: make(map[reflect.Type]time.Duration), } // 应用所有的option,比如限制名称空间、自定义过滤器之类的配置 for _, opt := range options { factory = opt(factory) } return factory } ``` #### 启动factory下的所有informer ``` // Start initializes all requested informers. func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { f.lock.Lock() defer f.lock.Unlock() for informerType, informer := range f.informers { if !f.startedInformers[informerType] { //直接起gorouting调用informer的Run方法,并且标记对应的informer已经启动 go informer.Run(stopCh) f.startedInformers[informerType] = true } } } ``` #### 等待informer的cache被同步 等待每一个ShareIndexInformer的cache被同步,具体怎么算同步完成? - sharedInformerFactory的WaitForCacheSync将会不断调用factory持有的所有informer的HasSynced方法,直到返回true - 而informer的HasSynced方法调用的自己持有的controller的HasSynced方法(informer结构持有controller对象,下文会分析informer的结构) - informer中的controller的HasSynced方法则调用的是controller持有的deltaFIFO对象的HasSynced方法 也就说sharedInformerFactory的WaitForCacheSync方法判断informer的cache是否同步,最终看的是informer中的deltaFIFO是否同步了,deltaFIFO的结构下文将会分析 ``` // WaitForCacheSync waits for all started informers' cache were synced. func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { informers := func() map[reflect.Type]cache.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() informers := map[reflect.Type]cache.SharedIndexInformer{} for informerType, informer := range f.informers { if f.startedInformers[informerType] { informers[informerType] = informer } } return informers }() res := map[reflect.Type]bool{} for informType, informer := range informers { res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) } return res } ``` > cache.WaitForCacheSync(stopCh, informer.HasSynced) ``` func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool { //wait.PollImmediateUntil,该方法为阻塞方法,直到函数返回true才结束往下继续执行 err := wait.PollImmediateUntil(syncedPollPeriod, func() (bool, error) { for _, syncFunc := range cacheSyncs { if !syncFunc() { return false, nil } } return true, nil }, stopCh) if err != nil { klog.V(2).Infof("stop requested") return false } klog.V(4).Infof("caches populated") return true } ``` #### factory为自己添加informer 只有向factory中添加informer,factory才有意义,添加完成之后,上面factory的start方法就可以启动了 ``` func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() //根据对象类型判断factory中是否已经有对应informer informerType := reflect.TypeOf(obj) informer, exists := f.informers[informerType] if exists { return informer } //如果factory中已经有这个对象类型的informer,就不创建了 resyncPeriod, exists := f.customResync[informerType] if !exists { resyncPeriod = f.defaultResync } //没有就根据newFunc创建一个,并存在map中 informer = newFunc(f.client, resyncPeriod) f.informers[informerType] = informer return informer } 参数 obj: informer关注的资源如deployment{} newFunc: 一个知道如何创建指定informer的方法,k8s为每一个内置的对象都实现了这个方法,比如创建deployment的ShareIndexInformer的方法 ``` #### shareIndexInformer对应的newFunc的实现 client-go中已经为所有内置对象都提供了NewInformerFunc 以deployment为例,通过调用factory.Apps().V1().Deployments()即可为factory添加一个deployment对应的shareIndexInformer的实现,具体过程如下: - 调用factory.Apps().V1().Deployments()即会调用以下Deployments方法创建deploymentInformer对象 ``` // Deployments returns a DeploymentInformer. func (v *version) Deployments() DeploymentInformer { return &deploymentInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} } ``` - 只要调用了factory.Apps().V1().Deployments()返回的deploymentInformer的Informer或Lister方法,就完成了向factory中添加deployment informer ``` // deploymentInformer对象具有defaultInformer、Informer、Lister方法 // 可以看到创建deploymentInformer时传递了一个带索引的缓存,附带了一个namespace索引,后面可以了解带索引的缓存实现,比如可以支持查询:某个namespace下的所有pod // 用于创建对应的shareIndexInformer,该方法提供给factory的InformerFor方法 func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) } // 向factor中添加dpeloyment的shareIndexInformer并返回 func (f *deploymentInformer) Informer() cache.SharedIndexInformer { return f.factory.InformerFor(&appsv1.Deployment{}, f.defaultInformer) } // 返回dpeloyment的lister对象,该lister中持有上面创建出的shareIndexInformer的cache的引用,方便通过缓存获取对象 func (f *deploymentInformer) Lister() v1.DeploymentLister { return v1.NewDeploymentLister(f.Informer().GetIndexer()) } ``` - deploymentInformer的defaultInformer方法将会创建出一个shareIndexInformer ``` // 可先看看下面的shareIndexInformer结构 func NewFilteredDeploymentInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { return cache.NewSharedIndexInformer( // 定义对象的ListWatch方法,这里直接用的是clientset中的方法 &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.AppsV1().Deployments(namespace).List(context.TODO(), options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.AppsV1().Deployments(namespace).Watch(context.TODO(), options) }, }, &appsv1.Deployment{}, resyncPeriod, //创建factory是指定的时间,如30s indexers, ) } func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { realClock := &clock.RealClock{} sharedIndexInformer := &sharedIndexInformer{ processor: &sharedProcessor{clock: realClock}, indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), listerWatcher: lw, objectType: exampleObject, resyncCheckPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)), clock: realClock, } return sharedIndexInformer } ``` ### shareIndexInformer结构 ``` type sharedIndexInformer struct { indexer Indexer //informer中的底层缓存cache controller Controller // 持有reflector和deltaFIFO对象,reflector对象将会listWatch对象添加到deltaFIFO,同时更新indexer cahce,更新成功则通过sharedProcessor触发用户配置的Eventhandler processor *sharedProcessor //持有一系列的listener,每个listener对应用户的EventHandler cacheMutationDetector MutationDetector //可以先忽略,这个对象可以用来监测local cache是否被外部直接修改 listerWatcher ListerWatcher //deployment的listWatch方法 objectType runtime.Object resyncCheckPeriod time.Duration defaultEventHandlerResyncPeriod time.Duration clock clock.Clock started, stopped bool startedLock sync.Mutex blockDeltas sync.Mutex } ``` - indexer:底层缓存,其实就是一个map记录对象,再通过一些其他map在插入删除对象是根据索引函数维护索引key如ns与对象pod的关系 - controller:informer内部的一个controller,这个controller包含reflector:根据用户定义的ListWatch方法获取对象并更新增量队列DeltaFIFO - processor:知道如何处理DeltaFIFO队列中的对象,实现是sharedProcessor{} listerWatcher:知道如何list对象和watch对象的方法 - objectType:对象类型,如deployment{} - resyncCheckPeriod: 给自己的controller的reflector每隔多少s<尝试>调用listener的shouldResync方法 - defaultEventHandlerResyncPeriod:通过AddEventHandler方法给informer配置回调时如果没有配置的默认值,这个值用在processor的listener中判断是否需要进行resync,最小1s 两个字段的默认值都是来自创建factory时指定的defaultResync,当resyncPeriod < s.resyncCheckPeriod时,如果informer已经启动了才添加的EventHandler,那么调整resyncPeriod为resyncCheckPeriod,否则调整resyncCheckPeriod为resyncPeriod ### sharedIndexInformer对象的关键方法: #### sharedIndexInformer的Run方法 前面factory的start方法就是调用了这个Run方法 该方法初始化了controller对象并启动,同时调用processor.run启动所有的listener,用于回调用户配置的EventHandler ``` func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() //创建一个DeltaFIFO,用于shareIndexInformer.controller.reflector //可以看到这里把indexer即本地缓存传入,用来初始化deltaFIFO的knownObject字段 fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KnownObjects: s.indexer, EmitDeltaTypeReplaced: true, }) //shareIndexInformer中的controller的配置 cfg := &Config{ Queue: fifo, ListerWatcher: s.listerWatcher, ObjectType: s.objectType, FullResyncPeriod: s.resyncCheckPeriod, RetryOnError: false, ShouldResync: s.processor.shouldResync,// 这个shouldResync方法将被用在reflector ListAndWatch方法中判断定时时间resyncCheckPeriod到了之后该不该进行resync动作 //一个知道如何处理从informer中的controller中的deltaFIFO pop出来的对象的方法 Process: s.HandleDeltas, } func() { s.startedLock.Lock() defer s.startedLock.Unlock() // 这里New一个具体的controller s.controller = New(cfg) s.controller.(*controller).clock = s.clock s.started = true }() // Separate stop channel because Processor should be stopped strictly after controller processorStopCh := make(chan struct{}) var wg wait.Group defer wg.Wait() // Wait for Processor to stop defer close(processorStopCh) // Tell Processor to stop wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) // 调用processor.run启动所有的listener,回调用户配置的EventHandler wg.StartWithChannel(processorStopCh, s.processor.run) defer func() { s.startedLock.Lock() defer s.startedLock.Unlock() s.stopped = true // Don't want any new listeners }() // 启动controller s.controller.Run(stopCh) } ``` #### 为shareIndexInformer创建controller 创建Controller的New方法会生成一个controller对象,只初始化controller的config成员,controller的reflector成员是在Run的时候初始化: - 通过执行reflector.Run方法启动reflector,开启对指定对象的listAndWatch过程,获取的对象将添加到reflector的deltaFIFO中 - 通过不断执行processLoop方法,从DeltaFIFO pop出对象,再调用reflector的Process(就是shareIndexInformer的HandleDeltas方法)处理 ``` func New(c *Config) Controller { ctlr := &controller{ config: *c, clock: &clock.RealClock{}, } return ctlr } //更多字段的配置是再Run的时候进行配置的 func (c *controller) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() go func() { <-stopCh c.config.Queue.Close() }() // 使用config创建一个Reflector r := NewReflector( c.config.ListerWatcher, // 如deployment的listWatch方法 c.config.ObjectType, // deployment{} c.config.Queue, //DeltaFIFO c.config.FullResyncPeriod, //30s ) r.ShouldResync = c.config.ShouldResync //来自sharedProcessor的方法 r.clock = c.clock c.reflectorMutex.Lock() c.reflector = r c.reflectorMutex.Unlock() var wg wait.Group defer wg.Wait() // 启动reflector,执行ListWatch方法 wg.StartWithChannel(stopCh, r.Run) // 不断执行processLoop,这个方法其实就是从DeltaFIFO pop出对象,再调用reflector的Process(其实是shareIndexInformer的HandleDeltas方法)处理 wait.Until(c.processLoop, time.Second, stopCh) } ``` > r.Run ``` func (r *Reflector) Run(stopCh <-chan struct{}) { klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) wait.BackoffUntil(func() { if err := r.ListAndWatch(stopCh); err != nil { utilruntime.HandleError(err) } }, r.backoffManager, true, stopCh) klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) } ``` #### controller的processLoop方法 不断执行processLoop,这个方法其实就是从DeltaFIFO pop出对象,再调用reflector的Process(其实是shareIndexInformer的HandleDeltas方法)处理 ``` func (c *controller) processLoop() { for { obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == ErrFIFOClosed { return } if c.config.RetryOnError { // This is the safe way to re-enqueue. c.config.Queue.AddIfNotPresent(obj) } } } } ``` #### deltaFIFO pop出来的对象处理逻辑 先看看controller怎么处理DeltaFIFO中的对象,需要注意DeltaFIFO中的Deltas的结构,是一个slice,保存同一个对象的所有增量事件 ![image](https://www.xieys.club/images/posts/116666059-19224c00-a9cd-11eb-945c-955cce9eacd1.png) sharedIndexInformer的HandleDeltas处理从deltaFIFO pod出来的增量时,先尝试更新到本地缓存cache,更新成功的话会调用processor.distribute方法向processor中的listener添加notification,listener启动之后会不断获取notification回调用户的EventHandler方法 - Sync: reflector list到对象时Replace到deltaFIFO时daltaType为Sync或者resync把localstrore中的对象加回到deltaFIFO - Added、Updated: reflector watch到对象时根据watch event type是Add还是Modify对应deltaType为Added或者Updated - Deleted: reflector watch到对象的watch event type是Delete或者re-list Replace到deltaFIFO时local store多出的对象以Delete的方式加入deltaFIFO ``` func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { case Sync, Replaced, Added, Updated: s.cacheMutationDetector.AddObject(d.Object) // 对象先通过shareIndexInformer中的indexer更新到缓存 if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } isSync := false switch { case d.Type == Sync: // Sync events are only propagated to listeners that requested resync isSync = true case d.Type == Replaced: if accessor, err := meta.Accessor(d.Object); err == nil { if oldAccessor, err := meta.Accessor(old); err == nil { // Replaced events that didn't change resourceVersion are treated as resync events // and only propagated to listeners that requested resync isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion() } } } // 如果informer的本地缓存更新成功,那么就调用shareProcess分发对象给用户自定义controller处理 // 可以看到,对EventHandler来说,本地缓存已经存在该对象就认为是update s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } s.processor.distribute(addNotification{newObj: d.Object}, false) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil } ``` 前面描述了shareIndexInformer内部如何从deltaFIFO取出对象更新缓存并通过processor回调用户的EventHandler,那deltaFIFO中的增量事件是怎么加进入的呢?先看看shareIndexInformer中controller中的reflector实现 #### r.run发起ListWatch reflector.run将会调用指定资源的ListAndWatch方法,注意这里什么时候可能发生re-list或者re-watch:因为是通过wait.BackoffUntil不断调用ListAndWatch方法,根据 Backoff 的定时器来循环触发 `ListAndWatch`,直到 `stopCh` 关闭 - 以ResourceVersion=0开始首次的List操作获取指定资源的全量对象,并通过reflector的syncWith方法将所有对象批量插入deltaFIFO - List完成之后将会更新ResourceVersion用户Watch操作,通过reflector的watchHandler方法把watch到的增量对象加入到deltaFIFO ``` func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name) var resourceVersion string //以版本号ResourceVersion,以lastSyncResourceVersion为准,如果没有的话就返回0,开始首次list options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()} if err := func() error { initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name}) defer initTrace.LogIfLong(10 * time.Second) var list runtime.Object var paginatedResult bool var err error listCh := make(chan struct{}, 1) panicCh := make(chan interface{}, 1) go func() { defer func() { if r := recover(); r != nil { panicCh <- r } }() pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { return r.listerWatcher.List(opts) })) switch { case r.WatchListPageSize != 0: pager.PageSize = r.WatchListPageSize case r.paginatedResult: case options.ResourceVersion != "" && options.ResourceVersion != "0": pager.PageSize = 0 } // 获取list的结果 list, paginatedResult, err = pager.List(context.Background(), options) if isExpiredError(err) { r.setIsLastSyncResourceVersionExpired(true) list, paginatedResult, err = pager.List(context.Background(), metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}) } close(listCh) }() select { case <-stopCh: return nil case r := <-panicCh: panic(r) case <-listCh: } if err != nil { return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err) } if options.ResourceVersion == "0" && paginatedResult { r.paginatedResult = true } r.setIsLastSyncResourceVersionExpired(false) // list was successful initTrace.Step("Objects listed") listMetaInterface, err := meta.ListAccessor(list) if err != nil { return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err) } // 根据结果更新版本号,用于接下来的watch resourceVersion = listMetaInterface.GetResourceVersion() initTrace.Step("Resource version extracted") items, err := meta.ExtractList(list) if err != nil { return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err) } initTrace.Step("Objects extracted") // 这里的syncWith是把首次list到的结果通过DeltaFIFO的Replce方法批量添加到队列 // 队列提供了Resync方法用于判断Replace批量插入的对象是否都pop出去了,factory/informer的WaitForCacheSync就是调用了DeltaFIFO的的Resync方法 if err := r.syncWith(items, resourceVersion); err != nil { return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err) } initTrace.Step("SyncWith done") r.setLastSyncResourceVersion(resourceVersion) initTrace.Step("Resource version updated") return nil }(); err != nil { return err } resyncerrc := make(chan error, 1) cancelCh := make(chan struct{}) defer close(cancelCh) go func() { resyncCh, cleanup := r.resyncChan() defer func() { cleanup() // Call the last one written into cleanup }() for { select { case <-resyncCh: case <-stopCh: return case <-cancelCh: return } if r.ShouldResync == nil || r.ShouldResync() { klog.V(4).Infof("%s: forcing resync", r.name) if err := r.store.Resync(); err != nil { resyncerrc <- err return } } cleanup() resyncCh, cleanup = r.resyncChan() } }() // 以list对象中获取的ResourceVersion不断watch for { select { case <-stopCh: return nil default: } timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) options = metav1.ListOptions{ ResourceVersion: resourceVersion, TimeoutSeconds: &timeoutSeconds, AllowWatchBookmarks: true, } w, err := r.listerWatcher.Watch(options) if err != nil { switch { case isExpiredError(err): klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) case err == io.EOF: case err == io.ErrUnexpectedEOF: klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err) default: utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err)) } if utilnet.IsConnectionRefused(err) { time.Sleep(time.Second) continue } return nil } // watchhandler处理watch到的数据,即把对象根据watch.type增加到DeltaFIFO中 if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil { if err != errorStopRequested { switch { case isExpiredError(err): klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err) default: klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err) } } return nil } } } ``` #### list出的对象批量插入deltaFIFO 可以看到是syncWith方法是通过调用deltaFIFO的Replace实现批量插入,具体实现见下文中deltaFIFO的实现描述 ``` func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error { found := make([]interface{}, 0, len(items)) for _, item := range items { found = append(found, item) } return r.store.Replace(found, resourceVersion) } ``` #### watch出的增量对象插入到deltaFIFO watch到的对象直接根据watch到的事件类型eventType更新store(即deltaFIFO),注意这个event是api直接返回的,watch event type可能是Added、Modifyed、Deleted ``` // watchHandler watches w and keeps *resourceVersion up to date. func (r *Reflector) watchHandler(w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { start := r.clock.Now() eventCount := 0 defer w.Stop() loop: for { select { case <-stopCh: return errorStopRequested case err := <-errc: return err case event, ok := <-w.ResultChan(): if !ok { break loop } if event.Type == watch.Error { return apierrors.FromObject(event.Object) } if r.expectedType != nil { if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a { utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a)) continue } } if r.expectedGVK != nil { if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a { utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a)) continue } } meta, err := meta.Accessor(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) continue } newResourceVersion := meta.GetResourceVersion() switch event.Type { case watch.Added: err := r.store.Add(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err)) } case watch.Modified: err := r.store.Update(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err)) } case watch.Deleted: // TODO: Will any consumers need access to the "last known // state", which is passed in event.Object? If so, may need // to change this. err := r.store.Delete(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err)) } case watch.Bookmark: // A `Bookmark` means watch has synced here, just update the resourceVersion default: utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) } *resourceVersion = newResourceVersion r.setLastSyncResourceVersion(newResourceVersion) eventCount++ } } watchDuration := r.clock.Since(start) if watchDuration < 1*time.Second && eventCount == 0 { return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name) } klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount) return nil } ``` #### 定时触发resync 在ListAndWatch中还起了一个gorouting定时的进行resync动作 ``` resyncerrc := make(chan error, 1) cancelCh := make(chan struct{}) defer close(cancelCh) go func() { //获取一个定时channel,定时的时间是创建informer factory时传入的resyncPeriod resyncCh, cleanup := r.resyncChan() defer func() { cleanup() // Call the last one written into cleanup }() for { select { case <-resyncCh: case <-stopCh: return case <-cancelCh: return } if r.ShouldResync == nil || r.ShouldResync() { klog.V(4).Infof("%s: forcing resync", r.name) if err := r.store.Resync(); err != nil { resyncerrc <- err return } } cleanup() resyncCh, cleanup = r.resyncChan() } }() ``` 调用deltaFIFO的Resync方法,把底层缓存的对象全部重新添加到deltaFIFO中 ``` func (f *DeltaFIFO) Resync() error { f.lock.Lock() defer f.lock.Unlock() if f.knownObjects == nil { return nil } keys := f.knownObjects.ListKeys() for _, k := range keys { if err := f.syncKeyLocked(k); err != nil { return err } } return nil } ``` 需要注意的是,在添加对象到deltaFIFO时会检查该队列中有没有增量没有处理完的,如果有则忽略这个对象的此次resync ``` func (f *DeltaFIFO) syncKeyLocked(key string) error { obj, exists, err := f.knownObjects.GetByKey(key) if err != nil { klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key) return nil } else if !exists { klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key) return nil } id, err := f.KeyOf(obj) if err != nil { return KeyError{obj, err} } // 如果deltaFIFO中该对象还有增量没有处理,则忽略以避免冲突,原因如上面注释:在同一个对象的增量列表中,排在后面的增量的object相比前面的增量应该更新才是合理的 if len(f.items[id]) > 0 { return nil } // 跟deltaFIFO的Replace方法一样,都是添加一个Sync类型的增量 if err := f.queueActionLocked(Sync, obj); err != nil { return fmt.Errorf("couldn't queue object: %v", err) } return nil } ``` ### 底层缓存的实现 shareIndexInformer中带有一个缓存indexer,是一个支持索引的map,优点是支持快速查询: - Indexer、Queue接口和cache结构体都实现了顶层的Store接口 - cache结构体持有threadSafeStore对象,threadSafeStore是线程安全的,并且具备自定义索引查找的能力 threadSafeMap的结构如下 ``` // threadSafeMap implements ThreadSafeStore type threadSafeMap struct { lock sync.RWMutex items map[string]interface{} // indexers maps a name to an IndexFunc indexers Indexers // indices maps a name to an Index indices Indices } ``` - items:存储具体的对象,比如key为ns/podName,value为pod{} - Indexers:一个map[string]IndexFunc结构,其中key为索引的名称,如’namespace’字符串,value则是一个具体的索引函数 - Indices:一个map[string]Index结构,其中key也是索引的名称,value是一个map[string]sets.String结构,其中key是具体的namespace,如default这个ns,vlaue则是这个ns下的按照索引函数求出来的值的集合,比如default这个ns下的所有pod对象名称 #### 索引的维护 通过在向items插入对象的过程中,遍历所有的Indexers中的索引函数,根据索引函数存储索引key到value的集合关系,以下图式结构可以很好的说明: ![image](https://xieys.club/images/posts/116666278-5981ca00-a9cd-11eb-9570-8ee6eb447d05.png) #### 缓存中增加对象 在向threadSafeMap的items map中增加完对象后,再通过updateIndices更新索引结构 ``` func (c *threadSafeMap) Add(key string, obj interface{}) { c.lock.Lock() defer c.lock.Unlock() oldObject := c.items[key] c.items[key] = obj c.updateIndices(oldObject, obj, key) } func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) { // if we got an old object, we need to remove it before we add it again if oldObj != nil { // 这是一个更新操作,先删除原对象的索引记录 c.deleteFromIndices(oldObj, key) } // 枚举所有添加的索引函数 for name, indexFunc := range c.indexers { //根据索引函数计算obj对应的 indexValues, err := indexFunc(newObj) if err != nil { panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) } index := c.indices[name] if index == nil { index = Index{} c.indices[name] = index } //索引函数计算出多个value,也可能是一个,比如pod的ns就只有一个值,pod的label可能就有多个值 for _, indexValue := range indexValues { //比如namespace索引,根据indexValue=default,获取default对应的集合再把当前对象插入 set := index[indexValue] if set == nil { set = sets.String{} index[indexValue] = set } set.Insert(key) } } } ``` #### IndexFunc索引函数 一个典型的索引函数MetaNamespaceIndexFunc,方便查询时可以根据namespace获取该namespace下的所有对象 ``` func (f *deploymentInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return NewFilteredDeploymentInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) } // MetaNamespaceIndexFunc is a default index function that indexes based on an object's namespace func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) { meta, err := meta.Accessor(obj) if err != nil { return []string{""}, fmt.Errorf("object has no meta: %v", err) } return []string{meta.GetNamespace()}, nil } ``` #### Index方法利用索引查找对象 提供利用索引来查询的能力,Index方法可以根据索引名称和对象,查询所有的关联对象 例如通过 `Index(“namespace”, &metav1.ObjectMeta{Namespace: namespace})`获取指定ns下的所有对象,具体可以参考tools/cache/listers.go#ListAllByNamespace ``` func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) { c.lock.RLock() defer c.lock.RUnlock() indexFunc := c.indexers[indexName] if indexFunc == nil { return nil, fmt.Errorf("Index with name %s does not exist", indexName) } indexedValues, err := indexFunc(obj) if err != nil { return nil, err } index := c.indices[indexName] var storeKeySet sets.String //例如namespace索引 if len(indexedValues) == 1 { // In majority of cases, there is exactly one value matching. // Optimize the most common path - deduping is not needed here. storeKeySet = index[indexedValues[0]] } else { //例如lable索引 // Need to de-dupe the return list. // Since multiple keys are allowed, this can happen. storeKeySet = sets.String{} for _, indexedValue := range indexedValues { for key := range index[indexedValue] { storeKeySet.Insert(key) } } } list := make([]interface{}, 0, storeKeySet.Len()) for storeKey := range storeKeySet { list = append(list, c.items[storeKey]) } return list, nil } ``` ### deltaFIFO实现 shareIndexInformer.controller.reflector中的deltaFIFO实现 ``` // NewDeltaFIFO方法在前面分析的sharedIndexInformer的Run方法中调用 // fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer) func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO { if opts.KeyFunction == nil { opts.KeyFunction = MetaNamespaceKeyFunc } f := &DeltaFIFO{ items: map[string]Deltas{}, queue: []string{}, keyFunc: opts.KeyFunction, knownObjects: opts.KnownObjects, emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced, } f.cond.L = &f.lock return f } type DeltaFIFO struct { lock sync.RWMutex cond sync.Cond //这里的Deltas是[]Delta类型 items map[string]Deltas queue []string populated bool initialPopulationCount int keyFunc KeyFunc // 这个其实就是shareIndexInformer中的indexer底层缓存的引用 knownObjects KeyListerGetter closed bool closedLock sync.Mutex emitDeltaTypeReplaced bool } type Delta struct { Type DeltaType Object interface{} } type Deltas []Delta ``` - items:记录deltaFIFO中的对象,注意map的value是一个delta slice - queue:记录上面items中的key,维护对象的fifo顺序 - populated:队列中是否填充过数据,LIST时调用Replace或调用Delete/Add/Update都会置为true - initialPopulationCount:首次List的时候获取到的数据就会调用Replace批量增加到队列,同时设置 - initialPopulationCount为List到的对象数量,每次Pop出来会减一,用于判断是否把首次批量插入的数据都POP出去了 - keyFunc:知道怎么从对象中解析出对应key的函数,如MetaNamespaceKeyFunc可以解析出namespace/name的形式 - knownObjects:这个其实就是shareIndexInformer中的indexer底层缓存的引用,可以认为和etcd中的数据一致 #### DeltaFIFO关键的方法 ##### 向deltaFIFO批量插入对象 批量向队列插入数据的方法,注意knownObjects是informer中本地缓存indexer的引用 这里会更新deltaFIFO的initialPopulationCount为Replace list的对象总数加上list中相比knownObjects多出的对象数量。 因为Replace方法可能是reflector发生re-list的时候再次调用,这个时候就会出现knownObjects中存在的对象不在Replace list的情况(比如watch的delete事件丢失了),这个时候是把这些对象筛选出来,封装成DeletedFinalStateUnknown对象以Delete type类型再次加入到deltaFIFO中,这样最终从detaFIFO处理这个DeletedFinalStateUnknown 增量时就可以更新本地缓存并且触发reconcile。 因为这个对象最终的结构确实找不到了,所以只能用knownObjects里面的记录来封装delta,所以叫做FinalStateUnknown。 ``` func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { f.lock.Lock() defer f.lock.Unlock() keys := make(sets.String, len(list)) // keep backwards compat for old clients action := Sync if f.emitDeltaTypeReplaced { action = Replaced } for _, item := range list { key, err := f.KeyOf(item) if err != nil { return KeyError{item, err} } keys.Insert(key) // 调用deltaFIFO的queueActionLocked向deltaFIFO增加一个增量 // 可以看到Replace添加的Delta type都是Sync if err := f.queueActionLocked(action, item); err != nil { return fmt.Errorf("couldn't enqueue object: %v", err) } } // 底层的缓存不应该会是nil,可以忽略这种情况 if f.knownObjects == nil { // Do deletion detection against our own list. queuedDeletions := 0 for k, oldItem := range f.items { if keys.Has(k) { continue } // 当knownObjects为空时,如果item中存在对象不在新来的list中,那么该对象被认为要被删除 var deletedObj interface{} if n := oldItem.Newest(); n != nil { deletedObj = n.Object } queuedDeletions++ if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { return err } } if !f.populated { f.populated = true // While there shouldn't be any queued deletions in the initial // population of the queue, it's better to be on the safe side. f.initialPopulationCount = len(list) + queuedDeletions } return nil } // Detect deletions not already in the queue. // 当reflector发生re-list时,可能会出现knownObjects中存在的对象不在Replace list的情况 knownKeys := f.knownObjects.ListKeys() // 记录这次替换相当于在缓存中删除多少对象 queuedDeletions := 0 // 枚举local store中的所有对象 for _, k := range knownKeys { // 对象也在Replace list中,所以跳过 if keys.Has(k) { continue } // 对象在缓存,但不在list中,说明替换操作完成后,这个对象相当于被删除了 // 注意这里的所谓替换,对deltaFIFO来说,是给队列中的对应对象增加一个 // delete增量queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}) // 真正删除缓存需要等到DeletedFinalStateUnknown增量被POP出来操作local store时 deletedObj, exists, err := f.knownObjects.GetByKey(k) if err != nil { deletedObj = nil klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k) } else if !exists { deletedObj = nil klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k) } queuedDeletions++ if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil { return err } } // 设置f.initialPopulationCount,该值大于0表示首次插入的对象还没有全部pop出去 // informer WaitForCacheSync就是在等待该值为0 if !f.populated { f.populated = true f.initialPopulationCount = len(list) + queuedDeletions } return nil } ``` ##### 从deltaFIFO pop出对象 从队列中Pop出一个方法,并由函数process来处理,其实就是shareIndexInformer的HandleDeltas 每次从DeltaFIFO Pop出一个对象,f.initialPopulationCount会减一,初始值为List时的对象数量 前面的Informer的WaitForCacheSync最终就是调用了这个HasSynced方法 ``` func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { for len(f.queue) == 0 { if f.IsClosed() { return nil, ErrFIFOClosed } f.cond.Wait() } //取出队首元素 id := f.queue[0] //去掉队首元素 f.queue = f.queue[1:] //如果initialPopulationCount大于0,就减1 if f.initialPopulationCount > 0 { f.initialPopulationCount-- } item, ok := f.items[id] if !ok { // Item may have been deleted subsequently. continue } delete(f.items, id) //处理增量对象 err := process(item) // 如果没有处理成功,那么就会重新加到deltaFIFO队列中 if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) err = e.Err } // Don't need to copyDeltas here, because we're transferring // ownership to the caller. return item, err } } ``` ##### deltaFIFO是否同步完成 串连前面的问题:factory的WaitForCacheSync是如何等待缓存同步完成 factory的WaitForCacheSync方法调用informer的HasSync方法,继而调用deltaFIFO的HasSync方法,也就是判断从reflector list到的数据是否pop完 ``` func (f *DeltaFIFO) HasSynced() bool { f.lock.Lock() defer f.lock.Unlock() return f.populated && f.initialPopulationCount == 0 } ``` ##### 同步local store到deltaFIFO 所谓的resync,其实就是把knownObjects即缓存中的对象全部再通过queueActionLocked(Sync, obj)加到队列 ``` func (f *DeltaFIFO) Resync() error { f.lock.Lock() defer f.lock.Unlock() if f.knownObjects == nil { return nil } keys := f.knownObjects.ListKeys() // 把local store中的对象都以Sync类型增量的形式重新放回到deltaFIFO for _, k := range keys { if err := f.syncKeyLocked(k); err != nil { return err } } return nil } func (f *DeltaFIFO) syncKeyLocked(key string) error { obj, exists, err := f.knownObjects.GetByKey(key) if err != nil { klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key) return nil } else if !exists { klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key) return nil } // If we are doing Resync() and there is already an event queued for that object, // we ignore the Resync for it. This is to avoid the race, in which the resync // comes with the previous value of object (since queueing an event for the object // doesn't trigger changing the underlying store . id, err := f.KeyOf(obj) if err != nil { return KeyError{obj, err} } // 如上述注释,在resync时,如果deltaFIFO中该对象还存在其他delta没处理,那么忽略这次的resync // 因为调用queueActionLocked是增加delta是通过append的,且处理对象的增量delta时,是从oldest到newdest的 // 所以如果某个对象还存在增量没处理,再append就可能导致后处理的delta是旧的对象 if len(f.items[id]) > 0 { return nil } // 可以看到这里跟list一样,增加到deltaFIFO的是一个Sync类型的增量 if err := f.queueActionLocked(Sync, obj); err != nil { return fmt.Errorf("couldn't queue object: %v", err) } return nil } ``` ##### 在deltaFIFO增加一个对象 注意这里在append增量时的去重逻辑:如果连续的两个增量类型都是Deleted,那么就去掉一个(正常情况确实不会出现这样,且没必要),优先去掉前面所说的因为re-list可能导致的api与local store不一致而增加的DeletedFinalStateUnknown类型的增量 ``` //在队列中给指定的对象append一个Delta func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { id, err := f.KeyOf(obj) if err != nil { return KeyError{obj, err} } // 把增量append到slice的后面 newDeltas := append(f.items[id], Delta{actionType, obj}) // 连续的两个Deleted delta将会去掉一个 newDeltas = dedupDeltas(newDeltas) if len(newDeltas) > 0 { // 维护queue队列 if _, exists := f.items[id]; !exists { f.queue = append(f.queue, id) } f.items[id] = newDeltas f.cond.Broadcast() } else { delete(f.items, id) } return nil } ``` 当前认为只有连续的两个Delete delta才有必要去重 ``` func dedupDeltas(deltas Deltas) Deltas { n := len(deltas) if n < 2 { return deltas } // 每次取最后两个delta来判断 a := &deltas[n-1] b := &deltas[n-2] if out := isDup(a, b); out != nil { d := append(Deltas{}, deltas[:n-2]...) return append(d, *out) } return deltas } func isDup(a, b *Delta) *Delta { // 当前认为只有连续的两个Delete delta才有必要去重 if out := isDeletionDup(a, b); out != nil { return out } // TODO: Detect other duplicate situations? Are there any? return nil } func isDeletionDup(a, b *Delta) *Delta { if b.Type != Deleted || a.Type != Deleted { return nil } // Do more sophisticated checks, or is this sufficient? // 优先去重DeletedFinalStateUnknown类型的Deleted delta if _, ok := b.Object.(DeletedFinalStateUnknown); ok { return a } return b } ``` ### sharedProcessor的实现 shareIndexInformer中的sharedProcess结构,用于分发deltaFIFO的对象,回调用户配置的EventHandler方法 可以看到shareIndexInformer中的process直接通过&sharedProcessor{clock: realClock}初始化 ``` func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { realClock := &clock.RealClock{} sharedIndexInformer := &sharedIndexInformer{ // 初始化一个默认的processor processor: &sharedProcessor{clock: realClock}, indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), listerWatcher: lw, objectType: exampleObject, resyncCheckPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, // cacheMutationDetector:可以记录local store是否被外部修改 cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)), clock: realClock, } return sharedIndexInformer } ``` 如下为sharedProcessor结构: ``` type sharedProcessor struct { listenersStarted bool listenersLock sync.RWMutex listeners []*processorListener syncingListeners []*processorListener clock clock.Clock wg wait.Group } ``` - listenersStarted:listeners中包含的listener是否都已经启动了 - listeners:已添加的listener列表,用来处理watch到的数据 - syncingListeners:已添加的listener列表,用来处理list到的数据 #### 理解listeners和syncingListeners的区别 processor可以支持listener的维度配置是否需要resync:一个informer可以配置多个EventHandler,而一个EventHandler对应processor中的一个listener,每个listener可以配置需不需要resync,如果某个listener需要resync,那么添加到deltaFIFO的Sync增量最终也只会回到对应的listener reflector中会定时判断每一个listener是否需要进行resync,判断的依据是看配置EventHandler的时候指定的resyncPeriod,0代表该listener不需要resync,否则就每隔resyncPeriod看看是否到时间了 - listeners:记录了informer添加的所有listener - syncingListeners:记录了informer中哪些listener处于sync状态 syncingListeners是listeners的子集,syncingListeners记录那些开启了resync且时间已经到达了的listener,把它们放在一个独立的slice是避免下面分析的distribute方法中把obj增加到了还不需要resync的listener中 #### 为sharedProcessor添加listener 在sharedProcessor中添加一个listener ``` func (p *sharedProcessor) addListenerLocked(listener *processorListener) { // 同时添加到listeners和syncingListeners列表,但其实添加的是同一个对象的引用 // 所以下面run启动的时候只需要启动listeners中listener就可以了 p.listeners = append(p.listeners, listener) p.syncingListeners = append(p.syncingListeners, listener) } ``` #### 启动sharedProcessor中的listener sharedProcessor启动所有的listener 是通过调用listener.run和listener.pop来启动一个listener,两个方法具体作用看下文processorListener说明 ``` func (p *sharedProcessor) run(stopCh <-chan struct{}) { func() { p.listenersLock.RLock() defer p.listenersLock.RUnlock() for _, listener := range p.listeners { // listener的run方法不断的从listener自身的缓冲区取出对象回调handler p.wg.Start(listener.run) // listener的pop方法不断的接收对象并暂存在自身的缓冲区中 p.wg.Start(listener.pop) } p.listenersStarted = true }() <-stopCh p.listenersLock.RLock() defer p.listenersLock.RUnlock() for _, listener := range p.listeners { close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop } p.wg.Wait() // Wait for all .pop() and .run() to stop } ``` #### sharedProcessor分发对象 distribute方法是在前面介绍`[deltaFIFO pop出来的对象处理逻辑]`时提到的,把notification事件添加到listener中,listener如何pop出notification回调EventHandler见下文listener部分分析 当通过distribute分发从deltaFIFO获取的对象时,如果delta type是Sync,那么就会把对象交给sync listener来处理,而Sync类型的delta只能来源于下面两种情况: - reflector list Replace到deltaFIFO的对象:因为首次在sharedProcessor增加一个listener的时候是同时加在listeners和syncingListeners中的 - reflector定时触发resync local store到deltaFIFO的对象:因为每次reflector调用processor的shouldResync时,都会把达到resync条件的listener筛选出来重新放到p.syncingListeners 上面两种情况都可以在p.syncingListeners中准备好listener ``` func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() // 如果是通过reflector list Replace到deltaFIFO的对象或者reflector定时触发resync到deltaFIFO的对象,那么distribute到syncingListeners if sync { // 保证deltaFIFO Resync方法过来的delta obj只给开启了resync能力的listener for _, listener := range p.syncingListeners { listener.add(obj) } } else { for _, listener := range p.listeners { listener.add(obj) } } } ``` #### processorListener结构 sharedProcessor中的listener具体的类型:运转逻辑就是把用户通过addCh增加的事件发送到nextCh供run方法取出回调Eventhandler,因为addCh和nectCh都是无缓冲channel,所以中间引入ringBuffer做缓存 processorListener是sharedIndexInformer调用AddEventHandler时创建并添加到sharedProcessor,对于一个Informer,可以多次调用AddEventHandler来添加多个listener ``` type processorListener struct { nextCh chan interface{} addCh chan interface{} handler ResourceEventHandler // pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed. // There is one per listener, but a failing/stalled listener will have infinite pendingNotifications // added until we OOM. // TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but // we should try to do something better. pendingNotifications buffer.RingGrowing // requestedResyncPeriod is how frequently the listener wants a // full resync from the shared informer, but modified by two // adjustments. One is imposing a lower bound, // `minimumResyncPeriod`. The other is another lower bound, the // sharedProcessor's `resyncCheckPeriod`, that is imposed (a) only // in AddEventHandlerWithResyncPeriod invocations made after the // sharedProcessor starts and (b) only if the informer does // resyncs at all. requestedResyncPeriod time.Duration // resyncPeriod is the threshold that will be used in the logic // for this listener. This value differs from // requestedResyncPeriod only when the sharedIndexInformer does // not do resyncs, in which case the value here is zero. The // actual time between resyncs depends on when the // sharedProcessor's `shouldResync` function is invoked and when // the sharedIndexInformer processes `Sync` type Delta objects. resyncPeriod time.Duration // nextResync is the earliest time the listener should get a full resync nextResync time.Time // resyncLock guards access to resyncPeriod and nextResync resyncLock sync.Mutex } ``` - addCh:无缓冲的chan,listener的pop方法不断从addCh取出对象丢给nextCh。addCh中的对象来源于listener的add方法,如果nextCh不能及时消费,则放入缓冲区pendingNotifications - nextCh:无缓冲的chan,listener的run方法不断从nextCh取出对象回调用户handler。nextCh的对象来源于addCh或者缓冲区 - pendingNotifications:一个无容量限制的环形缓冲区,可以理解为可以无限存储的队列,用来存储deltaFIFO分发过来的消息 - nextResync:由resyncPeriod和requestedResyncPeriod计算得出,与当前时间now比较判断listener是否该进行resync了 - resyncPeriod:listener自身期待多长时间进行resync - requestedResyncPeriod:informer希望listener多长时间进行resync #### 在listener中添加事件 shareProcessor中的distribute方法调用的是listener的add来向addCh增加消息,注意addCh是无缓冲的chan,依赖pop不断从addCh取出数据 ``` func (p *processorListener) add(notification interface{}) { // 虽然p.addCh是一个无缓冲的channel,但是因为listener中存在pendingNotifications(ring buffer),所以这里并不会一直阻塞 p.addCh <- notification } ``` #### 判断是否需要resync 如果resyncPeriod为0表示不需要resync,否则判断当前时间now是否已经超过了nextResync,是的话则返回true表示需要resync。其中nextResync在每次调用listener的shouldResync方法成功时更新 ``` func (p *sharedProcessor) shouldResync() bool { p.listenersLock.Lock() defer p.listenersLock.Unlock() // 这里每次都会先置空列表,保证里面记录了当前需要resync的listener p.syncingListeners = []*processorListener{} resyncNeeded := false now := p.clock.Now() for _, listener := range p.listeners { // need to loop through all the listeners to see if they need to resync so we can prepare any // listeners that are going to be resyncing. if listener.shouldResync(now) { resyncNeeded = true //// 达到resync条件的listener被加入syncingListeners p.syncingListeners = append(p.syncingListeners, listener) listener.determineNextResync(now) } } return resyncNeeded } ``` #### listener的run方法回调EventHandler listener的run方法不断的从nextCh中获取notification,并根据notification的类型来调用用户自定的EventHandler ``` func (p *processorListener) run() { // this call blocks until the channel is closed. When a panic happens during the notification // we will catch it, **the offending item will be skipped!**, and after a short delay (one second) // the next notification will be attempted. This is usually better than the alternative of never // delivering again. stopCh := make(chan struct{}) wait.Until(func() { for next := range p.nextCh { switch notification := next.(type) { case updateNotification: // 回调用户配置的handler p.handler.OnUpdate(notification.oldObj, notification.newObj) case addNotification: p.handler.OnAdd(notification.newObj) case deleteNotification: p.handler.OnDelete(notification.oldObj) default: utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next)) } } // the only way to get here is if the p.nextCh is empty and closed close(stopCh) }, 1*time.Second, stopCh) } ``` #### addCh到nextCh的对象传递 listener中pop方法的逻辑相对比较绕,最终目的就是把分发到addCh的数据从nextCh或者pendingNotifications取出来 notification变量记录下一次要被放到p.nextCh供pop方法取出的对象,开始seletct时必然只有case2可能ready Case2做的事可以描述为:从p.addCh获取对象,如果临时变量notification还是nil,说明需要往notification赋值,供case1推送到p.nextCh 如果notification已经有值了,那个当前从p.addCh取出的值要先放到环形缓冲区中 Case1做的事可以描述为:看看能不能把临时变量notification推送到nextCh(nil chan会阻塞在读写操作上),可以写的话,说明这个nextCh是p.nextCh,写成功之后,需要从缓存中取出一个对象放到notification为下次执行这个case做准备,如果缓存是空的,通过把nextCh chan设置为nil来禁用case1,以便case2位notification赋值 ``` func (p *processorListener) pop() { defer utilruntime.HandleCrash() defer close(p.nextCh) // Tell .run() to stop //nextCh没有利用make初始化,将阻塞在读和写上 var nextCh chan<- interface{} //notification初始值为nil var notification interface{} for { select { // 执行这个case,相当于给p.nextCh添加来自p.addCh的内容 case nextCh <- notification: // Notification dispatched var ok bool //前面的notification已经加到p.nextCh了, 为下一次这个case再次ready做准备 notification, ok = p.pendingNotifications.ReadOne() if !ok { // Nothing to pop nextCh = nil // Disable this select case } //第一次select只有走这个case case notificationToAdd, ok := <-p.addCh: if !ok { return } if notification == nil { // No notification to pop (and pendingNotifications is empty) // Optimize the case - skip adding to pendingNotifications //为notification赋值 notification = notificationToAdd //唤醒第一个case nextCh = p.nextCh } else { // There is already a notification waiting to be dispatched //select没有命中第一个case,那么notification就没有被消耗,那么把从p.addCh获取的对象加到缓存中 p.pendingNotifications.WriteOne(notificationToAdd) } } } } ```