Summary:
The operator receives data from the socket via multiplexed I/O and writes it into a queue; a separate processing goroutine then reads the data out and handles it.
This post traces the path by which the operator moves incoming data into the work queue.
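Before walking through the real call chain, here is a minimal sketch of that producer/consumer pattern using client-go's workqueue directly. The channel, the string item, and the goroutine wiring are simplified stand-ins for what the informer machinery below actually does (controller-runtime really uses a rate-limited queue of reconcile.Request values):

```go
// Minimal sketch of the pattern described above: one goroutine drains a
// notification channel into a workqueue, a second goroutine consumes the
// queue. The channel and string item are simplified stand-ins for the
// informer's nextCh and for reconcile.Request.
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	notifications := make(chan string)
	q := workqueue.New()

	// Producer: plays the role of processorListener.run below.
	go func() {
		for n := range notifications {
			q.Add(n)
		}
		q.ShutDown() // Get keeps returning items until the queue drains.
	}()

	// Consumer: plays the role of the controller's worker goroutine.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			item, shutdown := q.Get()
			if shutdown {
				return
			}
			fmt.Println("processing", item)
			q.Done(item)
		}
	}()

	notifications <- "default/my-object"
	close(notifications)
	<-done
}
```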
Call stack:
Reading the stack bottom-up: wait.(*Group).Start spawns processorListener.run, which wait.Until re-runs once per second; each notification drained from nextCh is dispatched to EventHandler.OnAdd, which builds a CreateEvent and finally pushes a request into the workqueue through the rate-limiting and delaying wrappers.

```
(gdb) bt
#0  k8s.io/client-go/util/workqueue.(*Type).Add (q=0xc00007e420, item=...) at /root/work/hello/vendor/k8s.io/client-go/util/workqueue/queue.go:121
#1  0x0000000000b9139d in k8s.io/client-go/util/workqueue.(*delayingType).Add (.this=0xc00007e6c0, item=...) at <autogenerated>:1
#2  0x0000000000b919dd in k8s.io/client-go/util/workqueue.(*rateLimitingType).Add (.this=0xc00013c240, item=...) at <autogenerated>:1
#3  0x00000000019ce55a in sigs.k8s.io/controller-runtime/pkg/handler.(*EnqueueRequestForObject).Create (e=0x2b9eb70 <runtime.zerobase>, evt=..., q=...) at /root/work/hello/vendor/sigs.k8s.io/controller-runtime/pkg/handler/enqueue.go:44
#4  0x0000000001a18e59 in sigs.k8s.io/controller-runtime/pkg/source/internal.EventHandler.OnAdd (e=..., obj=...) at /root/work/hello/vendor/sigs.k8s.io/controller-runtime/pkg/source/internal/eventsource.go:63
#5  0x0000000001a1a08f in sigs.k8s.io/controller-runtime/pkg/source/internal.(*EventHandler).OnAdd (.this=0xc0001de7c0, obj=...) at <autogenerated>:1
#6  0x00000000019b65b8 in k8s.io/client-go/tools/cache.(*processorListener).run.func1 () at /root/work/hello/vendor/k8s.io/client-go/tools/cache/shared_informer.go:787
#7  0x0000000000b8631c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1 (f={void (void)} 0xc00006dd88) at /root/work/hello/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:155
#8  0x0000000000b860c9 in k8s.io/apimachinery/pkg/util/wait.BackoffUntil (f={void (void)} 0xc00006de48, backoff=..., sliding=true, stopCh=0xc0000222a0) at /root/work/hello/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:156
#9  0x0000000000b85fa5 in k8s.io/apimachinery/pkg/util/wait.JitterUntil (f={void (void)} 0xc00006dea0, period=1000000000, jitterFactor=0, sliding=true, stopCh=0xc0000222a0) at /root/work/hello/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:133
#10 0x0000000000b85ed3 in k8s.io/apimachinery/pkg/util/wait.Until (f={void (void)} 0xc00006ded8, period=1000000000, stopCh=0xc0000222a0) at /root/work/hello/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:90
#11 0x00000000019b645a in k8s.io/client-go/tools/cache.(*processorListener).run (p=0xc00021a280) at /root/work/hello/vendor/k8s.io/client-go/tools/cache/shared_informer.go:781
#12 0x00000000019bcf4b in k8s.io/client-go/tools/cache.(*processorListener).run-fm () at /root/work/hello/vendor/k8s.io/client-go/tools/cache/shared_informer.go:775
#13 0x0000000000b85df8 in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1 () at /root/work/hello/vendor/k8s.io/apimachinery/pkg/util/wait/wait.go:73
#14 0x000000000046afe1 in runtime.goexit () at /usr/local/go/src/runtime/asm_amd64.s:1581
#15 0x0000000000000000 in ?? ()
```
Core functions:
wait.Until
```go
// Until loops until stop channel is closed, running f every period.
//
// Until is syntactic sugar on top of JitterUntil with zero jitter factor and
// with sliding = true (which means the timer for period starts after the f
// completes).
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
	JitterUntil(f, period, 0.0, true, stopCh)
}
```
```go
// JitterUntil loops until stop channel is closed, running f every period.
//
// If jitterFactor is positive, the period is jittered before every run of f.
// If jitterFactor is not positive, the period is unchanged and not jittered.
//
// If sliding is true, the period is computed after f runs. If it is false then
// period includes the runtime for f.
//
// Close stopCh to stop. f may not be invoked if stop channel is already
// closed. Pass NeverStop to if you don't want it stop.
func JitterUntil(f func(), period time.Duration, jitterFactor float64, sliding bool, stopCh <-chan struct{}) {
	BackoffUntil(f, NewJitteredBackoffManager(period, jitterFactor, &clock.RealClock{}), sliding, stopCh)
}
```
```go
// BackoffUntil loops until stop channel is closed, run f every duration given by BackoffManager.
//
// If sliding is true, the period is computed after f runs. If it is false then
// period includes the runtime for f.
func BackoffUntil(f func(), backoff BackoffManager, sliding bool, stopCh <-chan struct{}) {
	var t clock.Timer
	for {
		select {
		case <-stopCh:
			return
		default:
		}

		if !sliding {
			t = backoff.Backoff()
		}

		func() {
			defer runtime.HandleCrash()
			f()
		}()

		if sliding {
			t = backoff.Backoff()
		}

		// NOTE: b/c there is no priority selection in golang
		// it is possible for this to race, meaning we could
		// trigger t.C and stopCh, and t.C select falls through.
		// In order to mitigate we re-check stopCh at the beginning
		// of every loop to prevent extra executions of f().
		select {
		case <-stopCh:
			if !t.Stop() {
				<-t.C()
			}
			return
		case <-t.C():
		}
	}
}
```
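As a quick standalone illustration of how this family of loops behaves (not taken from the operator code), the snippet below runs a function once per second until the stop channel is closed:

```go
// Standalone illustration (not from the operator): run a tick function once
// per second until stopCh is closed. With sliding = true (which Until uses),
// the next period is only armed after the function returns.
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})
	go wait.Until(func() {
		fmt.Println("tick", time.Now().Format(time.RFC3339))
	}, 1*time.Second, stopCh)

	time.Sleep(3500 * time.Millisecond)
	close(stopCh)
	time.Sleep(100 * time.Millisecond) // give the loop a moment to observe stopCh
}
```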
processorListener:run
```go
func (p *processorListener) run() {
	// this call blocks until the channel is closed. When a panic happens during the notification
	// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
	// the next notification will be attempted. This is usually better than the alternative of never
	// delivering again.
	stopCh := make(chan struct{})
	wait.Until(func() {
		for next := range p.nextCh {
			switch notification := next.(type) {
			case updateNotification:
				p.handler.OnUpdate(notification.oldObj, notification.newObj)
			case addNotification:
				p.handler.OnAdd(notification.newObj)
			case deleteNotification:
				p.handler.OnDelete(notification.oldObj)
			default:
				utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
			}
		}
		// the only way to get here is if the p.nextCh is empty and closed
		close(stopCh)
	}, 1*time.Second, stopCh)
}
```
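Here p.handler is whatever ResourceEventHandler was registered with the informer; in our trace it is controller-runtime's internal EventHandler (shown next). For comparison, a plain client-go handler would be registered with a hypothetical helper like this, and its three funcs are exactly what the switch above dispatches to:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

// registerHandler is a hypothetical helper: it registers a plain client-go
// ResourceEventHandlerFuncs, whose AddFunc/UpdateFunc/DeleteFunc are what
// processorListener.run's switch ultimately calls.
func registerHandler(informer cache.SharedIndexInformer) {
	informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			fmt.Printf("added: %T\n", obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			fmt.Printf("updated: %T -> %T\n", oldObj, newObj)
		},
		DeleteFunc: func(obj interface{}) {
			fmt.Printf("deleted: %T\n", obj)
		},
	})
}
```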
EventHandler:OnAdd
```go
// OnAdd creates CreateEvent and calls Create on EventHandler.
func (e EventHandler) OnAdd(obj interface{}) {
	c := event.CreateEvent{}

	// Pull Object out of the object
	if o, ok := obj.(client.Object); ok {
		c.Object = o
	} else {
		log.Error(nil, "OnAdd missing Object", "object", obj, "type", fmt.Sprintf("%T", obj))
		return
	}

	for _, p := range e.Predicates {
		if !p.Create(c) {
			return
		}
	}

	// Invoke create handler
	e.EventHandler.Create(c, e.Queue)
}
```
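The Predicates slice is the filtering hook before anything reaches the queue: if any predicate returns false, OnAdd returns early and nothing is enqueued. A hypothetical predicate that drops create events for objects carrying a skip label might look like this (the label key is illustrative):

```go
package main

import (
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// skipLabelPredicate is a hypothetical predicate: returning false from
// CreateFunc makes EventHandler.OnAdd above return before
// e.EventHandler.Create runs, so nothing is enqueued for that object.
var skipLabelPredicate = predicate.Funcs{
	CreateFunc: func(e event.CreateEvent) bool {
		return e.Object.GetLabels()["example.com/skip"] != "true"
	},
}
```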
EnqueueRequestForObject:Create
```go
// Create implements EventHandler.
func (e *EnqueueRequestForObject) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
	if evt.Object == nil {
		enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
		return
	}
	q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
		Name:      evt.Object.GetName(),
		Namespace: evt.Object.GetNamespace(),
	}})
}
```
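On the consumer side, the controller's worker pops these reconcile.Request values and hands them to the user's Reconciler. A minimal hypothetical reconciler is sketched below; note that the exact Reconcile signature varies across controller-runtime versions (the version in our trace does not take a context.Context, newer ones do):

```go
package main

import (
	"context"
	"fmt"

	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// helloReconciler is a hypothetical consumer of the requests enqueued above.
// Only Name/Namespace travel through the queue; the object itself is
// re-fetched from the informer-backed cache inside Reconcile.
type helloReconciler struct{}

func (r *helloReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
	fmt.Println("reconciling", req.NamespacedName) // e.g. "default/my-object"
	return reconcile.Result{}, nil
}
```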
sharedIndexInformer:AddEventHandlerWithResyncPeriod
```go
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
	s.startedLock.Lock()
	defer s.startedLock.Unlock()

	if s.stopped {
		klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
		return
	}

	if resyncPeriod > 0 {
		if resyncPeriod < minimumResyncPeriod {
			klog.Warningf("resyncPeriod %v is too small. Changing it to the minimum allowed value of %v", resyncPeriod, minimumResyncPeriod)
			resyncPeriod = minimumResyncPeriod
		}

		if resyncPeriod < s.resyncCheckPeriod {
			if s.started {
				klog.Warningf("resyncPeriod %v is smaller than resyncCheckPeriod %v and the informer has already started. Changing it to %v", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
				resyncPeriod = s.resyncCheckPeriod
			} else {
				// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
				// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
				// accordingly
				s.resyncCheckPeriod = resyncPeriod
				s.processor.resyncCheckPeriodChanged(resyncPeriod)
			}
		}
	}

	listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

	if !s.started {
		s.processor.addListener(listener)
		return
	}

	// in order to safely join, we have to
	// 1. stop sending add/update/delete notifications
	// 2. do a list against the store
	// 3. send synthetic "Add" events to the new handler
	// 4. unblock
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()

	s.processor.addListener(listener)
	for _, item := range s.indexer.List() {
		listener.add(addNotification{newObj: item})
	}
}
```
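To see AddEventHandlerWithResyncPeriod from the caller's side, here is a hypothetical plain client-go setup that registers a handler with a 30-second resync; controller-runtime performs an equivalent registration internally when a controller watches a source. The kubeconfig path and periods are illustrative:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Hypothetical setup: load a kubeconfig and build a clientset.
	cfg, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
	if err != nil {
		panic(err)
	}
	clientset := kubernetes.NewForConfigOrDie(cfg)

	factory := informers.NewSharedInformerFactory(clientset, 0)
	podInformer := factory.Core().V1().Pods().Informer()

	// The path traced in this post starts here: the processorListener created
	// inside AddEventHandlerWithResyncPeriod is what later drains nextCh in run().
	podInformer.AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) { fmt.Printf("add: %T\n", obj) },
	}, 30*time.Second)

	stopCh := make(chan struct{})
	factory.Start(stopCh)
	factory.WaitForCacheSync(stopCh)
	select {} // block forever; a real program would handle shutdown signals
}
```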