目录

kubernetes中informer机制

informer的机制

cient-go 是从 k8s 代码中抽出来的一个客户端工具,Informer 是 client-go 中的核心工具包,已经被 kubernetes 中众多组件所使用。所谓 Informer,其实就是一个带有本地缓存和索引机制的、可以注册 EventHandler 的 client,本地缓存被称为 Store,索引被称为 Index。使用 informer 的目的是为了减轻 apiserver 数据交互的压力而抽象出来的一个 cache 层, 客户端对 apiserver 数据的 “读取” 和 “监听” 操作都通过本地 informer 进行。Informer 实例的Lister()方法可以直接查找缓存在本地内存中的数据。

informer的主要功能

  • 同步数据到本地缓存
  • 根据对应的事件类型,触发事先注册好的ResourceEventHandler

informer的组件及其作用

Informer 中主要有 Reflector、Delta FIFO Queue、Local Store、WorkQueue 几个组件。以下是 Informer 的工作流程图。

https://www.xieys.club/images/posts/informer-1.png https://www.xieys.club/images/posts/informer-1.png

  • Reflector:称之为反射器,实现对 apiserver 指定类型对象的监控(ListAndWatch),其中反射实现的就是把监控的结果实例化成具体的对象,最终也是调用 Kubernetes 的 List/Watch API;
  • DeltaIFIFO Queue:一个增量队列,将 Reflector 监控变化的对象形成一个 FIFO 队列,此处的 Delta 就是变化;
  • LocalStore:就是 informer 的 cache,这里面缓存的是 apiserver 中的对象(其中有一部分可能还在DeltaFIFO 中),此时使用者在查询对象的时候就直接从 cache 中查找,减少了 apiserver 的压力,LocalStore 只会被 Lister 的 List/Get 方法访问。
  • WorkQueue:DeltaIFIFO 收到事件后会先将事件存储在自己的数据结构中,然后直接操作 Store 中存储的数据,更新完 store 后 DeltaIFIFO 会将该事件 pop 到 WorkQueue 中,Controller 收到 WorkQueue 中的事件会根据对应的类型触发对应的回调函数。

informer的工作流程

  • Informer 首先会 list/watch apiserver,Informer 所使用的 Reflector 包负责与 apiserver 建立连接,Reflector 使用 ListAndWatch 的方法,会先从 apiserver 中 list 该资源的所有实例,list 会拿到该对象最新的 resourceVersion,然后使用 watch 方法监听该 resourceVersion 之后的所有变化,若中途出现异常,reflector 则会从断开的 resourceVersion 处重现尝试监听所有变化,一旦该对象的实例有创建、删除、更新动作,Reflector 都会收到”事件通知”,这时,该事件及它对应的 API 对象这个组合,被称为增量(Delta),它会被放进 DeltaFIFO 中。
  • Informer 会不断地从这个 DeltaFIFO 中读取增量,每拿出一个对象,Informer 就会判断这个增量的时间类型,然后创建或更新本地的缓存,也就是 store。
  • 如果事件类型是 Added(添加对象),那么 Informer 会通过 Indexer 的库把这个增量里的 API 对象保存到本地的缓存中,并为它创建索引,若为删除操作,则在本地缓存中删除该对象。
  • DeltaFIFO 再 pop 这个事件到 controller 中,controller 会调用事先注册的 ResourceEventHandler 回调函数进行处理。
  • 在 ResourceEventHandler 回调函数中,其实只是做了一些很简单的过滤,然后将关心变更的 Object 放到 workqueue 里面。
  • Controller 从 workqueue 里面取出 Object,启动一个 worker 来执行自己的业务逻辑,业务逻辑通常是计算目前集群的状态和用户希望达到的状态有多大的区别,然后孜孜不倦地让 apiserver 将状态演化到用户希望达到的状态,比如为 deployment 创建新的 pods,或者是扩容/缩容 deployment。
  • 在worker中就可以使用 lister 来获取 resource,而不用频繁的访问 apiserver,因为 apiserver 中 resource 的变更都会反映到本地的 cache 中。

Informer 在使用时需要先初始化一个 InformerFactory,目前主要推荐使用的是 SharedInformerFactory,Shared 指的是在多个 Informer 中共享一个本地 cache。

Informer 中的 ResourceEventHandler 函数有三种:

1
2
3
4
5
6
7
8
// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
type ResourceEventHandlerFuncs struct {
    AddFunc    func(obj interface{})
    UpdateFunc func(oldObj, newObj interface{})
    DeleteFunc func(obj interface{})
}

这三种函数的处理逻辑是用户自定义的,在初始化 controller 时注册完 ResourceEventHandler 后,一旦该对象的实例有创建、删除、更新三中操作后就会触发对应的 ResourceEventHandler。

informer的使用示例

在实际的开发工作中,Informer 主要用在两处:

  • 在访问 k8s apiserver 的客户端作为一个 client 缓存对象使用;
  • 在一些自定义 controller 中使用,比如 operator 的开发;

下面是一个client的使用示例

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
package main

import (
	"flag"
	"fmt"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	podinfor "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
	"os"
	"path/filepath"
	"time"
)



var (
	kubeInformerFactory informers.SharedInformerFactory
	podInformer podinfor.PodInformer
)

func homeDir() string{
	if h := os.Getenv("HOME");h != ""{
		return h
	}
	return os.Getenv("USERPROFILE") //windows的家目录
}

func main()  {
	var kubeconfig *string
	stopCh := make(chan struct{})
	// home是家目录,如果能取到家目录的值,就可以用来做默认值
	if home := homeDir();home != ""{
		// 如果输入了kubeconfig参数,该参数的值就是kubeconfig文件的路径,
		// 如果没有输入kubeconfig参数,就用默认路径 ~/.kube/config
		kubeconfig = flag.String("kubeconfig",filepath.Join(home,".kube","config"),"(optional)  path to the kubeconfig file")
	}else{
		//如果取不到当前用户的家目录,就没有办法设置kubeconfig的默认目录了,只能从入参中取
		kubeconfig = flag.String("kubeconfig","","(optional)  path to the kubeconfig file")
	}

	flag.Parse()

	// 从本机加载kubeconfig配置文件,因此第一个参数为空字符串
	config, err := clientcmd.BuildConfigFromFlags("",*kubeconfig)
	// kubeconfig 加载失败就直接退出
	if err != nil{
		fmt.Println("load kubeconfig failed!,err:",err)
		panic(err.Error())
	}

	//实例化clientset对象
	clientset,err := kubernetes.NewForConfig(config)

	if err != nil{
		fmt.Println("init clientset failed!err:",err)
		panic(err.Error())
	}

	kubeInformerFactory = informers.NewSharedInformerFactory(clientset,1000000000)
	//创建pod informer
	podInformer = kubeInformerFactory.Core().V1().Pods()

	//podInformer.Lister()
	// 设置相关事件的回调函数
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: addPod,
		UpdateFunc: updatePod,
		DeleteFunc: deletePod,
	})

	kubeInformerFactory.Start(stopCh)
	time.Sleep(11 * time.Second)

	pods,err := podInformer.Lister().List(labels.Everything())
	if err != nil{
		fmt.Println(err)
		panic(err.Error())
	}
	fmt.Println(pods)
	for _,pod :=range pods{
		fmt.Println(pod.Namespace,pod.Name,pod.Status.Phase)
	}

	<- stopCh
}

func updatePod(old ,cur interface{})  {
	fmt.Println("update pod")

}

func addPod(obj interface{})  {
	fmt.Println("add pod")
	
}

func deletePod(obj interface{})  {
	fmt.Println("delete pod")

}

Shared指的是多个 lister 共享同一个cache,而且资源的变化会同时通知到cache和 listers。这个解释和上面图所展示的内容的是一致的,cache我们在Indexer的介绍中已经分析过了,lister 指的就是OnAdd、OnUpdate、OnDelete 这些回调函数背后的对象。

作为controller使用的一个整体工作流程

(1) 创建一个控制器

  • 为控制器创建 workqueue
  • 创建 informer, 为 informer 添加 callback 函数,创建 lister

(2) 启动控制器

  • 启动 informer
  • 等待本地 cache sync 完成后, 启动 workers

(3) 当收到变更事件后,执行 callback

  • 等待事件触发
  • 从事件中获取变更的 Object
  • 做一些必要的检查
  • 生成 object key,一般是 namespace/name 的形式
  • 将 key 放入 workqueue 中

(4) worker loop

  • 等待从 workqueue 中获取到 item,一般为 object key
  • 用 object key 通过 lister 从本地 cache 中获取到真正的 object 对象
  • 做一些检查
  • 执行真正的业务逻辑
  • 处理下一个 item

下面是自定义 controller 使用的一个参考:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
var (
    masterURL  string
    kubeconfig string
)

func init() {
    flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
    flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
}

func main() {
    flag.Parse()

    stopCh := signals.SetupSignalHandler()

    cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
    if err != nil {
        glog.Fatalf("Error building kubeconfig: %s", err.Error())
    }

    kubeClient, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        glog.Fatalf("Error building kubernetes clientset: %s", err.Error())
    }

    // 所谓 Informer,其实就是一个带有本地缓存和索引机制的、可以注册 EventHandler 的 client
    // informer watch apiserver,每隔 30 秒 resync 一次(list)
    kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, time.Second*30)

    controller := controller.NewController(kubeClient, kubeInformerFactory.Core().V1().Nodes())

    //  启动 informer
    go kubeInformerFactory.Start(stopCh)

     // start controller
    if err = controller.Run(2, stopCh); err != nil {
        glog.Fatalf("Error running controller: %s", err.Error())
    }
}


// NewController returns a new network controller
func NewController(
    kubeclientset kubernetes.Interface,
    networkclientset clientset.Interface,
    networkInformer informers.NetworkInformer) *Controller {

    // Create event broadcaster
    // Add sample-controller types to the default Kubernetes Scheme so Events can be
    // logged for sample-controller types.
    utilruntime.Must(networkscheme.AddToScheme(scheme.Scheme))
    glog.V(4).Info("Creating event broadcaster")
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(glog.Infof)
    eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

    controller := &Controller{
        kubeclientset:    kubeclientset,
        networkclientset: networkclientset,
        networksLister:   networkInformer.Lister(),
        networksSynced:   networkInformer.Informer().HasSynced,
        workqueue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Networks"),
        recorder:         recorder,
    }

    glog.Info("Setting up event handlers")
    // Set up an event handler for when Network resources change
    networkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueueNetwork,
        UpdateFunc: func(old, new interface{}) {
            oldNetwork := old.(*samplecrdv1.Network)
            newNetwork := new.(*samplecrdv1.Network)
            if oldNetwork.ResourceVersion == newNetwork.ResourceVersion {
                // Periodic resync will send update events for all known Networks.
                // Two different versions of the same Network will always have different RVs.
                return
            }
            controller.enqueueNetwork(new)
        },
        DeleteFunc: controller.enqueueNetworkForDelete,
    })

    return controller
}

自定义 controller 的详细使用方法可以参考:k8s-controller-custom-resource

总结

Informer 主要功能是缓存对象到本地以及根据对应的事件类型触发已注册好的 ResourceEventHandler,其主要用在访问 k8s apiserver 的客户端和 operator 中。