我们与k8s
集群的交互,主要是通过向api-server
提交资源对象。
举个例子,我们要部署一个应用,则只需要提交一个deployment
类型的资源对象到api-server
。
api-server
在验证用户提交的资源对象通过之后,会将该资源对象保存到etcd
集群中。
同时,api-server
还需要通知deployment-manager
有一个新的deployment
资源对象被新增了,从而能够对它进行处理,比如创建replicaSet
。
也就是,在k8s
中,需要有一个机制,能够让资源对象的controller
能够感知到资源对象。
Informer
接口就是用于实现该功能。
Informer
提供了:
ListAndWatch
:用于跟api-server
同步资源对象列表和更新
带索引的缓存功能:将资源对象缓存到本地,可以直接通过缓存查询资源对象内容,减轻api-server
压力
事件回调功能:提供回调机制,当有资源对象新增、更新或者删除时可以回调自定义接口
Informer
是client-go
中一个非常核心的功能,除了在controller-manager
、scheduler
等组件中被广泛使用,在各种自定义资源对象的控制器中也离不开它的身影。
上图描述了一个资源对象的控制器的大体工作流程,来自于极客时间的《深入剖析Kubernetes》课程:
控制器通过informer
与api-server
同步资源对象的列表和变更
在事件回调中,将事件加入到workQueue
中,这里workQueue
可以协调生产者与消费者之间的速率,而且消费失败的事件可以重新加入队列,一般使用限速队列,当消费失败重新加入队列到下次重新消费之前,限速器会根据重试次数产生一定的延时,因为一般消费失败,马上进行重试很大概率还是会失败。
在控制循环中,不断从workQueue
取出事件进行处理
接下来看一下informer
的实现,相关的代码都在k8s.io/client-go/tools/cache
包下面
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 type ResourceEventHandler interface { OnAdd(obj interface {}) OnUpdate(oldObj, newObj interface {}) OnDelete(obj interface {}) } type SharedInformer interface { AddEventHandler(handler ResourceEventHandler) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) GetStore() Store GetController() Controller Run(stopCh <-chan struct {}) HasSynced() bool LastSyncResourceVersion() string } type SharedIndexInformer interface { SharedInformer AddIndexers(indexers Indexers) error GetIndexer() Indexer }
接下来,我们看一下SharedIndexInformer
接口的实现
创建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 type sharedIndexInformer struct { indexer Indexer controller Controller processor *sharedProcessor cacheMutationDetector MutationDetector listerWatcher ListerWatcher objectType runtime.Object resyncCheckPeriod time.Duration defaultEventHandlerResyncPeriod time.Duration clock clock.Clock started, stopped bool startedLock sync.Mutex blockDeltas sync.Mutex }
接着看一下对应的创建:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 func NewSharedInformer (lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer { return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{}) } func NewSharedIndexInformer (lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { realClock := &clock.RealClock{} sharedIndexInformer := &sharedIndexInformer{ processor: &sharedProcessor{clock: realClock}, indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), listerWatcher: lw, objectType: objType, resyncCheckPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T" , objType)), clock: realClock, } return sharedIndexInformer }
上面的NewIndex
方法,实际上返回的是一个并发安全的map
,同时可以根据indexers
中的索引函数来对资源对象进行索引。
启动 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 func (s *sharedIndexInformer) Run (stopCh <-chan struct {}) { defer utilruntime.HandleCrash() fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer) cfg := &Config{ Queue: fifo, ListerWatcher: s.listerWatcher, ObjectType: s.objectType, FullResyncPeriod: s.resyncCheckPeriod, RetryOnError: false , ShouldResync: s.processor.shouldResync, Process: s.HandleDeltas, } func () { s.startedLock.Lock() defer s.startedLock.Unlock() s.controller = New(cfg) s.controller.(*controller).clock = s.clock s.started = true }() processorStopCh := make (chan struct {}) var wg wait.Group defer wg.Wait() defer close (processorStopCh) wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run) wg.StartWithChannel(processorStopCh, s.processor.run) defer func () { s.startedLock.Lock() defer s.startedLock.Unlock() s.stopped = true }() s.controller.Run(stopCh) }
可以看到,在Run
方法中:
设置状态为开始运行
创建一个deltaFifo
和controller
后台运行脏缓存检查
后台为每个processorListener
启动pop
和run
工作协程,开始监听事件。一旦有新的事件到来,会先调用s.HandleDeltas
将其更新到本地缓存,然后通过channel
通知每个processorListener
,processorListener
再去回调我们注册的事件handler。
调用controller
的Run
方法,实际跟api-server
的listAndWatch
交互是由controller
来负责的
解密controller 接下来我们看一下controller
的实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 type Controller interface { Run(stopCh <-chan struct {}) HasSynced() bool LastSyncResourceVersion() string } type controller struct { config Config reflector *Reflector reflectorMutex sync.RWMutex clock clock.Clock } func New (c *Config) Controller { ctlr := &controller{ config: *c, clock: &clock.RealClock{}, } return ctlr }
接下来看一下主要的Run
方法的逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 func (c *controller) Run (stopCh <-chan struct {}) { defer utilruntime.HandleCrash() go func () { <-stopCh c.config.Queue.Close() }() r := NewReflector( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, c.config.FullResyncPeriod, ) r.ShouldResync = c.config.ShouldResync r.clock = c.clock c.reflectorMutex.Lock() c.reflector = r c.reflectorMutex.Unlock() var wg wait.Group defer wg.Wait() wg.StartWithChannel(stopCh, r.Run) wait.Until(c.processLoop, time.Second, stopCh) }
看一下processLoop
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func (c *controller) processLoop () { for { obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == ErrFIFOClosed { return } if c.config.RetryOnError { c.config.Queue.AddIfNotPresent(obj) } } } }
Reflector:ListAndWatch 我们接下来看一下Reflector
的主要逻辑:
1 2 3 4 5 6 7 8 9 func (r *Reflector) Run (stopCh <-chan struct {}) { klog.V(3 ).Infof("Starting reflector %v (%s) from %s" , r.expectedTypeName, r.resyncPeriod, r.name) wait.Until(func () { if err := r.ListAndWatch(stopCh); err != nil { utilruntime.HandleError(err) } }, r.period, stopCh) }
接下来看一下ListAndWatch
方法,该方法有点长,只保留相关代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 func (r *Reflector) ListAndWatch (stopCh <-chan struct {}) error { var resourceVersion string options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()} if err := func () error { var list runtime.Object var err error listCh := make (chan struct {}, 1 ) go func () { pager := pager.New(pager.SimplePageFunc(func (opts metav1.ListOptions) (runtime.Object, error) { return r.listerWatcher.List(opts) })) if r.WatchListPageSize != 0 { pager.PageSize = r.WatchListPageSize } list, err = pager.List(context.Background(), options) close (listCh) }() select { case <-stopCh: return nil case <-listCh: } listMetaInterface, err := meta.ListAccessor(list) resourceVersion = listMetaInterface.GetResourceVersion() items, err := meta.ExtractList(list) if err := r.syncWith(items, resourceVersion); err != nil { return fmt.Errorf("%s: Unable to sync list result: %v" , r.name, err) } r.setLastSyncResourceVersion(resourceVersion) return nil }(); err != nil { return err } go func () { resyncCh, cleanup := r.resyncChan() defer func () { cleanup() }() for { select { case <-resyncCh: case <-stopCh: return case <-cancelCh: return } if r.ShouldResync == nil || r.ShouldResync() { if err := r.store.Resync(); err != nil { resyncerrc <- err return } } cleanup() resyncCh, cleanup = r.resyncChan() } }() for { timeoutSeconds := int64 (minWatchTimeout.Seconds() * (rand.Float64() + 1.0 )) options = metav1.ListOptions{ ResourceVersion: resourceVersion, TimeoutSeconds: &timeoutSeconds, AllowWatchBookmarks: true , } w, err := r.listerWatcher.Watch(options) if err != nil { if utilnet.IsConnectionRefused(err) { time.Sleep(time.Second) continue } return nil } if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil { ... return nil } } }
watch
的主要逻辑都在watchHandler
中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 func (r *Reflector) watchHandler (w watch.Interface, resourceVersion *string , errc chan error, stopCh <-chan struct {}) error { start := r.clock.Now() eventCount := 0 defer w.Stop() loop: for { select { case <-stopCh: return errorStopRequested case err := <-errc: return err case event, ok := <-w.ResultChan(): if !ok { break loop } meta, err := meta.Accessor(event.Object) if err != nil { continue } newResourceVersion := meta.GetResourceVersion() switch event.Type { case watch.Added: err := r.store.Add(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v" , r.name, event.Object, err)) } case watch.Modified: err := r.store.Update(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v" , r.name, event.Object, err)) } case watch.Deleted: err := r.store.Delete(event.Object) if err != nil { utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v" , r.name, event.Object, err)) } case watch.Bookmark: default : utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v" , r.name, event)) } *resourceVersion = newResourceVersion r.setLastSyncResourceVersion(newResourceVersion) eventCount++ } } return nil }
client-go
还提供了一个SharedInformerFactory
,方便我们创建(多个)Informer
。
总结 我们可以看到,Reflector
会通过ListAndWatch
接口,从api-server
同步资源对象的变更情况,然后添加到DeltaFifo
队列中;
controller
会不断从DeltaFifo
中取出事件,然后回调sharedIndexInformer
的HandleDeltas
方法;
HandleDeltas
方法中,会更新本地缓存,然后根据事件类型调用注册的事件handler。
还要一些细节,比如DeltaFifo
,Indexer
,processorListener
的实现,或者Informer
的HasSynced
接口实现等,比较简单就不在讲了。