|
Kubernetes Informer基本原理
2024年01月30日 09:01
424 本文分析 k8s controller 中 informer 启动的基本流程不论是 k8s 自身组件,还是自己编写 controller,都需要通过 apiserver 监听 etcd 事件来完成自己的控制循环逻辑。如何高效可靠进行事件监听,k8s 客户端工具包 client-go 提供了一个通用的 informer 包,通过 informer,可以方便和高效的进行 controller 开发。informer 包提供了如下的一些功能:1、本地缓存(store)2、索引机制(indexer)3、Handler 注册功能(eventHandler)1、informer 架构整个 informer 机制架构如下图(图片源自 Client-go):可以看到这张图分为上下两个部分,上半部分由 client-go 提供,下半部分则是需要自己实现的控制循环逻辑本文主要分析上半部分的逻辑,包括下面几个组件:1.1、Reflector:从图上可以看到 Reflector 是一个和 apiserver 交互的组件,通过 list 和 watch api 将资源对象压入队列1.2、DeltaFifo:DeltaFifo的结构体示意如下:type DeltaFIFO struct { ... // We depend on the property that items in the s et are in // the queue and vice versa, and that all Deltas in this // map have at least one Delta. items map[string]Deltas queue []string ...}主要分为两部分,fifo 和 delta(1)fifo:先进先出队列对应结构体中的 queue,结构体示例如下:[default/centos-fd77b5886-pfrgn, xxx, xxx](2)delta:对应结构体中的items,存储了资源对象并且携带了资源操作类型的一个 map,结构体示例如下:map:{"default/centos-fd77b5886-pfrgn":[{Replaced &od{ObjectMeta: ${pod参数}], "xxx": [{},{}]}消费者从 queue 中 pop 出对象进行消费,并从 items 获取具体的消费操作(执行动作 Update/Deleted/Sync,和执行的对象 object spec)1.3、Indexer:client-go 用来存储资源对象并自带索引功能的本地存储,deltaFIFO 中 pop 出的对象将存储到 Indexer。indexer 与 etcd 集群中的数据保持一致,从而 client-go 可以直接从本地缓存获取资源对象,减少 apiserver 和 etcd 集群的压力。2、一个基本例子func main() { stopCh := make(chan struct{}) defer close(stopCh) // (1)New a k8s clientset masterUrl := "172.27.32.110:8080" config, err := clientcmd.BuildConfigFromFlags(masterUrl, "") if err != nil { klog.Errorf("BuildConfigFromFlags err, err: %v", err) } clientset, err := k.NewForConfig(config) if err != nil { klog.Errorf("Get clientset err, err: %v", err) } // (2)New a sharedInformers factory sharedInformers := informers.NewSharedInformerFactory(clientset, defaultResync) // (3)Register a informer // f.informers[informerType] = informer, // the detail for informer is build in NewFilteredPodInformer() podInformer := sharedInformers.Core().V1().Pods().Informer() // (4)Register event handler podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { mObj := obj.(v1.Object) klog.Infof("Get new obj: %v", mObj) klog.Infof("Get new obj name: %s", mObj.GetName()) }, }) // (5)Start all informers sharedInformers.Start(stopCh) // (6)A cronjob for cache sync if !cache.WaitForCacheSync(stopCh, podInformer.HasSynced) { klog.Infof("Cache sync fail!") } // (7)Use lister podLister := sharedInformers.Core().V1().Pods().Lister() pods, err := podLister.List(labels.Everything()) if err != nil { klog.Infof("err: %v", err) } klog.Infof("len(pods), %d", len(pods)) for _, v := range pods { klog.Infof("pod: %s", v.Name) } 0 { if _, exists := f.items[id]; !exists { f.queue = append(f.queue, id) } f.items[id] = newDeltas f.cond.Broadcast() // 通知所有阻塞住的消费者 } ... return nil}(2)消费—c.processLoop:消费逻辑就是从 DeltaFifo pop 出对象,然后做两件事情:(1)触发前面注册的 eventhandler (2)更新本地索引缓存 indexer,保持数据和 etcd 一致func (c *controller) processLoop() { for { obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) }}### Queue.Pop:## Queue.Pop是一个带有处理函数的pod方法,首先先看Pod逻辑,即为deltaFifo的pop方法:func (f *DeltaFIFO) op(process opProcessFunc) (interface{}, error) { for { // 无限循环 for len(f.queue) == 0 { f.cond.Wait() // 阻塞直到生产端broadcast方法通知 } id := f.queue[0] item, ok := f.items[id] delete(f.items, id) err := process(item) // 执行处理方法 if e, ok := err.(ErrRequeue); ok { f.addIfNotPresent(id, item) // 如果处理失败的重新加入到fifo中重新处理 err = e.Err } return item, err }}### c.config.Process:## c.config.Process是在初始化controller的时候赋值的,即为前面的s.HandleDeltas### s.HandleDeltas:func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { case Sync, Replaced, Added, Updated: s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil & exists { if err := s.indexer.Update(d.Object); err != nil { return err } isSync := false switch { case d.Type == Sync: // Sync events are only propagated to listeners that requested resync isSync = true case d.Type == Replaced: if accessor, err := meta.Accessor(d.Object); err == nil { if oldAccessor, err := meta.Accessor(old); err == nil { // Replaced events that didn't change resourceVersion are treated as resync events // and only propagated to listeners that requested resync isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion() } } } s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } s.processor.distribute(addNotification{newObj: d.Object}, false) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil}可以看到上面主要执行两部分逻辑:s.processor.distribute#### s.processor.distribute:### 例如新增通知:s.processor.distribute(addNotification{newObj: d.Object}, false)### 其中addNotification就是add类型的通知,后面会通过notification结构体的类型来执行不同的eventHandlerfunc (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() if sync { for _, listener := range p.syncingListeners { listener.add(obj) } } else { for _, listener := range p.listeners { listener.add(obj) } }}func (p *processorListener) add(notification interface{}) { p.addCh
|
|