//创建wait命令 func NewCmdWait(restClientGetter genericclioptions.RESTClientGetter, streams genericclioptions.IOStreams) *cobra.Command { flags := NewWaitFlags(restClientGetter, streams)//初始化wait flag cmd := &cobra.Command{//创建cobra命令 Use: "wait ([-f FILENAME] | resource.group/resource.name | resource.group [(-l label | --all)]) [--for=delete|--for condition=available]", Short: "Experimental: Wait for a specific condition on one or many resources.", Long: waitLong, Example: waitExample, DisableFlagsInUseLine: true, Run: func(cmd *cobra.Command, args []string) { o, err := flags.ToOptions(args)//wait flag转waitOption cmdutil.CheckErr(err) err = o.RunWait()//运行 cmdutil.CheckErr(err) }, SuggestFor: []string{"list", "ps"}, } flags.AddFlags(cmd)//添加选项 return cmd } // AddFlags registers flags for a cli func (flags *WaitFlags) AddFlags(cmd *cobra.Command) {//选项 flags.PrintFlags.AddFlags(cmd)//打印选项 flags.ResourceBuilderFlags.AddFlags(cmd.Flags())//resource builder选项 cmd.Flags().DurationVar(&flags.Timeout, "timeout", flags.Timeout, "The length of time to wait before giving up. Zero means check once and don't wait, negative means wait for a week.")//timeout选项 cmd.Flags().StringVar(&flags.ForCondition, "for", flags.ForCondition, "The condition to wait on: [delete|condition=condition-name].")//for选项 } // ToOptions converts from CLI inputs to runtime inputs func (flags *WaitFlags) ToOptions(args []string) (*WaitOptions, error) {//wait flag转option printer, err := flags.PrintFlags.ToPrinter()//print flag转printer if err != nil { return nil, err } builder := flags.ResourceBuilderFlags.ToBuilder(flags.RESTClientGetter, args)//创建builder clientConfig, err := flags.RESTClientGetter.ToRESTConfig()//获取restconfig if err != nil { return nil, err } dynamicClient, err := dynamic.NewForConfig(clientConfig)//获取dynamicClient if err != nil { return nil, err } conditionFn, err := conditionFuncFor(flags.ForCondition, flags.ErrOut)//获取condition函数 if err != nil { return nil, err } effectiveTimeout := flags.Timeout if effectiveTimeout < 0 {//设置超时时间 effectiveTimeout = 168 * time.Hour } o := &WaitOptions{//构造wait option ResourceFinder: builder, DynamicClient: dynamicClient, Timeout: effectiveTimeout, Printer: printer, ConditionFn: conditionFn, IOStreams: flags.IOStreams, } return o, nil }
//获取condition函数 func conditionFuncFor(condition string, errOut io.Writer) (ConditionFunc, error) { if strings.ToLower(condition) == "delete" {//如果条件是delete,则返回IsDeleted函数 return IsDeleted, nil } if strings.HasPrefix(condition, "condition=") {//如果有condition=前缀 conditionName := condition[len("condition="):]//获取condition名称 conditionValue := "true"//设置conditionvalue if equalsIndex := strings.Index(conditionName, "="); equalsIndex != -1 { conditionValue = conditionName[equalsIndex+1:] conditionName = conditionName[0:equalsIndex] } return ConditionalWait{//构造conditionWait,返回isConditionMet函数 conditionName: conditionName, conditionStatus: conditionValue, errOut: errOut, }.IsConditionMet, nil } return nil, fmt.Errorf("unrecognized condition: %q", condition) }
//运行 func (o *WaitOptions) RunWait() error { visitCount := 0 err := o.ResourceFinder.Do().Visit(func(info *resource.Info, err error) error {// visit result if err != nil { return err } visitCount++ finalObject, success, err := o.ConditionFn(info, o)//返回condition状态 if success {//如果是success,打印对象 o.Printer.PrintObj(finalObject, o.Out) return nil } if err == nil { return fmt.Errorf("%v unsatisified for unknown reason", finalObject) } return err }) if err != nil { return err } if visitCount == 0 { return errNoMatchingResources } return err }
//删除condition函数 func IsDeleted(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) { endTime := time.Now().Add(o.Timeout)//设置结束时间 for { if len(info.Name) == 0 {//名称为空返回错误 return info.Object, false, fmt.Errorf("resource name must be provided") } nameSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()//构造field-selector // List with a name field selector to get the current resourceVersion to watch from (not the object's resourceVersion) gottenObjList, err := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(metav1.ListOptions{FieldSelector: nameSelector})//删选资源 if apierrors.IsNotFound(err) {//如果未找到资源返回true return info.Object, true, nil } if err != nil {//有错误返回错误 // TODO this could do something slightly fancier if we wish return info.Object, false, err } if len(gottenObjList.Items) != 1 {//items不为一个,返回true return info.Object, true, nil } gottenObj := &gottenObjList.Items[0] resourceLocation := ResourceLocation{//构造resourceLocation GroupResource: info.Mapping.Resource.GroupResource(), Namespace: gottenObj.GetNamespace(), Name: gottenObj.GetName(), } if uid, ok := o.UIDMap[resourceLocation]; ok { if gottenObj.GetUID() != uid {//uid不等,返回true return gottenObj, true, nil } } watchOptions := metav1.ListOptions{}//构造wait结构体 watchOptions.FieldSelector = nameSelector watchOptions.ResourceVersion = gottenObjList.GetResourceVersion() objWatch, err := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(watchOptions)//获取watch对象 if err != nil { return gottenObj, false, err } timeout := endTime.Sub(time.Now()) errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info) if timeout < 0 {//超时,返回错误 // we're out of time return gottenObj, false, errWaitTimeoutWithName } ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)//获取ctx和cancel watchEvent, err := watchtools.UntilWithoutRetry(ctx, objWatch, Wait{errOut: o.ErrOut}.IsDeleted)//等待条件满足 cancel() switch { case err == nil:// 错误为nil返回true return watchEvent.Object, true, nil case err == watchtools.ErrWatchClosed: continue case err == wait.ErrWaitTimeout: if watchEvent != nil { return watchEvent.Object, false, errWaitTimeoutWithName } return gottenObj, false, errWaitTimeoutWithName default: return gottenObj, false, err } } }
//判断是否否删除完成 func (w Wait) IsDeleted(event watch.Event) (bool, error) { switch event.Type {//判断事件类型 case watch.Error://有错误打印错误,返回false // keep waiting in the event we see an error - we expect the watch to be closed by // the server if the error is unrecoverable. err := apierrors.FromObject(event.Object) fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the object to be deleted: %v", err) return false, nil case watch.Deleted://delete,返回true return true, nil default: return false, nil//默认返回false } }
//条件函数 func (w ConditionalWait) IsConditionMet(info *resource.Info, o *WaitOptions) (runtime.Object, bool, error) { endTime := time.Now().Add(o.Timeout)//设置超时时间 for { if len(info.Name) == 0 {//name为空返回错误 return info.Object, false, fmt.Errorf("resource name must be provided") } nameSelector := fields.OneTermEqualSelector("metadata.name", info.Name).String()//构造字段选择器 var gottenObj *unstructured.Unstructured // List with a name field selector to get the current resourceVersion to watch from (not the object's resourceVersion) gottenObjList, err := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).List(metav1.ListOptions{FieldSelector: nameSelector})//删选资源 resourceVersion := "" switch { case err != nil://err不为空返回错误 return info.Object, false, err case len(gottenObjList.Items) != 1://items不为1个 resourceVersion = gottenObjList.GetResourceVersion() default: gottenObj = &gottenObjList.Items[0]//获取obj conditionMet, err := w.checkCondition(gottenObj)//判断条件是否满足 if conditionMet {//如果满足返回true return gottenObj, true, nil } if err != nil {//有错误返回错误 return gottenObj, false, err } resourceVersion = gottenObjList.GetResourceVersion()//设置resourceVersion } watchOptions := metav1.ListOptions{}//构造waitoption watchOptions.FieldSelector = nameSelector watchOptions.ResourceVersion = resourceVersion objWatch, err := o.DynamicClient.Resource(info.Mapping.Resource).Namespace(info.Namespace).Watch(watchOptions)//获取watch对象 if err != nil { return gottenObj, false, err } timeout := endTime.Sub(time.Now()) errWaitTimeoutWithName := extendErrWaitTimeout(wait.ErrWaitTimeout, info) if timeout < 0 {//超时返回错误 // we're out of time return gottenObj, false, errWaitTimeoutWithName } ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), o.Timeout)//获取ctx和cancle watchEvent, err := watchtools.UntilWithoutRetry(ctx, objWatch, w.isConditionMet)//等待直到条件满足 cancel() switch { case err == nil://错误为nil返回true return watchEvent.Object, true, nil case err == watchtools.ErrWatchClosed: continue case err == wait.ErrWaitTimeout: if watchEvent != nil { return watchEvent.Object, false, errWaitTimeoutWithName } return gottenObj, false, errWaitTimeoutWithName default: return gottenObj, false, err } } }
//判断条件是否满足 func (w ConditionalWait) checkCondition(obj *unstructured.Unstructured) (bool, error) { conditions, found, err := unstructured.NestedSlice(obj.Object, "status", "conditions")//获取conditions if err != nil { return false, err } if !found { return false, nil } for _, conditionUncast := range conditions {//遍历conditions condition := conditionUncast.(map[string]interface{})//获取condition name, found, err := unstructured.NestedString(condition, "type")//获取condition名称 if !found || err != nil || strings.ToLower(name) != strings.ToLower(w.conditionName) {//condition名称不是我们要找的名称跳过 continue } status, found, err := unstructured.NestedString(condition, "status")//获取状态 if !found || err != nil { continue } return strings.ToLower(status) == strings.ToLower(w.conditionStatus), nil//判断状态是否是我们指定的状态 } return false, nil } //判断条件是否满足 func (w ConditionalWait) isConditionMet(event watch.Event) (bool, error) { if event.Type == watch.Error {//如果事件类型是错误,打印错误,返回false // keep waiting in the event we see an error - we expect the watch to be closed by // the server err := apierrors.FromObject(event.Object) fmt.Fprintf(w.errOut, "error: An error occurred while waiting for the condition to be satisfied: %v", err) return false, nil } if event.Type == watch.Deleted {//如果事件类型是delete,返回false // this will chain back out, result in another get and an return false back up the chain return false, nil } obj := event.Object.(*unstructured.Unstructured) return w.checkCondition(obj)//检查状态是否满足 }