KubeVela 数据结构:
// ApplicationSpec is the spec of Application.
// It carries the component list plus the optional policies and workflow.
type ApplicationSpec struct {
// Components is the list of application components. Unlike the other fields,
// its JSON tag has no omitempty, so the "components" key is always serialized.
Components []common.ApplicationComponent `json:"components"`
// Policies defines the global policies for all components in the app, e.g. security, metrics, gitops,
// multi-cluster placement rules, etc.
// Policies are applied after components are rendered and before workflow steps are executed.
Policies []AppPolicy `json:"policies,omitempty"`
// Workflow defines how to customize the control logic.
// If workflow is specified, Vela won't apply any resource, but provide rendered output in AppRevision.
// Workflow steps are executed in array order, and each step:
// - will have a context in annotation.
// - should mark "finish" phase in status.conditions.
// The field is a pointer, so it is omitted from JSON when nil.
Workflow *Workflow `json:"workflow,omitempty"`
// TODO(wonderflow): we should have application level scopes supported here
}
KubeVela 发布的流程:
- cli 发布入口,使用 vela up 命令触发,目的是向 k8s 提交 Application CR
- kubevela/references/cmd/cli/main.go,程序入口
- kubevela/references/cli/cli.go
- kubevela/references/cli/up.go,up 命令
// Run executes the vela up command.
//
// It selects exactly one deployment path, checked in this order:
//   - a file was given (opt.File is non-empty): deploy the application from that file;
//   - no revision name was given: deploy the existing application;
//   - otherwise: deploy the existing application using the named revision.
func (opt *UpCommandOptions) Run(f velacmd.Factory, cmd *cobra.Command) error {
	switch {
	case opt.File != "":
		return opt.deployApplicationFromFile(f, cmd)
	case opt.RevisionName == "":
		return opt.deployExistingApp(f, cmd)
	default:
		return opt.deployExistingAppUsingRevision(f, cmd)
	}
}
- kubevela/references/common/application.go,以 k8s GVK 格式 apply 应用文件(Appfile 的模式已经被抛弃)
- kubevela/pkg/utils/apply/apply.go,apply 工具类
- 调谐 application
- kubevela/cmd/core/main.go,core组件入口
// Excerpt of the core controller entry point: optionally registers the
// webhooks, then sets up the controllers, initializes cluster info and
// defaults the storage-driver environment variable. Every failure below is
// logged and terminates the process with exit code 1.
if useWebhook {
klog.InfoS("Enable webhook", "server port", strconv.Itoa(webhookPort))
oamwebhook.Register(mgr, controllerArgs)
// Block until the webhook secret is available under certDir, or give up
// after waitSecretTimeout (presumably polling every waitSecretInterval — confirm in the helper).
if err := waitWebhookSecretVolume(certDir, waitSecretTimeout, waitSecretInterval); err != nil {
klog.ErrorS(err, "Unable to get webhook secret")
os.Exit(1)
}
}
if err = oamv1alpha2.Setup(mgr, controllerArgs); err != nil { // handles Application
klog.ErrorS(err, "Unable to setup the oam controller")
os.Exit(1)
}
if err = standardcontroller.Setup(mgr, disableCaps, controllerArgs); err != nil {
klog.ErrorS(err, "Unable to setup the vela core controller")
os.Exit(1)
}
if err = multicluster.InitClusterInfo(restConfig); err != nil {
klog.ErrorS(err, "Init control plane cluster info")
os.Exit(1)
}
// Only fall back to storageDriver when the environment variable is unset or
// empty, so a value already present in the environment takes precedence.
if driver := os.Getenv(system.StorageDriverEnv); len(driver) == 0 {
// first use system environment,
err := os.Setenv(system.StorageDriverEnv, storageDriver)
if err != nil {
// NOTE(review): this message is identical to the one logged when
// standardcontroller.Setup fails above, although the failing call here is
// os.Setenv — looks like a copy-paste; confirm against upstream.
klog.ErrorS(err, "Unable to setup the vela core controller")
os.Exit(1)
}
}
- kubevela/pkg/controller/core.oam.dev/v1alpha2/setup.go
func Setup(mgr ctrl.Manager, args controller.Args) error {
switch args.OAMSpecVer {
case "all":
for _, setup := range []func(ctrl.Manager, controller.Args) error{
application.Setup, traitdefinition.Setup, componentdefinition.Setup, policydefinition.Setup, workflowstepdefinition.Setup,
applicationconfiguration.Setup,
} {
if err := setup(mgr, args); err != nil {
return err
}
}
- kubevela/pkg/controller/core.oam.dev/v1alpha2/application/application_controller.go
// Reconcile processes an Application event.
// This is an abridged excerpt: lines consisting of "..." mark code that was
// elided from the original function.
// nolint:gocyclo
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
...
// Fetch the Application CR identified by the request's name and namespace.
app := new(v1beta1.Application)
if err := r.Get(ctx, client.ObjectKey{
Name: req.Name,
Namespace: req.Namespace,
}, app);
...
// Build the application parser and the app handler.
appParser := appfile.NewApplicationParser(r.Client, r.dm, r.pd)
handler, err := NewAppHandler(logCtx, r, app, appParser)
...
// Handle finalizers. Resource trackers are cluster-scoped resources, so
// their cleanup is driven through a finalizer.
endReconcile, result, err := r.handleFinalizers(logCtx, app, handler)
...
// Core logic (the note author's summary of GenerateAppFile):
// 1. If a publish version is specified, load the Application of that version.
// 2. Create an empty AppFile object.
// 3. Parse each Component into a workload and associate traits and scopes with the workload.
// 4. Parse the Policies.
// 5. Parse referred objects (the concrete use case is not yet understood).
appFile, err := appParser.GenerateAppFile(logCtx, app)
...
app.Status.SetConditions(condition.ReadyCondition("Parsed"))
r.Recorder.Event(app, event.Normal(velatypes.ReasonParsed, velatypes.MessageParsed))
// Prepare the current application revision.
if err := handler.PrepareCurrentAppRevision(logCtx, appFile); err != nil {
...
// Finalize and apply the current application revision.
if err := handler.FinalizeAndApplyAppRevision(logCtx); err != nil {
...
logCtx.Info("Successfully prepare current app revision", "revisionName", handler.currentAppRev.Name,
"revisionHash", handler.currentRevHash, "isNewRevision", handler.isNewRevision)
app.Status.SetConditions(condition.ReadyCondition("Revision"))
r.Recorder.Event(app, event.Normal(velatypes.ReasonRevisoned, velatypes.MessageRevisioned))
// Update the status of the latest revision.
if err := handler.UpdateAppLatestRevisionStatus(logCtx); err != nil {
...
logCtx.Info("Successfully apply application revision")
// Apply the policies.
if err := handler.ApplyPolicies(logCtx, appFile); err != nil {
...
app.Status.SetConditions(condition.ReadyCondition(common.PolicyCondition.String()))
r.Recorder.Event(app, event.Normal(velatypes.ReasonPolicyGenerated, velatypes.MessagePolicyGenerated))
// Generate the workflow steps.
steps, err := handler.GenerateApplicationSteps(logCtx, app, appParser, appFile, handler.currentAppRev)
...
app.Status.SetConditions(condition.ReadyCondition(common.RenderCondition.String()))
r.Recorder.Event(app, event.Normal(velatypes.ReasonRendered, velatypes.MessageRendered))
// Build the workflow and execute the generated steps against the current revision.
wf := workflow.NewWorkflow(app, r.Client, appFile.WorkflowMode, appFile.Debug, handler.resourceKeeper)
workflowState, err := wf.ExecuteSteps(logCtx.Fork("workflow"), handler.currentAppRev, steps)
...
// Feed the statuses already recorded on the app into the handler, then
// write the handler's view back onto the app status.
handler.addServiceStatus(false, app.Status.Services...)
handler.addAppliedResource(true, app.Status.AppliedResources...)
app.Status.AppliedResources = handler.appliedResources
app.Status.Services = handler.services
// Dispatch on the workflow state returned by ExecuteSteps. The visible
// branches for initializing, suspended, terminated, executing and skipping
// return early; the bodies of the succeeded/finished branches are elided.
switch workflowState {
case common.WorkflowStateInitializing:
...
return r.gcResourceTrackers(logCtx, handler, common.ApplicationRendering, false, false)
case common.WorkflowStateSuspended:
...
return r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowSuspending, false, true)
case common.WorkflowStateTerminated:
...
return r.gcResourceTrackers(logCtx, handler, common.ApplicationWorkflowTerminated, false, true)
case common.WorkflowStateExecuting:
...
// Still executing: requeue after the workflow's backoff wait time.
return r.result(err).requeue(wf.GetBackoffWaitTime()).ret()
case common.WorkflowStateSucceeded:
...
case common.WorkflowStateFinished:
...
case common.WorkflowStateSkipping:
...
return ctrl.Result{}, nil
}
// The phase defaults to running. It is downgraded to unhealthy only when
// there is no health-check policy and the handler's services are not healthy.
var phase = common.ApplicationRunning
if !hasHealthCheckPolicy(appFile.PolicyWorkloads) {
app.Status.Services = handler.services
if !isHealthy(handler.services) {
phase = common.ApplicationUnhealthy
}
}
r.stateKeep(logCtx, handler, app)
// Run garbage collection; a failure is logged, recorded as a warning event
// and ends the reconcile with a negative condition.
if err := garbageCollection(logCtx, handler); err != nil {
logCtx.Error(err, "Failed to run garbage collection")
r.Recorder.Event(app, event.Warning(velatypes.ReasonFailedGC, err))
return r.endWithNegativeCondition(logCtx, app, condition.ReconcileError(err), phase)
}
logCtx.Info("Successfully garbage collect")
// Mark the application ready with a reconcile-success reason.
app.Status.SetConditions(condition.Condition{
Type: condition.ConditionType(common.ReadyCondition.String()),
Status: corev1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Reason: condition.ReasonReconcileSuccess,
})
r.Recorder.Event(app, event.Normal(velatypes.ReasonDeployed, velatypes.MessageDeployed))
return r.gcResourceTrackers(logCtx, handler, phase, true, true)
}