摘要:
记录 Kubernetes PV controller(PersistentVolumeController)的核心处理流程。
参考:
从零开始入门 K8s | Kubernetes 存储架构及插件使用 - 知乎
核心处理:
入口Run
// Run starts all of this controller's control loops func (ctrl *PersistentVolumeController) Run(ctx context.Context) { defer utilruntime.HandleCrash() defer ctrl.claimQueue.ShutDown() defer ctrl.volumeQueue.ShutDown() klog.Infof("Starting persistent volume controller") defer klog.Infof("Shutting down persistent volume controller") if !cache.WaitForNamedCacheSync("persistent volume", ctx.Done(), ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) { return } ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister) go wait.Until(ctrl.resync, ctrl.resyncPeriod, ctx.Done()) go wait.UntilWithContext(ctx, ctrl.volumeWorker, time.Second) go wait.UntilWithContext(ctx, ctrl.claimWorker, time.Second) metrics.Register(ctrl.volumes.store, ctrl.claims, &ctrl.volumePluginMgr) <-ctx.Done() }
volumeWorker
// volumeWorker processes items from volumeQueue. It must run only once, // syncVolume is not assured to be reentrant. func (ctrl *PersistentVolumeController) volumeWorker(ctx context.Context) { workFunc := func(ctx context.Context) bool { keyObj, quit := ctrl.volumeQueue.Get() if quit { return true } defer ctrl.volumeQueue.Done(keyObj) key := keyObj.(string) klog.V(5).Infof("volumeWorker[%s]", key) _, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { klog.V(4).Infof("error getting name of volume %q to get volume from informer: %v", key, err) return false } volume, err := ctrl.volumeLister.Get(name) if err == nil { // The volume still exists in informer cache, the event must have // been add/update/sync ctrl.updateVolume(ctx, volume) return false } if !errors.IsNotFound(err) { klog.V(2).Infof("error getting volume %q from informer: %v", key, err) return false } // The volume is not in informer cache, the event must have been // "delete" volumeObj, found, err := ctrl.volumes.store.GetByKey(key) if err != nil { klog.V(2).Infof("error getting volume %q from cache: %v", key, err) return false } if !found { // The controller has already processed the delete event and // deleted the volume from its cache klog.V(2).Infof("deletion of volume %q was already processed", key) return false } volume, ok := volumeObj.(*v1.PersistentVolume) if !ok { klog.Errorf("expected volume, got %+v", volumeObj) return false } ctrl.deleteVolume(volume) return false } for { if quit := workFunc(ctx); quit { klog.Infof("volume worker queue shutting down") return } } }
// updateIndices modifies the objects location in the managed indexes: // - for create you must provide only the newObj // - for update you must provide both the oldObj and the newObj // - for delete you must provide only the oldObj // updateIndices must be called from a function that already has a lock on the cache func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) { var oldIndexValues, indexValues []string var err error for name, indexFunc := range c.indexers { if oldObj != nil { oldIndexValues, err = indexFunc(oldObj) } else { oldIndexValues = oldIndexValues[:0] } if err != nil { panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) } if newObj != nil { indexValues, err = indexFunc(newObj) } else { indexValues = indexValues[:0] } if err != nil { panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err)) } index := c.indices[name] if index == nil { index = Index{} c.indices[name] = index } for _, value := range oldIndexValues { // We optimize for the most common case where index returns a single value. if len(indexValues) == 1 && value == indexValues[0] { continue } c.deleteKeyFromIndex(key, value, index) } for _, value := range indexValues { // We optimize for the most common case where index returns a single value. if len(oldIndexValues) == 1 && value == oldIndexValues[0] { continue } c.addKeyToIndex(key, value, index) } } }
claimWorker
// claimWorker processes items from claimQueue. It must run only once, // syncClaim is not reentrant. func (ctrl *PersistentVolumeController) claimWorker(ctx context.Context) { workFunc := func() bool { keyObj, quit := ctrl.claimQueue.Get() if quit { return true } defer ctrl.claimQueue.Done(keyObj) key := keyObj.(string) klog.V(5).Infof("claimWorker[%s]", key) namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { klog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err) return false } claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name) if err == nil { // The claim still exists in informer cache, the event must have // been add/update/sync ctrl.updateClaim(ctx, claim) return false } if !errors.IsNotFound(err) { klog.V(2).Infof("error getting claim %q from informer: %v", key, err) return false } // The claim is not in informer cache, the event must have been "delete" claimObj, found, err := ctrl.claims.GetByKey(key) if err != nil { klog.V(2).Infof("error getting claim %q from cache: %v", key, err) return false } if !found { // The controller has already processed the delete event and // deleted the claim from its cache klog.V(2).Infof("deletion of claim %q was already processed", key) return false } claim, ok := claimObj.(*v1.PersistentVolumeClaim) if !ok { klog.Errorf("expected claim, got %+v", claimObj) return false } ctrl.deleteClaim(claim) return false } for { if quit := workFunc(); quit { klog.Infof("claim worker queue shutting down") return } } }