searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

KubeVela源码小记(一)

2023-12-05 09:10:35
15
0

KubeVela 数据结构:

// ApplicationSpec is the spec of Application
type ApplicationSpec struct {
	Components []common.ApplicationComponent `json:"components"`

	// Policies defines the global policies for all components in the app, e.g. security, metrics, gitops,
	// multi-cluster placement rules, etc.
	// Policies are applied after components are rendered and before workflow steps are executed.
	Policies []AppPolicy `json:"policies,omitempty"`

	// Workflow defines how to customize the control logic.
	// If workflow is specified, Vela won't apply any resource, but provide rendered output in AppRevision.
	// Workflow steps are executed in array order, and each step:
	// - will have a context in annotation.
	// - should mark "finish" phase in status.conditions.
	Workflow *Workflow `json:"workflow,omitempty"`

	// TODO(wonderflow): we should have application level scopes supported here
}

KubeVela 发布的流程:

  1. cli 发布入口,使用 vela up 命令触发,目的是向k8s 提交appliction cr
    1. kubevela/references/cmd/cli/main.go,程序入口
    2. kubevela/references/cli/cli.go
    3. kubevela/references/cli/up.go,up 命令
// Run execute the vela up command
func (opt *UpCommandOptions) Run(f velacmd.Factory, cmd *cobra.Command) error {
	if opt.File != "" {
		return opt.deployApplicationFromFile(f, cmd)
	}
	if opt.RevisionName == "" {
		return opt.deployExistingApp(f, cmd)
	}
	return opt.deployExistingAppUsingRevision(f, cmd)
}
    1. kubevela/references/common/application.go,以 k8s GVK 格式 apply 应用文件,(Appfile的模式已经被抛弃)
    2. kubevela/pkg/utils/apply/apply.go,apply 工具类
  1. 调谐 application
    1. kubevela/cmd/core/main.go,core组件入口
    if useWebhook {
		klog.InfoS("Enable webhook", "server port", strconv.Itoa(webhookPort))
		oamwebhook.Register(mgr, controllerArgs)
		if err := waitWebhookSecretVolume(certDir, waitSecretTimeout, waitSecretInterval); err != nil {
			klog.ErrorS(err, "Unable to get webhook secret")
			os.Exit(1)
		}
	}

	if err = oamv1alpha2.Setup(mgr, controllerArgs); err != nil { // 处理Application
		klog.ErrorS(err, "Unable to setup the oam controller")
		os.Exit(1)
	}

	if err = standardcontroller.Setup(mgr, disableCaps, controllerArgs); err != nil { 
		klog.ErrorS(err, "Unable to setup the vela core controller")
		os.Exit(1)
	}

	if err = multicluster.InitClusterInfo(restConfig); err != nil {
		klog.ErrorS(err, "Init control plane cluster info")
		os.Exit(1)
	}

	if driver := os.Getenv(system.StorageDriverEnv); len(driver) == 0 {
		// first use system environment,
		err := os.Setenv(system.StorageDriverEnv, storageDriver)
		if err != nil {
			klog.ErrorS(err, "Unable to setup the vela core controller")
			os.Exit(1)
		}
	}
    1. kubevela/pkg/controller/core.oam.dev/v1alpha2/setup.go
func Setup(mgr ctrl.Manager, args controller.Args) error {
	switch args.OAMSpecVer {
	case "all":
		for _, setup := range []func(ctrl.Manager, controller.Args) error{
			application.Setup, traitdefinition.Setup, componentdefinition.Setup, policydefinition.Setup, workflowstepdefinition.Setup,
			applicationconfiguration.Setup,
		} {
			if err := setup(mgr, args); err != nil {
				return err
			}
		}
    1. kubevela/pkg/controller/core.oam.dev/v1alpha2/application/application_controller.go
// Reconcile process app event
// nolint:gocyclo
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	...
	//获取Application CR
	app := new(v1beta1.Application) 
	if err := r.Get(ctx, client.ObjectKey{
		Name:      req.Name,
		Namespace: req.Namespace,
	}, app); 
    ...

	// 获取解析器和处理器
	appParser := appfile.NewApplicationParser(r.Client, r.dm, r.pd)
	handler, err := NewAppHandler(logCtx, r, app, appParser)
	...
	// 处理终结器,资源跟踪器是cluster级别的资源因此使用finalizer处理
	endReconcile, result, err := r.handleFinalizers(logCtx, app, handler)
	...

	// 核心逻辑。
	// 1、如果指定了发布版本,则获取发布版本的application
	// 2、生成appFile空对象
	// 3、解析Conponent 生成 workload,将trait、scope 与 workload 关联
	// 4、解析 Policy
	// 5、解析关联对象(为了解具体场景)
	appFile, err := appParser.GenerateAppFile(logCtx, app)
	...
	app.Status.SetConditions(condition.ReadyCondition("Parsed"))
	r.Recorder.Event(app, event.Normal(velatypes.ReasonParsed, velatypes.MessageParsed))

	// 准备当前应用版本
	if err := handler.PrepareCurrentAppRevision(logCtx, appFile); err != nil {
	...
	// 应用当前应用版本
	if err := handler.FinalizeAndApplyAppRevision(logCtx); err != nil {
	...
	logCtx.Info("Successfully prepare current app revision", "revisionName", handler.currentAppRev.Name,
		"revisionHash", handler.currentRevHash, "isNewRevision", handler.isNewRevision)
	app.Status.SetConditions(condition.ReadyCondition("Revision"))
	r.Recorder.Event(app, event.Normal(velatypes.ReasonRevisoned, velatypes.MessageRevisioned))

	// 更新最新版本的状态
	if err := handler.UpdateAppLatestRevisionStatus(logCtx); err != nil {
	...
	logCtx.Info("Successfully apply application revision")

	// 提交策略
	if err := handler.ApplyPolicies(logCtx, appFile); err != nil {
	...	
	app.Status.SetConditions(condition.ReadyCondition(common.PolicyCondition.String()))
	r.Recorder.Event(app, event.Normal(velatypes.ReasonPolicyGenerated, velatypes.MessagePolicyGenerated))

	// 生成工作流步骤
	steps, err := handler.GenerateApplicationSteps(logCtx, app, appParser, appFile, handler.currentAppRev)
	...
	app.Status.SetConditions(condition.ReadyCondition(common.RenderCondition.String()))
	r.Recorder.Event(app, event.Normal(velatypes.ReasonRendered, velatypes.MessageRendered))
	wf := workflow.NewWorkflow(app, r.Client, appFile.WorkflowMode, appFile.Debug, handler.resourceKeeper)
	workflowState, err := wf.ExecuteSteps(logCtx.Fork("workflow"), handler.currentAppRev, steps)
	...

	handler.addServiceStatus(false, app.Status.Services...)
	handler.addAppliedResource(true, app.Status.AppliedResources...)
	app.Status.AppliedResources = handler.appliedResources
	app.Status.Services = handler.services
	switch workflowState {
	case common.WorkflowStateInitializing:
		...
		return r.gcResourceTrackers(logCtx, handler, common.ApplicationRendering, false, false)
	case common.WorkflowStateSuspended:
		...
		return r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowSuspending, false, true)
	case common.WorkflowStateTerminated:
		...
		return r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowTerminated, false, true)
	case common.WorkflowStateExecuting:
		...
		return r.result(err).requeue(wf.GetBackoffWaitTime()).ret()
	case common.WorkflowStateSucceeded:
		...
	case common.WorkflowStateFinished:
		...
	case common.WorkflowStateSkipping:
		...
		return ctrl.Result{}, nil
	}

	var phase = common.ApplicationRunning
	if !hasHealthCheckPolicy(appFile.PolicyWorkloads) {
		app.Status.Services = handler.services
		if !isHealthy(handler.services) {
			phase = common.ApplicationUnhealthy
		}
	}

	r.stateKeep(logCtx, handler, app)
	if err := garbageCollection(logCtx, handler); err != nil {
		logCtx.Error(err, "Failed to run garbage collection")
		r.Recorder.Event(app, event.Warning(velatypes.ReasonFailedGC, err))
		return r.endWithNegativeCondition(logCtx, app, condition.ReconcileError(err), phase)
	}
	logCtx.Info("Successfully garbage collect")
	app.Status.SetConditions(condition.Condition{
		Type:               condition.ConditionType(common.ReadyCondition.String()),
		Status:             corev1.ConditionTrue,
		LastTransitionTime: metav1.Now(),
		Reason:             condition.ReasonReconcileSuccess,
	})
	r.Recorder.Event(app, event.Normal(velatypes.ReasonDeployed, velatypes.MessageDeployed))
	return r.gcResourceTrackers(logCtx, handler, phase, true, true)
}

0条评论
作者已关闭评论
Darren
11文章数
0粉丝数
Darren
11 文章 | 0 粉丝
Darren
11文章数
0粉丝数
Darren
11 文章 | 0 粉丝
原创

KubeVela源码小记(一)

2023-12-05 09:10:35
15
0

KubeVela 数据结构:

// ApplicationSpec is the spec of Application
type ApplicationSpec struct {
	Components []common.ApplicationComponent `json:"components"`

	// Policies defines the global policies for all components in the app, e.g. security, metrics, gitops,
	// multi-cluster placement rules, etc.
	// Policies are applied after components are rendered and before workflow steps are executed.
	Policies []AppPolicy `json:"policies,omitempty"`

	// Workflow defines how to customize the control logic.
	// If workflow is specified, Vela won't apply any resource, but provide rendered output in AppRevision.
	// Workflow steps are executed in array order, and each step:
	// - will have a context in annotation.
	// - should mark "finish" phase in status.conditions.
	Workflow *Workflow `json:"workflow,omitempty"`

	// TODO(wonderflow): we should have application level scopes supported here
}

KubeVela 发布的流程:

  1. cli 发布入口,使用 vela up 命令触发,目的是向k8s 提交appliction cr
    1. kubevela/references/cmd/cli/main.go,程序入口
    2. kubevela/references/cli/cli.go
    3. kubevela/references/cli/up.go,up 命令
// Run execute the vela up command
func (opt *UpCommandOptions) Run(f velacmd.Factory, cmd *cobra.Command) error {
	if opt.File != "" {
		return opt.deployApplicationFromFile(f, cmd)
	}
	if opt.RevisionName == "" {
		return opt.deployExistingApp(f, cmd)
	}
	return opt.deployExistingAppUsingRevision(f, cmd)
}
    1. kubevela/references/common/application.go,以 k8s GVK 格式 apply 应用文件,(Appfile的模式已经被抛弃)
    2. kubevela/pkg/utils/apply/apply.go,apply 工具类
  1. 调谐 application
    1. kubevela/cmd/core/main.go,core组件入口
    if useWebhook {
		klog.InfoS("Enable webhook", "server port", strconv.Itoa(webhookPort))
		oamwebhook.Register(mgr, controllerArgs)
		if err := waitWebhookSecretVolume(certDir, waitSecretTimeout, waitSecretInterval); err != nil {
			klog.ErrorS(err, "Unable to get webhook secret")
			os.Exit(1)
		}
	}

	if err = oamv1alpha2.Setup(mgr, controllerArgs); err != nil { // 处理Application
		klog.ErrorS(err, "Unable to setup the oam controller")
		os.Exit(1)
	}

	if err = standardcontroller.Setup(mgr, disableCaps, controllerArgs); err != nil { 
		klog.ErrorS(err, "Unable to setup the vela core controller")
		os.Exit(1)
	}

	if err = multicluster.InitClusterInfo(restConfig); err != nil {
		klog.ErrorS(err, "Init control plane cluster info")
		os.Exit(1)
	}

	if driver := os.Getenv(system.StorageDriverEnv); len(driver) == 0 {
		// first use system environment,
		err := os.Setenv(system.StorageDriverEnv, storageDriver)
		if err != nil {
			klog.ErrorS(err, "Unable to setup the vela core controller")
			os.Exit(1)
		}
	}
    1. kubevela/pkg/controller/core.oam.dev/v1alpha2/setup.go
func Setup(mgr ctrl.Manager, args controller.Args) error {
	switch args.OAMSpecVer {
	case "all":
		for _, setup := range []func(ctrl.Manager, controller.Args) error{
			application.Setup, traitdefinition.Setup, componentdefinition.Setup, policydefinition.Setup, workflowstepdefinition.Setup,
			applicationconfiguration.Setup,
		} {
			if err := setup(mgr, args); err != nil {
				return err
			}
		}
    1. kubevela/pkg/controller/core.oam.dev/v1alpha2/application/application_controller.go
// Reconcile process app event
// nolint:gocyclo
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	...
	//获取Application CR
	app := new(v1beta1.Application) 
	if err := r.Get(ctx, client.ObjectKey{
		Name:      req.Name,
		Namespace: req.Namespace,
	}, app); 
    ...

	// 获取解析器和处理器
	appParser := appfile.NewApplicationParser(r.Client, r.dm, r.pd)
	handler, err := NewAppHandler(logCtx, r, app, appParser)
	...
	// 处理终结器,资源跟踪器是cluster级别的资源因此使用finalizer处理
	endReconcile, result, err := r.handleFinalizers(logCtx, app, handler)
	...

	// 核心逻辑。
	// 1、如果指定了发布版本,则获取发布版本的application
	// 2、生成appFile空对象
	// 3、解析Conponent 生成 workload,将trait、scope 与 workload 关联
	// 4、解析 Policy
	// 5、解析关联对象(为了解具体场景)
	appFile, err := appParser.GenerateAppFile(logCtx, app)
	...
	app.Status.SetConditions(condition.ReadyCondition("Parsed"))
	r.Recorder.Event(app, event.Normal(velatypes.ReasonParsed, velatypes.MessageParsed))

	// 准备当前应用版本
	if err := handler.PrepareCurrentAppRevision(logCtx, appFile); err != nil {
	...
	// 应用当前应用版本
	if err := handler.FinalizeAndApplyAppRevision(logCtx); err != nil {
	...
	logCtx.Info("Successfully prepare current app revision", "revisionName", handler.currentAppRev.Name,
		"revisionHash", handler.currentRevHash, "isNewRevision", handler.isNewRevision)
	app.Status.SetConditions(condition.ReadyCondition("Revision"))
	r.Recorder.Event(app, event.Normal(velatypes.ReasonRevisoned, velatypes.MessageRevisioned))

	// 更新最新版本的状态
	if err := handler.UpdateAppLatestRevisionStatus(logCtx); err != nil {
	...
	logCtx.Info("Successfully apply application revision")

	// 提交策略
	if err := handler.ApplyPolicies(logCtx, appFile); err != nil {
	...	
	app.Status.SetConditions(condition.ReadyCondition(common.PolicyCondition.String()))
	r.Recorder.Event(app, event.Normal(velatypes.ReasonPolicyGenerated, velatypes.MessagePolicyGenerated))

	// 生成工作流步骤
	steps, err := handler.GenerateApplicationSteps(logCtx, app, appParser, appFile, handler.currentAppRev)
	...
	app.Status.SetConditions(condition.ReadyCondition(common.RenderCondition.String()))
	r.Recorder.Event(app, event.Normal(velatypes.ReasonRendered, velatypes.MessageRendered))
	wf := workflow.NewWorkflow(app, r.Client, appFile.WorkflowMode, appFile.Debug, handler.resourceKeeper)
	workflowState, err := wf.ExecuteSteps(logCtx.Fork("workflow"), handler.currentAppRev, steps)
	...

	handler.addServiceStatus(false, app.Status.Services...)
	handler.addAppliedResource(true, app.Status.AppliedResources...)
	app.Status.AppliedResources = handler.appliedResources
	app.Status.Services = handler.services
	switch workflowState {
	case common.WorkflowStateInitializing:
		...
		return r.gcResourceTrackers(logCtx, handler, common.ApplicationRendering, false, false)
	case common.WorkflowStateSuspended:
		...
		return r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowSuspending, false, true)
	case common.WorkflowStateTerminated:
		...
		return r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowTerminated, false, true)
	case common.WorkflowStateExecuting:
		...
		return r.result(err).requeue(wf.GetBackoffWaitTime()).ret()
	case common.WorkflowStateSucceeded:
		...
	case common.WorkflowStateFinished:
		...
	case common.WorkflowStateSkipping:
		...
		return ctrl.Result{}, nil
	}

	var phase = common.ApplicationRunning
	if !hasHealthCheckPolicy(appFile.PolicyWorkloads) {
		app.Status.Services = handler.services
		if !isHealthy(handler.services) {
			phase = common.ApplicationUnhealthy
		}
	}

	r.stateKeep(logCtx, handler, app)
	if err := garbageCollection(logCtx, handler); err != nil {
		logCtx.Error(err, "Failed to run garbage collection")
		r.Recorder.Event(app, event.Warning(velatypes.ReasonFailedGC, err))
		return r.endWithNegativeCondition(logCtx, app, condition.ReconcileError(err), phase)
	}
	logCtx.Info("Successfully garbage collect")
	app.Status.SetConditions(condition.Condition{
		Type:               condition.ConditionType(common.ReadyCondition.String()),
		Status:             corev1.ConditionTrue,
		LastTransitionTime: metav1.Now(),
		Reason:             condition.ReasonReconcileSuccess,
	})
	r.Recorder.Event(app, event.Normal(velatypes.ReasonDeployed, velatypes.MessageDeployed))
	return r.gcResourceTrackers(logCtx, handler, phase, true, true)
}

文章来自个人专栏
Darren的容器专栏
11 文章 | 1 订阅
0条评论
作者已关闭评论
作者已关闭评论
0
0