searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Kubesphere核心组件和实现原理分析

2023-04-25 12:41:50
471
0

Kubesphere是国内开源的容器云平台,相比Openshift、Rancher等业界优秀产品,Kubesphere功能丰富度上也不相上下。Kubesphere的技术架构也是基于k8s可编程框架来实现的,本文主要介绍KS的核心组件和核心的机制实现原理,主要包括APIServer、ControllerManager、请求处理过程和认证和授权四个部分,基于KS 3.1.1源码版本。通过研究Kubesphere来理解容器云产品的技术架构和实现有一定参考意义。

一、Kubesphere核心组件介绍

1、整体架构

KubeSphere 官方的核心架构如图所示

核心组件主要有三个:

  • ks-console 前端服务组件

  • ks-apiserver 后端服务组件

  • ks-controller-manager 资源状态维护组件

在安装好的KS的服务组件里面也可以看到核心的服务组件

除了上面三个组件,还多了openIdap,用于存储用户账号,minio存储非结构化数据,如Helm charts。

KubeSphere 的后端设计中沿用了 K8s 声明式 API 的风格,所有可操作的资源都尽可能的抽象成为 CustomResource。所以要熟悉KubeSphere的代码实现,

首先需要熟悉K8S的声明式API的核心APIServer/ControllerManager/Client-go的实现。

2、KS源码目录与功能

二、Kubesphere APIServer实现分析

ks-apiserver 的开发使用了 go-restful 框架,可以在请求链路中增加多个 Filter 用于动态的拦截请求和响应,实现认证、鉴权、审计逻辑转发和反向代理功能。可以认为,ks-apiserver实现了类似微服务网关的功能。

1、整体架构分析

 

从架构图上可以看出,KS APIServer对不同类型资源对象的API作了不同的分发和路由处理:

  • kubernetes原生的对象,由ks-apiserver连接api-server,直接获取更改etcd中kubernetes的原始数据(origin data)即可,操作的对象即kubernetes原生的configmap. deployment等对象。

  • KS的CRD对象,ks-controller-manager的封装功能逻辑以crd对象的方式表现在etcd中,ks-apiserver通过连接k8s-apiserver操作etcd中的crd数据(crd data)即可,操作 ks-controller-manager 扩展的逻辑功能。

  • 第三方的operator对象,如prometheus-operator等第三方完成的模块以operator的方式运行在系统中,其功能对应的对象也以crd的形式存放载etcd中,ks-apiserver也是通过和k8s-apiserver交互操作对应的crd完成。

  • 普通的服务对象,如kenkins,sonarqube等以普通服务的方式运行在系统中,ks-apiserver直接通过网络调用和此类对象交互

  • 多集群模式的API对象,通过tower分发到Member集群处理,如阿里云,腾讯云K8S集群。 以上,ks-apiserver通过内部API(inner API aggregate)聚合内部功能 ,并对外提供统一的API,即外部API(out API aggregate)。

 

2、代码实现分析

ks-apiserver整体启动流程如下图,提供Restful API和k8s一样主要通过go-restful这个轻量级框架实现,

 

代码入口:1.cmd/ks-apiserver/apiserver.go

//通过cobra库构建启动CMD并启动
cmd:=app.NewAPIServerCommand()
cmd.Execute()

2.->cmd/ks-apiserver/app/server.go

func NewAPIServerCommand() *cobra.Command
s := options.NewServerRunOptions()
//通过viper库读取kubesphere配置,从指定目录路径或环境变量
conf, err := apiserverconfig.TryLoadFromDisk()
//创建cobra cmd,在cmd中run
func Run(s *options.ServerRunOptions, ctx context.Context) error {
//通过Options创建一个APIServer
apiserver, err := s.NewAPIServer(ctx.Done())
if err != nil {
return err
}
//API注册,处理链
err = apiserver.PrepareRun(ctx.Done())
if err != nil {
return nil
}
//运行APIServer
return apiserver.Run(ctx)
}

3.->cmd/ks-apiserver/app/options/options.go 通过给定的options创建ks-apiserver实例 创建k8s的客户端,并把kubernetesClient(Kubernetes、KubeSphere、Istio、ApiExtensions、Prometheus) 可以通过informers管理的CRD,外部的组件通过http等公共协议远程调用管理部分通过服务接口直接提供的资源 ,如 MetricsClient、LoggingClient、S3Client、devopsClient、SonarClient、eventsclient CacheClient auditingclient、alertingClient 实例化后挂在APIServer上

func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIServer, error){
...
informerFactory := informers.NewInformerFactories(kubernetesClient.Kubernetes(), kubernetesClient.KubeSphere(),
kubernetesClient.Istio(), kubernetesClient.Snapshot(), kubernetesClient.ApiExtensions(), kubernetesClient.Prometheus())
apiServer.InformerFactory = informerFactory
...
}

4.->pkg/apiserver/server.go 定义APIServer,注册API,创建处理链(审计、授权)

func (s *APIServer) PrepareRun(stopCh <-chan struct{}) error {
​
s.container = restful.NewContainer()
s.container.Filter(logRequestAndResponse)
s.container.Router(restful.CurlyRouter{})
s.container.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(panicReason, httpWriter)
})
//注册ks 的API
s.installKubeSphereAPIs()
//注册计量API
s.installMetricsAPI()
s.container.Filter(monitorRequest)

for _, ws := range s.container.RegisteredWebServices() {
klog.V(2).Infof("%s", ws.RootPath())
}

s.Server.Handler = s.container
//处理链
s.buildHandlerChain(stopCh)

return nil
​
}

5.->pkg/apiserver/server.go 接口注册,添加到go-restful的container并与授权认证关联

func (s *APIServer) installKubeSphereAPIs() {
imOperator := im.NewOperator(s.KubernetesClient.KubeSphere(),
user.New(s.InformerFactory.KubeSphereSharedInformerFactory(),
s.InformerFactory.KubernetesSharedInformerFactory()),
loginrecord.New(s.InformerFactory.KubeSphereSharedInformerFactory()),
s.Config.AuthenticationOptions)
amOperator := am.NewOperator(s.KubernetesClient.KubeSphere(),
s.KubernetesClient.Kubernetes(),
s.InformerFactory)
rbacAuthorizer := rbac.NewRBACAuthorizer(amOperator)
...
urlruntime.Must(configv1alpha2.AddToContainer(s.container, s.Config))
urlruntime.Must(resourcev1alpha3.AddToContainer(s.container, s.InformerFactory, s.RuntimeCache))
....
urlruntime.Must(devopsv1alpha2.AddToContainer(s.container, s.Config.DevopsOptions.Endpoint))
​
}

5.->pkg/apiserver/server.go 接口注册,添加到go-restful的container并与授权认证关联,这里面如果为devops已经独立的外部服务,则通过代理方式实现注册转发。

func (s *APIServer) installKubeSphereAPIs() {
imOperator := im.NewOperator(s.KubernetesClient.KubeSphere(),
user.New(s.InformerFactory.KubeSphereSharedInformerFactory(),
s.InformerFactory.KubernetesSharedInformerFactory()),
loginrecord.New(s.InformerFactory.KubeSphereSharedInformerFactory()),
s.Config.AuthenticationOptions)
amOperator := am.NewOperator(s.KubernetesClient.KubeSphere(),
s.KubernetesClient.Kubernetes(),
s.InformerFactory)
rbacAuthorizer := rbac.NewRBACAuthorizer(amOperator)

​
urlruntime.Must(configv1alpha2.AddToContainer(s.container, s.Config))
urlruntime.Must(resourcev1alpha3.AddToContainer(s.container, s.InformerFactory, s.RuntimeCache))
....
urlruntime.Must(devopsv1alpha2.AddToContainer(s.container, s.Config.DevopsOptions.Endpoint))
​
}
​
//pkg/kapis/config/v1alpha2/register.go
//直接注册
func AddToContainer(c *restful.Container, config *kubesphereconfig.Config) error {
webservice := runtime.NewWebService(GroupVersion)
​
webservice.Route(webservice.GET("/configs/oauth").
Doc("Information about the authorization server are published.").
To(func(request *restful.Request, response *restful.Response) {
response.WriteEntity(config.AuthenticationOptions.OAuthOptions)
}))

webservice.Route(webservice.GET("/configs/configz").
Doc("Information about the server configuration").
To(func(request *restful.Request, response *restful.Response) {
response.WriteAsJson(config.ToMap())
}))

c.Add(webservice)
return nil
​
}
//下面是代理方式的注册处理过程 ,devops之类的转发到ks-devops服务
func AddToContainer(container *restful.Container, endpoint string) error {
proxy, err := generic.NewGenericProxy(endpoint, GroupVersion.Group, GroupVersion.Version)
if err != nil {
return err
}
​
return proxy.AddToContainer(container)
​
}
​
//pkg/kapis/generic/generic.go
func NewGenericProxy(endpoint string, groupName string, version string) (*genericProxy, error) {
parse, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
​
// trim path suffix slash
parse.Path = strings.Trim(parse.Path, "/")

return &genericProxy{
Endpoint:  parse,
GroupName: groupName,
Version:   version,
}, nil
​
}
//添加到go-restful container,支持GET/PUT/POST/DELETE/PATCH
func (g *genericProxy) AddToContainer(container *restful.Container) error {
webservice := runtime.NewWebService(schema.GroupVersion{
Group:   g.GroupName,
Version: g.Version,
})

​
webservice.Route(webservice.GET("/{path:*}").
To(g.handler).
Returns(http.StatusOK, api.StatusOK, nil))
...
container.Add(webservice)
return nil
​
}
​
//最终实现请求转发到对应的endpoint
func (g *genericProxy) handler(request *restful.Request, response *restful.Response) {
u := g.makeURL(request)
​
httpProxy := proxy.NewUpgradeAwareHandler(u, http.DefaultTransport, false, false, &errorResponder{})
httpProxy.ServeHTTP(response, request.Request)
​
}

6.->pkg/apiserver/server.go //创建过滤处理链,处理审计、授权和认证

func (s *APIServer) buildHandlerChain(stopCh <-chan struct{}) {
requestInfoResolver := &request.RequestInfoFactory{
APIPrefixes:          sets.NewString("api", "apis", "kapis", "kapi"),
GrouplessAPIPrefixes: sets.NewString("api", "kapi"),
GlobalResources: []schema.GroupResource{
iamv1alpha2.Resource(iamv1alpha2.ResourcesPluralUser),
iamv1alpha2.Resource(iamv1alpha2.ResourcesPluralGlobalRole),
iamv1alpha2.Resource(iamv1alpha2.ResourcesPluralGlobalRoleBinding),
tenantv1alpha1.Resource(tenantv1alpha1.ResourcePluralWorkspace),
tenantv1alpha2.Resource(tenantv1alpha1.ResourcePluralWorkspace),
tenantv1alpha2.Resource(clusterv1alpha1.ResourcesPluralCluster),
clusterv1alpha1.Resource(clusterv1alpha1.ResourcesPluralCluster),
resourcev1alpha3.Resource(clusterv1alpha1.ResourcesPluralCluster),
notificationv2beta1.Resource(v2beta1.ResourcesPluralConfig),
notificationv2beta1.Resource(v2beta1.ResourcesPluralReceiver),
},
}
​
handler := s.Server.Handler
handler = filters.WithKubeAPIServer(handler, s.KubernetesClient.Config(), &errorResponder{})

//开启审计
if s.Config.AuditingOptions.Enable {
handler = filters.WithAuditing(handler,
audit.NewAuditing(s.InformerFactory, s.Config.AuditingOptions, stopCh))
}

var authorizers authorizer.Authorizer
//授权
switch s.Config.AuthorizationOptions.Mode {
case authorizationoptions.AlwaysAllow:
authorizers = authorizerfactory.NewAlwaysAllowAuthorizer()
case authorizationoptions.AlwaysDeny:
authorizers = authorizerfactory.NewAlwaysDenyAuthorizer()
default:
fallthrough
case authorizationoptions.RBAC:
excludedPaths := []string{"/oauth/*", "/kapis/config.kubesphere.io/*", "/kapis/version", "/kapis/metrics"}
pathAuthorizer, _ := path.NewAuthorizer(excludedPaths)
amOperator := am.NewReadOnlyOperator(s.InformerFactory)
authorizers = unionauthorizer.New(pathAuthorizer, rbac.NewRBACAuthorizer(amOperator))
}

handler = filters.WithAuthorization(handler, authorizers)
//多集群模式
if s.Config.MultiClusterOptions.Enable {
clusterDispatcher := dispatch.NewClusterDispatch(s.InformerFactory.KubeSphereSharedInformerFactory().Cluster().V1alpha1().Clusters())
handler = filters.WithMultipleClusterDispatcher(handler, clusterDispatcher)
}

loginRecorder := auth.NewLoginRecorder(s.KubernetesClient.KubeSphere(),
s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister())
// authenticators are unordered
//和k8s一样支持多种认证模式
authn := unionauth.New(anonymous.NewAuthenticator(),
basictoken.New(basic.NewBasicAuthenticator(auth.NewPasswordAuthenticator(s.KubernetesClient.KubeSphere(),
s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister(),
s.Config.AuthenticationOptions), loginRecorder)),
bearertoken.New(jwttoken.NewTokenAuthenticator(auth.NewTokenOperator(s.CacheClient,
s.Config.AuthenticationOptions),
s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister())))
handler = filters.WithAuthentication(handler, authn)
handler = filters.WithRequestInfo(handler, requestInfoResolver)

s.Server.Handler = handler
​
}

7.->pkg/apiserver/server.go

//运行APIServer
func (s *APIServer) Run(ctx context.Context) (err error) {
​
//等待资源同步
err = s.waitForResourceSync(ctx)
if err != nil {
return err
}

shutdownCtx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
<-ctx.Done()
_ = s.Server.Shutdown(shutdownCtx)
}()
//启动http/https服务
klog.V(0).Infof("Start listening on %s", s.Server.Addr)
if s.Server.TLSConfig != nil {
err = s.Server.ListenAndServeTLS("", "")
} else {
err = s.Server.ListenAndServe()
}

return err
​
}
​
//连接k8s的API Server,完成GVR资源同步
func (s *APIServer) waitForResourceSync(ctx context.Context) error {
klog.V(0).Info("Start cache objects")
​
stopCh := ctx.Done()

//服务发现
discoveryClient := s.KubernetesClient.Kubernetes().Discovery()
_, apiResourcesList, err := discoveryClient.ServerGroupsAndResources()
if err != nil {
return err
}

isResourceExists := func(resource schema.GroupVersionResource) bool {
for _, apiResource := range apiResourcesList {
if apiResource.GroupVersion == resource.GroupVersion().String() {
for _, rsc := range apiResource.APIResources {
if rsc.Name == resource.Resource {
return true
}
}
}
}
return false
}

// resources we have to create informer first
//原生K8S资源
k8sGVRs := []schema.GroupVersionResource{
{Group: "", Version: "v1", Resource: "namespaces"},
{Group: "", Version: "v1", Resource: "nodes"},
{Group: "", Version: "v1", Resource: "resourcequotas"},
{Group: "", Version: "v1", Resource: "pods"},
{Group: "", Version: "v1", Resource: "services"},
{Group: "", Version: "v1", Resource: "persistentvolumeclaims"},
{Group: "", Version: "v1", Resource: "secrets"},
{Group: "", Version: "v1", Resource: "configmaps"},
{Group: "", Version: "v1", Resource: "serviceaccounts"},
.....
}
...
//K8S资源对象的formerFactory启动
s.InformerFactory.KubernetesSharedInformerFactory().Start(stopCh)
s.InformerFactory.KubernetesSharedInformerFactory().WaitForCacheSync(stopCh)

ksInformerFactory := s.InformerFactory.KubeSphereSharedInformerFactory()

//ks资源
ksGVRs := []schema.GroupVersionResource{
{Group: "tenant.kubesphere.io", Version: "v1alpha1", Resource: "workspaces"},
{Group: "tenant.kubesphere.io", Version: "v1alpha2", Resource: "workspacetemplates"},
...
{Group: "iam.kubesphere.io", Version: "v1alpha2", Resource: "loginrecords"},
{Group: "cluster.kubesphere.io", Version: "v1alpha1", Resource: "clusters"},
{Group: "devops.kubesphere.io", Version: "v1alpha3", Resource: "devopsprojects"},
}

//
devopsGVRs := []schema.GroupVersionResource{
{Group: "devops.kubesphere.io", Version: "v1alpha1", Resource: "s2ibinaries"},
...
}

servicemeshGVRs := []schema.GroupVersionResource{
{Group: "servicemesh.kubesphere.io", Version: "v1alpha2", Resource: "strategies"},
{Group: "servicemesh.kubesphere.io", Version: "v1alpha2", Resource: "servicepolicies"},
}

// federated resources on cached in multi cluster setup
//多集群联邦资源
federatedResourceGVRs := []schema.GroupVersionResource{
typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedClusterRole),
typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedClusterRoleBindingBinding),
...
}

ksInformerFactory.Start(stopCh)
ksInformerFactory.WaitForCacheSync(stopCh)
....
//prometheus
if promFactory := s.InformerFactory.PrometheusSharedInformerFactory(); promFactory != nil {
prometheusGVRs := []schema.GroupVersionResource{
{Group: "monitoring.coreos.com", Version: "v1", Resource: "prometheuses"},
{Group: "monitoring.coreos.com", Version: "v1", Resource: "prometheusrules"},
{Group: "monitoring.coreos.com", Version: "v1", Resource: "thanosrulers"},
}
for _, gvr := range prometheusGVRs {
if isResourceExists(gvr) {
_, err = promFactory.ForResource(gvr)
if err != nil {
return err
}
} else {
klog.Warningf("resource %s not exists in the cluster", gvr)
}
}
promFactory.Start(stopCh)
promFactory.WaitForCacheSync(stopCh)
}

// controller runtime cache for resources
go s.RuntimeCache.Start(ctx)
s.RuntimeCache.WaitForCacheSync(ctx)
...
return nil
​
}

至此,APIServer相关流程处理完成

 

三、KubeSphere Controller-manager实现分析

ks-controller-manager主要是kubesphere的基本系统功能集合,并将其提供的功能抽象为CRD注册在kubernetes中对ks-apiserver提供使用入口。而监控告警等第三方不断扩展的组件也采用同样的方式部署在kubernetes上,并由ks-apiserver最终统一聚合。做到功能模块分离微服务化,交互入口中心统一化。

1、KS Controller-manager启动流程分析

2、代码实现分析

1.代码入口:cmd/controller-manager/controller-manager.go

//和APIServer类似,通过cobra库构建启动CMD并启动
command := app.NewControllerManagerCommand()
command.Execute()

 

2.->cmd/controller-manager/app/options/options.go

//聚合各类组件的Options
func NewKubeSphereControllerManagerOptions() *KubeSphereControllerManagerOptions {
s := &KubeSphereControllerManagerOptions{
KubernetesOptions:     k8s.NewKubernetesOptions(),
DevopsOptions:         jenkins.NewDevopsOptions(),
S3Options:             s3.NewS3Options(),
LdapOptions:           ldapclient.NewOptions(),
OpenPitrixOptions:     openpitrix.NewOptions(),
NetworkOptions:        network.NewNetworkOptions(),
MultiClusterOptions:   multicluster.NewOptions(),
ServiceMeshOptions:    servicemesh.NewServiceMeshOptions(),
AuthenticationOptions: authoptions.NewAuthenticateOptions(),
LeaderElection: &leaderelection.LeaderElectionConfig{
LeaseDuration: 30 * time.Second,
RenewDeadline: 15 * time.Second,
RetryPeriod:   5 * time.Second,
},
LeaderElect:         false,
WebhookCertDir:      "",
ApplicationSelector: "",
}
​
return s
​
}

3.->cmd/controller-manager/app/server.go

func NewControllerManagerCommand() *cobra.Command {
// options
s := options.NewKubeSphereControllerManagerOptions()
//通过viper库读取配置
conf, err := controllerconfig.TryLoadFromDisk()
if err == nil {
// make sure LeaderElection is not nil
s = &options.KubeSphereControllerManagerOptions{
KubernetesOptions:     conf.KubernetesOptions,
DevopsOptions:         conf.DevopsOptions,
S3Options:             conf.S3Options,
AuthenticationOptions: conf.AuthenticationOptions,
LdapOptions:           conf.LdapOptions,
OpenPitrixOptions:     conf.OpenPitrixOptions,
NetworkOptions:        conf.NetworkOptions,
MultiClusterOptions:   conf.MultiClusterOptions,
ServiceMeshOptions:    conf.ServiceMeshOptions,
LeaderElection:        s.LeaderElection,
LeaderElect:           s.LeaderElect,
WebhookCertDir:        s.WebhookCertDir,
}
} else {
klog.Fatal("Failed to load configuration from disk", err)
}
​
cmd := &cobra.Command{
Use:  "controller-manager",
Long: `KubeSphere controller manager is a daemon that`,
Run: func(cmd *cobra.Command, args []string) {
if errs := s.Validate(); len(errs) != 0 {
klog.Error(utilerrors.NewAggregate(errs))
os.Exit(1)
}

if err = run(s, signals.SetupSignalHandler()); err != nil {
klog.Error(err)
os.Exit(1)
}
},
SilenceUsage: true,
}
​
//这里使用client-go中的controller-runtime来辅助管理controller
//controller-manager作为一个有状态的组件,需要进行选主。APIServer是无状态的,不存在选主过程
func run(s *options.KubeSphereControllerManagerOptions, ctx context.Context) error {
​
//创建K8S客户端
kubernetesClient, err := k8s.NewKubernetesClient(s.KubernetesOptions)

//加载各类客户端
//devopsClient/ldapClient/s3Client....
var devopsClient devops.Interface
if s.DevopsOptions != nil && len(s.DevopsOptions.Host) != 0 {
devopsClient, err = jenkins.NewDevopsClient(s.DevopsOptions)
if err != nil {
return fmt.Errorf("failed to connect jenkins, please check jenkins status, error: %v", err)
}
}
....
//manager的选主配置
if s.LeaderElect {
mgrOptions = manager.Options{
CertDir:                 s.WebhookCertDir,
Port:                    8443,
LeaderElection:          s.LeaderElect,
LeaderElectionNamespace: "kubesphere-system",
LeaderElectionID:        "ks-controller-manager-leader-election",
LeaseDuration:           &s.LeaderElection.LeaseDuration,
RetryPeriod:             &s.LeaderElection.RetryPeriod,
RenewDeadline:           &s.LeaderElection.RenewDeadline,
}
}
//创建manager
mgr, err := manager.New(kubernetesClient.Config(), mgrOptions)
informerFactory := informers.NewInformerFactories(
kubernetesClient.Kubernetes(),
kubernetesClient.KubeSphere(),
kubernetesClient.Istio(),
kubernetesClient.Snapshot(),
kubernetesClient.ApiExtensions(),
kubernetesClient.Prometheus())
//关联pkg/controller/workspacetemplate/workspacetemplate_controller.go
workspaceTemplateReconciler := &workspacetemplate.Reconciler{MultiClusterEnabled: s.MultiClusterOptions.Enable}
if err = workspaceTemplateReconciler.SetupWithManager(mgr); err != nil {
klog.Fatalf("Unable to create workspace template controller: %v", err)
}
//继续实例化各类controller
...
//注册webhook,通过K8S的Adminssion机制
hookServer := mgr.GetWebhookServer()

klog.V(2).Info("registering webhooks to the webhook server")
hookServer.Register("/validate-email-iam-kubesphere-io-v1alpha2", &webhook.Admission{Handler: &user.EmailValidator{Client: mgr.GetClient()}})
hookServer.Register("/validate-network-kubesphere-io-v1alpha1", &webhook.Admission{Handler: &webhooks.ValidatingHandler{C: mgr.GetClient()}})
hookServer.Register("/mutate-network-kubesphere-io-v1alpha1", &webhook.Admission{Handler: &webhooks.MutatingHandler{C: mgr.GetClient()}})
...
//启动controller,进入controller Loop
if err = mgr.Start(ctx); err != nil {
klog.Fatalf("unable to run the manager: %v", err)
}
​
}

至此,controller-manager组件处理完成。

 

四、KubeSphere API请求处理流程分析

KS的API请求处理流程与一般的Restful Web框架处理差不多,比如SpringMVC,KS这里用的Restful框架和k8s一样是go-restful。区别主要在于informer二级缓存机制,对于资源对象的读通过informer执行,减少对K8S APIServer的请求,从而降低对Etcd的压力。

1、整体处理流程

2、代码分析

1.->pkg/kapis/tenant/v1alpha2/register.go

//在APIServer启动时注册,根据资源对象依赖的组件不同,参数不一样
func AddToContainer(c *restful.Container, factory informers.InformerFactory, k8sclient kubernetes.Interface,
ksclient kubesphere.Interface, evtsClient events.Client, loggingClient logging.Client,
auditingclient auditing.Client, am am.AccessManagementInterface, authorizer authorizer.Authorizer,
monitoringclient monitoringclient.Interface, cache cache.Cache, meteringOptions *meteringclient.Options) error {
mimePatch := []string{restful.MIME_JSON, runtime.MimeMergePatchJson, runtime.MimeJsonPatchJson}
​
ws := runtime.NewWebService(GroupVersion)
handler := newTenantHandler(factory, k8sclient, ksclient, evtsClient, loggingClient, auditingclient, am, authorizer, monitoringclient, resourcev1alpha3.NewResourceGetter(factory, cache), meteringOptions)

ws.Route(ws.GET("/clusters").
To(handler.ListClusters).
Doc("List clusters available to users").
Returns(http.StatusOK, api.StatusOK, api.ListResult{}).
Metadata(restfulspec.KeyOpenAPITags, []string{constants.UserResourceTag}))
...
​
}

2.->pkg/kapis/tenant/v1alpha2/handler.go

//关联Handler
func newTenantHandler(factory informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface,
evtsClient events.Client, loggingClient logging.Client, auditingclient auditing.Client,
am am.AccessManagementInterface, authorizer authorizer.Authorizer,
monitoringclient monitoringclient.Interface, resourceGetter *resourcev1alpha3.ResourceGetter,
meteringOptions *meteringclient.Options) *tenantHandler {
​
if meteringOptions == nil || meteringOptions.RetentionDay == "" {
meteringOptions = &meteringclient.DefaultMeteringOption
}

return &tenantHandler{
tenant:          tenant.New(factory, k8sclient, ksclient, evtsClient, loggingClient, auditingclient, am, authorizer, monitoringclient, resourceGetter),
meteringOptions: meteringOptions,
}
​
}

3.->读数据

//handler.ListClusters

func (h *tenantHandler) ListClusters(r *restful.Request, response *restful.Response) {
    user, ok := request.UserFrom(r.Request.Context())
​
    if !ok {
        response.WriteEntity([]interface{}{})
        return
    }
    
    result, err := h.tenant.ListClusters(user)
    
    if err != nil {
        klog.Error(err)
        if errors.IsNotFound(err) {
            api.HandleNotFound(response, r, err)
            return
        }
        api.HandleInternalError(response, r, err)
        return
    }
    
    response.WriteEntity(result)
​
}
​
//调用 models中的函数h.tenant.ListClusters(user)
//pkg/models/tenant/tenant.go
//集群与命名空间资源因为租户关系 比较复杂
type tenantOperator struct {
    am             am.AccessManagementInterface
    authorizer     authorizer.Authorizer
    k8sclient      kubernetes.Interface
    ksclient       kubesphere.Interface
    resourceGetter *resourcesv1alpha3.ResourceGetter
    events         events.Interface
    lo             logging.LoggingOperator
    auditing       auditing.Interface
    mo             monitoring.MonitoringOperator
    opRelease      openpitrix.ReleaseInterface
}
​
func (t *tenantOperator) ListClusters(user user.Info) (*api.ListResult, error) {
​
    listClustersInGlobalScope := authorizer.AttributesRecord{
        User:            user,
        Verb:            "list",
        Resource:        "clusters",
        ResourceScope:   request.GlobalScope,
        ResourceRequest: true,
    }
    
    //授权检查
    allowedListClustersInGlobalScope, _, err := t.authorizer.Authorize(listClustersInGlobalScope)
    
    if err != nil {
        klog.Error(err)
        return nil, err
    }
    ...
    
    //允许集群和工作空间
    if allowedListClustersInGlobalScope == authorizer.DecisionAllow ||
        allowedListWorkspacesInGlobalScope == authorizer.DecisionAllow {
        //查找资源
        result, err := t.resourceGetter.List(clusterv1alpha1.ResourcesPluralCluster, "", query.New())
        if err != nil {
            klog.Error(err)
            return nil, err
        }
        return result, nil
    }
    //查询角色
    workspaceRoleBindings, err := t.am.ListWorkspaceRoleBindings(user.GetName(), user.GetGroups(), "")
    
    if err != nil {
        klog.Error(err)
        return nil, err
    }
    
    clusters := map[string]*clusterv1alpha1.Cluster{}
    //企业空间的角色绑定,过滤数据
    for _, roleBinding := range workspaceRoleBindings {
        workspaceName := roleBinding.Labels[tenantv1alpha1.WorkspaceLabel]
        workspace, err := t.DescribeWorkspace(workspaceName)
        if err != nil {
            klog.Error(err)
            return nil, err
        }
    
        for _, grantedCluster := range workspace.Spec.Placement.Clusters {
            // skip if cluster exist
            if clusters[grantedCluster.Name] != nil {
                continue
            }
            obj, err := t.resourceGetter.Get(clusterv1alpha1.ResourcesPluralCluster, "", grantedCluster.Name)
            if err != nil {
                klog.Error(err)
                if errors.IsNotFound(err) {
                    continue
                }
                return nil, err
            }
            cluster := obj.(*clusterv1alpha1.Cluster)
            clusters[cluster.Name] = cluster
        }
    }
    
    items := make([]interface{}, 0)
    for _, cluster := range clusters {
        items = append(items, cluster)
    }
    
    return &api.ListResult{Items: items, TotalItems: len(items)}, nil
​
}


最终通过informers/Lister来获取

4.->写数据

func (h *tenantHandler) CreateWorkspace(request *restful.Request, response *restful.Response) {
var workspace tenantv1alpha2.WorkspaceTemplate
​
err := request.ReadEntity(&workspace)

if err != nil {
klog.Error(err)
api.HandleBadRequest(response, request, err)
return
}

created, err := h.tenant.CreateWorkspace(&workspace)

if err != nil {
klog.Error(err)
if errors.IsNotFound(err) {
api.HandleNotFound(response, request, err)
return
}
api.HandleBadRequest(response, request, err)
return
}

response.WriteEntity(created)
​
}
​
pkg/models/tenant/tenant.go
//直接调用client写数据
func (t *tenantOperator) CreateWorkspace(workspace *tenantv1alpha2.WorkspaceTemplate) (*tenantv1alpha2.WorkspaceTemplate, error) {
return t.ksclient.TenantV1alpha2().WorkspaceTemplates().Create(context.Background(), workspace, metav1.CreateOptions{})
}

3、Controller Run

 

五、Kubesphere认证与授权体系

KS的认证与授权体系实现与k8s类似,通过APIServer中进行认证和授权,用户、角色和授权相关信息统一通过CRD与k8s交互,保存在Etcd中。

关键的对象有: Role、User、Group、RoleBinding

角色是根据资源层级进行划分的,cluster role、workspace role、namespace role 不同层级的角色定义了该角色在当前层级可以访问的资源。

KubeSphere 权限控制的核心是 RBAC 基于角色的访问控制

1、整体架构

KS APIServer在过滤处理链中有三个与认证授权相关的组件。

  • Authentication:认证,支持基于用户名密码的BasicAuthen和jwtToken的TokenAuthen

  • Authorization:授权,支持特殊Path的PathAuthor和RBAC的RBACAuthor

  • Adminssion:准入控制器,在认证和授权后进行准入限制,比如webhook。

在Models中定义了三个相关的Operator,对用户、角色和用户组进行数据操作

  • IMOperator:定义了用户信息管理的接口和实现

  • AMOperator:定义了角色与角色绑定的接口与实现,涉及cluster/workspace/namespace/devops等角色

  • GroupOperator:定义了分组的接口与实现

在KS ControllerManager中,Controller主要处理user/role/rolebinding等Crd对象的状态更新。比较特殊的是UserController,

它需要处理Etcd和OpenLdap数据同步。

OpenLdap主要存储用户账号相关信息。

2、代码实现分析

1.->models/iam/im/im.go中定义了用户账号的管理方法接口和实现

type IdentityManagementInterface interface {
    CreateUser(user *iamv1alpha2.User) (*iamv1alpha2.User, error)
    ListUsers(query *query.Query) (*api.ListResult, error)
    DeleteUser(username string) error
    UpdateUser(user *iamv1alpha2.User) (*iamv1alpha2.User, error)
    DescribeUser(username string) (*iamv1alpha2.User, error)
    ModifyPassword(username string, password string) error
    ListLoginRecords(username string, query *query.Query) (*api.ListResult, error)
    PasswordVerify(username string, password string) error
}
​
type imOperator struct {
    ksClient          kubesphere.Interface
    userGetter        resources.Interface
    loginRecordGetter resources.Interface
    options           *authoptions.AuthenticationOptions
}

models/iam/am/am.go中定义了KS中所有的角色和角色绑定的函数

type AccessManagementInterface interface {
    GetGlobalRoleOfUser(username string) (*iamv1alpha2.GlobalRole, error)
    GetWorkspaceRoleOfUser(username string, groups []string, workspace string) ([]*iamv1alpha2.WorkspaceRole, error)
    GetClusterRoleOfUser(username string) (*rbacv1.ClusterRole, error)
    GetNamespaceRoleOfUser(username string, groups []string, namespace string) ([]*rbacv1.Role, error)
    ListRoles(namespace string, query *query.Query) (*api.ListResult, error)
    CreateGlobalRoleBinding(username string, globalRole string) error
    CreateOrUpdateWorkspaceRole(workspace string, workspaceRole *iamv1alpha2.WorkspaceRole) (*iamv1alpha2.WorkspaceRole, error)
    PatchWorkspaceRole(workspace string, workspaceRole *iamv1alpha2.WorkspaceRole) (*iamv1alpha2.WorkspaceRole, error)
    CreateOrUpdateGlobalRole(globalRole *iamv1alpha2.GlobalRole) (*iamv1alpha2.GlobalRole, error)
    PatchGlobalRole(globalRole *iamv1alpha2.GlobalRole) (*iamv1alpha2.GlobalRole, error)
    DeleteWorkspaceRole(workspace string, name string) error
    DeleteGlobalRole(name string) error
    ...
}
type amOperator struct {
    globalRoleBindingGetter    resourcev1alpha3.Interface
    workspaceRoleBindingGetter resourcev1alpha3.Interface
    clusterRoleBindingGetter   resourcev1alpha3.Interface
    roleBindingGetter          resourcev1alpha3.Interface
    globalRoleGetter           resourcev1alpha3.Interface
    workspaceRoleGetter        resourcev1alpha3.Interface
    clusterRoleGetter          resourcev1alpha3.Interface
    roleGetter                 resourcev1alpha3.Interface
    devopsProjectLister        devopslisters.DevOpsProjectLister
    namespaceLister            listersv1.NamespaceLister
    ksclient                   kubesphere.Interface
    k8sclient                  kubernetes.Interface
}
 

2.->pkg/apiserver/server.go

//注册与Authen和author相关的API
func (s *APIServer) installKubeSphereAPIs() {
//创建用户、角色和授权相关的资源操作对象
imOperator := im.NewOperator(s.KubernetesClient.KubeSphere(),
user.New(s.InformerFactory.KubeSphereSharedInformerFactory(),
s.InformerFactory.KubernetesSharedInformerFactory()),
loginrecord.New(s.InformerFactory.KubeSphereSharedInformerFactory()),
s.Config.AuthenticationOptions)
amOperator := am.NewOperator(s.KubernetesClient.KubeSphere(),
s.KubernetesClient.Kubernetes(),
s.InformerFactory)
rbacAuthorizer := rbac.NewRBACAuthorizer(amOperator)

​
urlruntime.Must(iamapi.AddToContainer(s.container, imOperator, amOperator,
group.New(s.InformerFactory, s.KubernetesClient.KubeSphere(), s.KubernetesClient.Kubernetes()),
rbacAuthorizer))
//注册oauth相关API
urlruntime.Must(oauth.AddToContainer(s.container, imOperator,
auth.NewTokenOperator(
s.CacheClient,
s.Config.AuthenticationOptions),
auth.NewPasswordAuthenticator(
s.KubernetesClient.KubeSphere(),
s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister(),
s.Config.AuthenticationOptions),
auth.NewOAuthAuthenticator(s.KubernetesClient.KubeSphere(),
s.InformerFactory.KubeSphereSharedInformerFactory(),
s.Config.AuthenticationOptions),
auth.NewLoginRecorder(s.KubernetesClient.KubeSphere(),
s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister()),
s.Config.AuthenticationOptions))
//注册租户(workspace)相关API
urlruntime.Must(tenantv1alpha2.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.Kubernetes(),
s.KubernetesClient.KubeSphere(), s.EventsClient, s.LoggingClient, s.AuditingClient, amOperator, rbacAuthorizer, s.MonitoringClient, s.RuntimeCache, s.Config.MeteringOptions))
//
urlruntime.Must(iamapi.AddToContainer(s.container, imOperator, amOperator,
group.New(s.InformerFactory, s.KubernetesClient.KubeSphere(), s.KubernetesClient.Kubernetes()),
rbacAuthorizer))
​
}

3.->pkg/apiserver/server.go //处理链中添加认证授权和准入相关的处理器

func (s *APIServer) buildHandlerChain(stopCh <-chan struct{}) {
​
var authorizers authorizer.Authorizer
//根据配置实例化授权器
switch s.Config.AuthorizationOptions.Mode {
case authorizationoptions.AlwaysAllow:
authorizers = authorizerfactory.NewAlwaysAllowAuthorizer()
case authorizationoptions.AlwaysDeny:
authorizers = authorizerfactory.NewAlwaysDenyAuthorizer()
default:
fallthrough
case authorizationoptions.RBAC://一般采用该模式
//排除以下API不进行授权验证
excludedPaths := []string{"/oauth/*", "/kapis/config.kubesphere.io/*", "/kapis/version", "/kapis/metrics"}
pathAuthorizer, _ := path.NewAuthorizer(excludedPaths)
amOperator := am.NewReadOnlyOperator(s.InformerFactory)
//联合,放一个数组里面
authorizers = unionauthorizer.New(pathAuthorizer, rbac.NewRBACAuthorizer(amOperator))
}
//授权验证
handler = filters.WithAuthorization(handler, authorizers)
if s.Config.MultiClusterOptions.Enable {
clusterDispatcher := dispatch.NewClusterDispatch(s.InformerFactory.KubeSphereSharedInformerFactory().Cluster().V1alpha1().Clusters())
handler = filters.WithMultipleClusterDispatcher(handler, clusterDispatcher)
}
//创建登录记录器,记录登录信息
loginRecorder := auth.NewLoginRecorder(s.KubernetesClient.KubeSphere(),
s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister())
// authenticators are unordered
//把支持的登录验证器加到数组
//BasicAuthenticator,用于用户名密码验证,
//TokenAuthenticator, jwtToken验证
authn := unionauth.New(anonymous.NewAuthenticator(),
basictoken.New(basic.NewBasicAuthenticator(auth.NewPasswordAuthenticator(s.KubernetesClient.KubeSphere(),
s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister(),
s.Config.AuthenticationOptions), loginRecorder)),
bearertoken.New(jwttoken.NewTokenAuthenticator(auth.NewTokenOperator(s.CacheClient,
s.Config.AuthenticationOptions),
s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister())))
//加入到过滤链中
handler = filters.WithAuthentication(handler, authn)
​
​
}

 

 

4.->pkg/controller/user/user_controller.go reconcile中实现了Etcd中用户信息变化后,controller处理相关的所有逻辑,比如用户删除、用户修改密码、同步OpenLdap等

func (c *userController) reconcile(key string) error {
// Get the user with this name
user, err := c.userLister.Get(key)
if err != nil {
// The user may no longer exist, in which case we stop
// processing.
if errors.IsNotFound(err) {
utilruntime.HandleError(fmt.Errorf("user '%s' in work queue no longer exists", key))
return nil
}
klog.Error(err)
return err
}
//用户没有删除,加入Finalizers,更新用户状态
if user.ObjectMeta.DeletionTimestamp.IsZero() {
// The object is not being deleted, so if it does not have our finalizer,
// then lets add the finalizer and update the object.
if !sliceutil.HasString(user.Finalizers, finalizer) {
user.ObjectMeta.Finalizers = append(user.ObjectMeta.Finalizers, finalizer)
if user, err = c.ksClient.IamV1alpha2().Users().Update(context.Background(), user, metav1.UpdateOptions{}); err != nil {
klog.Error(err)
return err
}
}
} else {
// The object is being deleted
​
if sliceutil.HasString(user.ObjectMeta.Finalizers, finalizer) {
// we do not need to delete the user from ldapServer when ldapClient is nil
// 如果用户已经删除,则从ldap删除,并发布事件
if c.ldapClient != nil {
if err = c.waitForDeleteFromLDAP(key); err != nil {
// ignore timeout error
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
}
}
//删除角色绑定并发布事件
if err = c.deleteRoleBindings(user); err != nil {
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
return err
}
//删除组绑定并发布事件
if err = c.deleteGroupBindings(user); err != nil {
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
return err
}
//删除DevOps角色绑定并发布事件
if c.devopsClient != nil {
// unassign jenkins role, unassign multiple times is allowed
if err = c.waitForUnassignDevOpsAdminRole(user); err != nil {
// ignore timeout error
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
}
}
//删除登录记录并发布事件
if err = c.deleteLoginRecords(user); err != nil {
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
return err
}

// remove our finalizer from the list and update it.
user.Finalizers = sliceutil.RemoveString(user.ObjectMeta.Finalizers, func(item string) bool {
return item == finalizer
})
// 更新用户状态
if user, err = c.ksClient.IamV1alpha2().Users().Update(context.Background(), user, metav1.UpdateOptions{}); err != nil {
klog.Error(err)
return err
}
}

// Our finalizer has finished, so the reconciler can do nothing.
return nil
}

// we do not need to sync ldap info when ldapClient is nil
// 用户信息同步给OpenLdap
if c.ldapClient != nil {
// ignore errors if timeout
if err = c.waitForSyncToLDAP(user); err != nil {
// ignore timeout error
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
}
}

//密码加密
if user, err = c.encryptPassword(user); err != nil {
klog.Error(err)
return err
}
//同步用户状态
if user, err = c.syncUserStatus(user); err != nil {
klog.Error(err)
return err
}

if c.kubeconfig != nil {
// ensure user kubeconfig configmap is created
if err = c.kubeconfig.CreateKubeConfig(user); err != nil {
klog.Error(err)
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
return err
}
}

if c.devopsClient != nil {
// assign jenkins role after user create, assign multiple times is allowed
// used as logged-in users can do anything
if err = c.waitForAssignDevOpsAdminRole(user); err != nil {
// ignore timeout error
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
}
}

// synchronization through kubefed-controller when multi cluster is enabled
if c.multiClusterEnabled {
if err = c.multiClusterSync(user); err != nil {
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
return err
}
}

c.recorder.Event(user, corev1.EventTypeNormal, successSynced, messageResourceSynced)
return nil
​
}
0条评论
0 / 1000
chuoo
13文章数
0粉丝数
chuoo
13 文章 | 0 粉丝
原创

Kubesphere核心组件和实现原理分析

2023-04-25 12:41:50
471
0

Kubesphere是国内开源的容器云平台,相比Openshift、Rancher等业界优秀产品,Kubesphere功能丰富度上也不相上下。Kubesphere的技术架构也是基于k8s可编程框架来实现的,本文主要介绍KS的核心组件和核心的机制实现原理,主要包括APIServer、ControllerManager、请求处理过程和认证和授权四个部分,基于KS 3.1.1源码版本。通过研究Kubesphere来理解容器云产品的技术架构和实现有一定参考意义。

一、Kubesphere核心组件介绍

1、整体架构

KubeSphere 官方的核心架构如图所示

核心组件主要有三个:

  • ks-console 前端服务组件

  • ks-apiserver 后端服务组件

  • ks-controller-manager 资源状态维护组件

在安装好的KS的服务组件里面也可以看到核心的服务组件

除了上面三个组件,还多了openIdap,用于存储用户账号,minio存储非结构化数据,如Helm charts。

KubeSphere 的后端设计中沿用了 K8s 声明式 API 的风格,所有可操作的资源都尽可能的抽象成为 CustomResource。所以要熟悉KubeSphere的代码实现,

首先需要熟悉K8S的声明式API的核心APIServer/ControllerManager/Client-go的实现。

2、KS源码目录与功能

二、Kubesphere APIServer实现分析

ks-apiserver 的开发使用了 go-restful 框架,可以在请求链路中增加多个 Filter 用于动态的拦截请求和响应,实现认证、鉴权、审计逻辑转发和反向代理功能。可以认为,ks-apiserver实现了类似微服务网关的功能。

1、整体架构分析

 

从架构图上可以看出,KS APIServer对不同类型资源对象的API作了不同的分发和路由处理:

  • kubernetes原生的对象,由ks-apiserver连接api-server,直接获取更改etcd中kubernetes的原始数据(origin data)即可,操作的对象即kubernetes原生的configmap. deployment等对象。

  • KS的CRD对象,ks-controller-manager的封装功能逻辑以crd对象的方式表现在etcd中,ks-apiserver通过连接k8s-apiserver操作etcd中的crd数据(crd data)即可,操作 ks-controller-manager 扩展的逻辑功能。

  • 第三方的operator对象,如prometheus-operator等第三方完成的模块以operator的方式运行在系统中,其功能对应的对象也以crd的形式存放载etcd中,ks-apiserver也是通过和k8s-apiserver交互操作对应的crd完成。

  • 普通的服务对象,如kenkins,sonarqube等以普通服务的方式运行在系统中,ks-apiserver直接通过网络调用和此类对象交互

  • 多集群模式的API对象,通过tower分发到Member集群处理,如阿里云,腾讯云K8S集群。 以上,ks-apiserver通过内部API(inner API aggregate)聚合内部功能 ,并对外提供统一的API,即外部API(out API aggregate)。

 

2、代码实现分析

ks-apiserver整体启动流程如下图,提供Restful API和k8s一样主要通过go-restful这个轻量级框架实现,

 

代码入口:1.cmd/ks-apiserver/apiserver.go

//通过cobra库构建启动CMD并启动
cmd:=app.NewAPIServerCommand()
cmd.Execute()

2.->cmd/ks-apiserver/app/server.go

func NewAPIServerCommand() *cobra.Command
s := options.NewServerRunOptions()
//通过viper库读取kubesphere配置,从指定目录路径或环境变量
conf, err := apiserverconfig.TryLoadFromDisk()
//创建cobra cmd,在cmd中run
func Run(s *options.ServerRunOptions, ctx context.Context) error {
//通过Options创建一个APIServer
apiserver, err := s.NewAPIServer(ctx.Done())
if err != nil {
return err
}
//API注册,处理链
err = apiserver.PrepareRun(ctx.Done())
if err != nil {
return nil
}
//运行APIServer
return apiserver.Run(ctx)
}

3.->cmd/ks-apiserver/app/options/options.go 通过给定的options创建ks-apiserver实例 创建k8s的客户端,并把kubernetesClient(Kubernetes、KubeSphere、Istio、ApiExtensions、Prometheus) 可以通过informers管理的CRD,外部的组件通过http等公共协议远程调用管理部分通过服务接口直接提供的资源 ,如 MetricsClient、LoggingClient、S3Client、devopsClient、SonarClient、eventsclient CacheClient auditingclient、alertingClient 实例化后挂在APIServer上

func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIServer, error){
...
informerFactory := informers.NewInformerFactories(kubernetesClient.Kubernetes(), kubernetesClient.KubeSphere(),
kubernetesClient.Istio(), kubernetesClient.Snapshot(), kubernetesClient.ApiExtensions(), kubernetesClient.Prometheus())
apiServer.InformerFactory = informerFactory
...
}

4.->pkg/apiserver/server.go 定义APIServer,注册API,创建处理链(审计、授权)

func (s *APIServer) PrepareRun(stopCh <-chan struct{}) error {
​
s.container = restful.NewContainer()
s.container.Filter(logRequestAndResponse)
s.container.Router(restful.CurlyRouter{})
s.container.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
logStackOnRecover(panicReason, httpWriter)
})
//注册ks 的API
s.installKubeSphereAPIs()
//注册计量API
s.installMetricsAPI()
s.container.Filter(monitorRequest)

for _, ws := range s.container.RegisteredWebServices() {
klog.V(2).Infof("%s", ws.RootPath())
}

s.Server.Handler = s.container
//处理链
s.buildHandlerChain(stopCh)

return nil
​
}

5.->pkg/apiserver/server.go 接口注册,添加到go-restful的container并与授权认证关联

func (s *APIServer) installKubeSphereAPIs() {
imOperator := im.NewOperator(s.KubernetesClient.KubeSphere(),
user.New(s.InformerFactory.KubeSphereSharedInformerFactory(),
s.InformerFactory.KubernetesSharedInformerFactory()),
loginrecord.New(s.InformerFactory.KubeSphereSharedInformerFactory()),
s.Config.AuthenticationOptions)
amOperator := am.NewOperator(s.KubernetesClient.KubeSphere(),
s.KubernetesClient.Kubernetes(),
s.InformerFactory)
rbacAuthorizer := rbac.NewRBACAuthorizer(amOperator)
...
urlruntime.Must(configv1alpha2.AddToContainer(s.container, s.Config))
urlruntime.Must(resourcev1alpha3.AddToContainer(s.container, s.InformerFactory, s.RuntimeCache))
....
urlruntime.Must(devopsv1alpha2.AddToContainer(s.container, s.Config.DevopsOptions.Endpoint))
​
}

5.->pkg/apiserver/server.go 接口注册,添加到go-restful的container并与授权认证关联,这里面如果为devops已经独立的外部服务,则通过代理方式实现注册转发。

func (s *APIServer) installKubeSphereAPIs() {
imOperator := im.NewOperator(s.KubernetesClient.KubeSphere(),
user.New(s.InformerFactory.KubeSphereSharedInformerFactory(),
s.InformerFactory.KubernetesSharedInformerFactory()),
loginrecord.New(s.InformerFactory.KubeSphereSharedInformerFactory()),
s.Config.AuthenticationOptions)
amOperator := am.NewOperator(s.KubernetesClient.KubeSphere(),
s.KubernetesClient.Kubernetes(),
s.InformerFactory)
rbacAuthorizer := rbac.NewRBACAuthorizer(amOperator)

​
urlruntime.Must(configv1alpha2.AddToContainer(s.container, s.Config))
urlruntime.Must(resourcev1alpha3.AddToContainer(s.container, s.InformerFactory, s.RuntimeCache))
....
urlruntime.Must(devopsv1alpha2.AddToContainer(s.container, s.Config.DevopsOptions.Endpoint))
​
}
​
//pkg/kapis/config/v1alpha2/register.go
//直接注册
func AddToContainer(c *restful.Container, config *kubesphereconfig.Config) error {
webservice := runtime.NewWebService(GroupVersion)
​
webservice.Route(webservice.GET("/configs/oauth").
Doc("Information about the authorization server are published.").
To(func(request *restful.Request, response *restful.Response) {
response.WriteEntity(config.AuthenticationOptions.OAuthOptions)
}))

webservice.Route(webservice.GET("/configs/configz").
Doc("Information about the server configuration").
To(func(request *restful.Request, response *restful.Response) {
response.WriteAsJson(config.ToMap())
}))

c.Add(webservice)
return nil
​
}
//下面是代理方式的注册处理过程 ,devops之类的转发到ks-devops服务
func AddToContainer(container *restful.Container, endpoint string) error {
proxy, err := generic.NewGenericProxy(endpoint, GroupVersion.Group, GroupVersion.Version)
if err != nil {
return err
}
​
return proxy.AddToContainer(container)
​
}
​
//pkg/kapis/generic/generic.go
func NewGenericProxy(endpoint string, groupName string, version string) (*genericProxy, error) {
parse, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
​
// trim path suffix slash
parse.Path = strings.Trim(parse.Path, "/")

return &genericProxy{
Endpoint:  parse,
GroupName: groupName,
Version:   version,
}, nil
​
}
//添加到go-restful container,支持GET/PUT/POST/DELETE/PATCH
func (g *genericProxy) AddToContainer(container *restful.Container) error {
webservice := runtime.NewWebService(schema.GroupVersion{
Group:   g.GroupName,
Version: g.Version,
})

​
webservice.Route(webservice.GET("/{path:*}").
To(g.handler).
Returns(http.StatusOK, api.StatusOK, nil))
...
container.Add(webservice)
return nil
​
}
​
//最终实现请求转发到对应的endpoint
func (g *genericProxy) handler(request *restful.Request, response *restful.Response) {
u := g.makeURL(request)
​
httpProxy := proxy.NewUpgradeAwareHandler(u, http.DefaultTransport, false, false, &errorResponder{})
httpProxy.ServeHTTP(response, request.Request)
​
}

6.->pkg/apiserver/server.go //创建过滤处理链,处理审计、授权和认证

func (s *APIServer) buildHandlerChain(stopCh <-chan struct{}) {
requestInfoResolver := &request.RequestInfoFactory{
APIPrefixes:          sets.NewString("api", "apis", "kapis", "kapi"),
GrouplessAPIPrefixes: sets.NewString("api", "kapi"),
GlobalResources: []schema.GroupResource{
iamv1alpha2.Resource(iamv1alpha2.ResourcesPluralUser),
iamv1alpha2.Resource(iamv1alpha2.ResourcesPluralGlobalRole),
iamv1alpha2.Resource(iamv1alpha2.ResourcesPluralGlobalRoleBinding),
tenantv1alpha1.Resource(tenantv1alpha1.ResourcePluralWorkspace),
tenantv1alpha2.Resource(tenantv1alpha1.ResourcePluralWorkspace),
tenantv1alpha2.Resource(clusterv1alpha1.ResourcesPluralCluster),
clusterv1alpha1.Resource(clusterv1alpha1.ResourcesPluralCluster),
resourcev1alpha3.Resource(clusterv1alpha1.ResourcesPluralCluster),
notificationv2beta1.Resource(v2beta1.ResourcesPluralConfig),
notificationv2beta1.Resource(v2beta1.ResourcesPluralReceiver),
},
}
​
handler := s.Server.Handler
handler = filters.WithKubeAPIServer(handler, s.KubernetesClient.Config(), &errorResponder{})

//开启审计
if s.Config.AuditingOptions.Enable {
handler = filters.WithAuditing(handler,
audit.NewAuditing(s.InformerFactory, s.Config.AuditingOptions, stopCh))
}

var authorizers authorizer.Authorizer
//授权
switch s.Config.AuthorizationOptions.Mode {
case authorizationoptions.AlwaysAllow:
authorizers = authorizerfactory.NewAlwaysAllowAuthorizer()
case authorizationoptions.AlwaysDeny:
authorizers = authorizerfactory.NewAlwaysDenyAuthorizer()
default:
fallthrough
case authorizationoptions.RBAC:
excludedPaths := []string{"/oauth/*", "/kapis/config.kubesphere.io/*", "/kapis/version", "/kapis/metrics"}
pathAuthorizer, _ := path.NewAuthorizer(excludedPaths)
amOperator := am.NewReadOnlyOperator(s.InformerFactory)
authorizers = unionauthorizer.New(pathAuthorizer, rbac.NewRBACAuthorizer(amOperator))
}

handler = filters.WithAuthorization(handler, authorizers)
//多集群模式
if s.Config.MultiClusterOptions.Enable {
clusterDispatcher := dispatch.NewClusterDispatch(s.InformerFactory.KubeSphereSharedInformerFactory().Cluster().V1alpha1().Clusters())
handler = filters.WithMultipleClusterDispatcher(handler, clusterDispatcher)
}

loginRecorder := auth.NewLoginRecorder(s.KubernetesClient.KubeSphere(),
s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister())
// authenticators are unordered
//和k8s一样支持多种认证模式
authn := unionauth.New(anonymous.NewAuthenticator(),
basictoken.New(basic.NewBasicAuthenticator(auth.NewPasswordAuthenticator(s.KubernetesClient.KubeSphere(),
s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister(),
s.Config.AuthenticationOptions), loginRecorder)),
bearertoken.New(jwttoken.NewTokenAuthenticator(auth.NewTokenOperator(s.CacheClient,
s.Config.AuthenticationOptions),
s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister())))
handler = filters.WithAuthentication(handler, authn)
handler = filters.WithRequestInfo(handler, requestInfoResolver)

s.Server.Handler = handler
​
}

7.->pkg/apiserver/server.go

//运行APIServer
func (s *APIServer) Run(ctx context.Context) (err error) {
​
//等待资源同步
err = s.waitForResourceSync(ctx)
if err != nil {
return err
}

shutdownCtx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
<-ctx.Done()
_ = s.Server.Shutdown(shutdownCtx)
}()
//启动http/https服务
klog.V(0).Infof("Start listening on %s", s.Server.Addr)
if s.Server.TLSConfig != nil {
err = s.Server.ListenAndServeTLS("", "")
} else {
err = s.Server.ListenAndServe()
}

return err
​
}
​
//连接k8s的API Server,完成GVR资源同步
func (s *APIServer) waitForResourceSync(ctx context.Context) error {
klog.V(0).Info("Start cache objects")
​
stopCh := ctx.Done()

//服务发现
discoveryClient := s.KubernetesClient.Kubernetes().Discovery()
_, apiResourcesList, err := discoveryClient.ServerGroupsAndResources()
if err != nil {
return err
}

isResourceExists := func(resource schema.GroupVersionResource) bool {
for _, apiResource := range apiResourcesList {
if apiResource.GroupVersion == resource.GroupVersion().String() {
for _, rsc := range apiResource.APIResources {
if rsc.Name == resource.Resource {
return true
}
}
}
}
return false
}

// resources we have to create informer first
//原生K8S资源
k8sGVRs := []schema.GroupVersionResource{
{Group: "", Version: "v1", Resource: "namespaces"},
{Group: "", Version: "v1", Resource: "nodes"},
{Group: "", Version: "v1", Resource: "resourcequotas"},
{Group: "", Version: "v1", Resource: "pods"},
{Group: "", Version: "v1", Resource: "services"},
{Group: "", Version: "v1", Resource: "persistentvolumeclaims"},
{Group: "", Version: "v1", Resource: "secrets"},
{Group: "", Version: "v1", Resource: "configmaps"},
{Group: "", Version: "v1", Resource: "serviceaccounts"},
.....
}
...
//K8S资源对象的formerFactory启动
s.InformerFactory.KubernetesSharedInformerFactory().Start(stopCh)
s.InformerFactory.KubernetesSharedInformerFactory().WaitForCacheSync(stopCh)

ksInformerFactory := s.InformerFactory.KubeSphereSharedInformerFactory()

//ks资源
ksGVRs := []schema.GroupVersionResource{
{Group: "tenant.kubesphere.io", Version: "v1alpha1", Resource: "workspaces"},
{Group: "tenant.kubesphere.io", Version: "v1alpha2", Resource: "workspacetemplates"},
...
{Group: "iam.kubesphere.io", Version: "v1alpha2", Resource: "loginrecords"},
{Group: "cluster.kubesphere.io", Version: "v1alpha1", Resource: "clusters"},
{Group: "devops.kubesphere.io", Version: "v1alpha3", Resource: "devopsprojects"},
}

//
devopsGVRs := []schema.GroupVersionResource{
{Group: "devops.kubesphere.io", Version: "v1alpha1", Resource: "s2ibinaries"},
...
}

servicemeshGVRs := []schema.GroupVersionResource{
{Group: "servicemesh.kubesphere.io", Version: "v1alpha2", Resource: "strategies"},
{Group: "servicemesh.kubesphere.io", Version: "v1alpha2", Resource: "servicepolicies"},
}

// federated resources on cached in multi cluster setup
//多集群联邦资源
federatedResourceGVRs := []schema.GroupVersionResource{
typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedClusterRole),
typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedClusterRoleBindingBinding),
...
}

ksInformerFactory.Start(stopCh)
ksInformerFactory.WaitForCacheSync(stopCh)
....
//prometheus
if promFactory := s.InformerFactory.PrometheusSharedInformerFactory(); promFactory != nil {
prometheusGVRs := []schema.GroupVersionResource{
{Group: "monitoring.coreos.com", Version: "v1", Resource: "prometheuses"},
{Group: "monitoring.coreos.com", Version: "v1", Resource: "prometheusrules"},
{Group: "monitoring.coreos.com", Version: "v1", Resource: "thanosrulers"},
}
for _, gvr := range prometheusGVRs {
if isResourceExists(gvr) {
_, err = promFactory.ForResource(gvr)
if err != nil {
return err
}
} else {
klog.Warningf("resource %s not exists in the cluster", gvr)
}
}
promFactory.Start(stopCh)
promFactory.WaitForCacheSync(stopCh)
}

// controller runtime cache for resources
go s.RuntimeCache.Start(ctx)
s.RuntimeCache.WaitForCacheSync(ctx)
...
return nil
​
}

至此,APIServer相关流程处理完成

 

三、KubeSphere Controller-manager实现分析

ks-controller-manager主要是kubesphere的基本系统功能集合,并将其提供的功能抽象为CRD注册在kubernetes中对ks-apiserver提供使用入口。而监控告警等第三方不断扩展的组件也采用同样的方式部署在kubernetes上,并由ks-apiserver最终统一聚合。做到功能模块分离微服务化,交互入口中心统一化。

1、KS Controller-manager启动流程分析

2、代码实现分析

1.代码入口:cmd/controller-manager/controller-manager.go

//和APIServer类似,通过cobra库构建启动CMD并启动
command := app.NewControllerManagerCommand()
command.Execute()

 

2.->cmd/controller-manager/app/options/options.go

//聚合各类组件的Options
func NewKubeSphereControllerManagerOptions() *KubeSphereControllerManagerOptions {
s := &KubeSphereControllerManagerOptions{
KubernetesOptions:     k8s.NewKubernetesOptions(),
DevopsOptions:         jenkins.NewDevopsOptions(),
S3Options:             s3.NewS3Options(),
LdapOptions:           ldapclient.NewOptions(),
OpenPitrixOptions:     openpitrix.NewOptions(),
NetworkOptions:        network.NewNetworkOptions(),
MultiClusterOptions:   multicluster.NewOptions(),
ServiceMeshOptions:    servicemesh.NewServiceMeshOptions(),
AuthenticationOptions: authoptions.NewAuthenticateOptions(),
LeaderElection: &leaderelection.LeaderElectionConfig{
LeaseDuration: 30 * time.Second,
RenewDeadline: 15 * time.Second,
RetryPeriod:   5 * time.Second,
},
LeaderElect:         false,
WebhookCertDir:      "",
ApplicationSelector: "",
}
​
return s
​
}

3.->cmd/controller-manager/app/server.go

func NewControllerManagerCommand() *cobra.Command {
// options
s := options.NewKubeSphereControllerManagerOptions()
//通过viper库读取配置
conf, err := controllerconfig.TryLoadFromDisk()
if err == nil {
// make sure LeaderElection is not nil
s = &options.KubeSphereControllerManagerOptions{
KubernetesOptions:     conf.KubernetesOptions,
DevopsOptions:         conf.DevopsOptions,
S3Options:             conf.S3Options,
AuthenticationOptions: conf.AuthenticationOptions,
LdapOptions:           conf.LdapOptions,
OpenPitrixOptions:     conf.OpenPitrixOptions,
NetworkOptions:        conf.NetworkOptions,
MultiClusterOptions:   conf.MultiClusterOptions,
ServiceMeshOptions:    conf.ServiceMeshOptions,
LeaderElection:        s.LeaderElection,
LeaderElect:           s.LeaderElect,
WebhookCertDir:        s.WebhookCertDir,
}
} else {
klog.Fatal("Failed to load configuration from disk", err)
}
​
cmd := &cobra.Command{
Use:  "controller-manager",
Long: `KubeSphere controller manager is a daemon that`,
Run: func(cmd *cobra.Command, args []string) {
if errs := s.Validate(); len(errs) != 0 {
klog.Error(utilerrors.NewAggregate(errs))
os.Exit(1)
}

if err = run(s, signals.SetupSignalHandler()); err != nil {
klog.Error(err)
os.Exit(1)
}
},
SilenceUsage: true,
}
​
//这里使用client-go中的controller-runtime来辅助管理controller
//controller-manager作为一个有状态的组件,需要进行选主。APIServer是无状态的,不存在选主过程
func run(s *options.KubeSphereControllerManagerOptions, ctx context.Context) error {
​
//创建K8S客户端
kubernetesClient, err := k8s.NewKubernetesClient(s.KubernetesOptions)

//加载各类客户端
//devopsClient/ldapClient/s3Client....
var devopsClient devops.Interface
if s.DevopsOptions != nil && len(s.DevopsOptions.Host) != 0 {
devopsClient, err = jenkins.NewDevopsClient(s.DevopsOptions)
if err != nil {
return fmt.Errorf("failed to connect jenkins, please check jenkins status, error: %v", err)
}
}
....
//manager的选主配置
if s.LeaderElect {
mgrOptions = manager.Options{
CertDir:                 s.WebhookCertDir,
Port:                    8443,
LeaderElection:          s.LeaderElect,
LeaderElectionNamespace: "kubesphere-system",
LeaderElectionID:        "ks-controller-manager-leader-election",
LeaseDuration:           &s.LeaderElection.LeaseDuration,
RetryPeriod:             &s.LeaderElection.RetryPeriod,
RenewDeadline:           &s.LeaderElection.RenewDeadline,
}
}
//创建manager
mgr, err := manager.New(kubernetesClient.Config(), mgrOptions)
informerFactory := informers.NewInformerFactories(
kubernetesClient.Kubernetes(),
kubernetesClient.KubeSphere(),
kubernetesClient.Istio(),
kubernetesClient.Snapshot(),
kubernetesClient.ApiExtensions(),
kubernetesClient.Prometheus())
//关联pkg/controller/workspacetemplate/workspacetemplate_controller.go
workspaceTemplateReconciler := &workspacetemplate.Reconciler{MultiClusterEnabled: s.MultiClusterOptions.Enable}
if err = workspaceTemplateReconciler.SetupWithManager(mgr); err != nil {
klog.Fatalf("Unable to create workspace template controller: %v", err)
}
//继续实例化各类controller
...
//注册webhook,通过K8S的Adminssion机制
hookServer := mgr.GetWebhookServer()

klog.V(2).Info("registering webhooks to the webhook server")
hookServer.Register("/validate-email-iam-kubesphere-io-v1alpha2", &webhook.Admission{Handler: &user.EmailValidator{Client: mgr.GetClient()}})
hookServer.Register("/validate-network-kubesphere-io-v1alpha1", &webhook.Admission{Handler: &webhooks.ValidatingHandler{C: mgr.GetClient()}})
hookServer.Register("/mutate-network-kubesphere-io-v1alpha1", &webhook.Admission{Handler: &webhooks.MutatingHandler{C: mgr.GetClient()}})
...
//启动controller,进入controller Loop
if err = mgr.Start(ctx); err != nil {
klog.Fatalf("unable to run the manager: %v", err)
}
​
}

至此,controller-manager组件处理完成。

 

四、KubeSphere API请求处理流程分析

KS的API请求处理流程与一般的Restful Web框架处理差不多,比如SpringMVC,KS这里用的Restful框架和k8s一样是go-restful。区别主要在于informer二级缓存机制,对于资源对象的读通过informer执行,减少对K8S APIServer的请求,从而降低对Etcd的压力。

1、整体处理流程

2、代码分析

1.->pkg/kapis/tenant/v1alpha2/register.go

//在APIServer启动时注册,根据资源对象依赖的组件不同,参数不一样
func AddToContainer(c *restful.Container, factory informers.InformerFactory, k8sclient kubernetes.Interface,
ksclient kubesphere.Interface, evtsClient events.Client, loggingClient logging.Client,
auditingclient auditing.Client, am am.AccessManagementInterface, authorizer authorizer.Authorizer,
monitoringclient monitoringclient.Interface, cache cache.Cache, meteringOptions *meteringclient.Options) error {
mimePatch := []string{restful.MIME_JSON, runtime.MimeMergePatchJson, runtime.MimeJsonPatchJson}
​
ws := runtime.NewWebService(GroupVersion)
handler := newTenantHandler(factory, k8sclient, ksclient, evtsClient, loggingClient, auditingclient, am, authorizer, monitoringclient, resourcev1alpha3.NewResourceGetter(factory, cache), meteringOptions)

ws.Route(ws.GET("/clusters").
To(handler.ListClusters).
Doc("List clusters available to users").
Returns(http.StatusOK, api.StatusOK, api.ListResult{}).
Metadata(restfulspec.KeyOpenAPITags, []string{constants.UserResourceTag}))
...
​
}

2.->pkg/kapis/tenant/v1alpha2/handler.go

//关联Handler
func newTenantHandler(factory informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface,
evtsClient events.Client, loggingClient logging.Client, auditingclient auditing.Client,
am am.AccessManagementInterface, authorizer authorizer.Authorizer,
monitoringclient monitoringclient.Interface, resourceGetter *resourcev1alpha3.ResourceGetter,
meteringOptions *meteringclient.Options) *tenantHandler {
​
if meteringOptions == nil || meteringOptions.RetentionDay == "" {
meteringOptions = &meteringclient.DefaultMeteringOption
}

return &tenantHandler{
tenant:          tenant.New(factory, k8sclient, ksclient, evtsClient, loggingClient, auditingclient, am, authorizer, monitoringclient, resourceGetter),
meteringOptions: meteringOptions,
}
​
}

3.->读数据

//handler.ListClusters

func (h *tenantHandler) ListClusters(r *restful.Request, response *restful.Response) {
    user, ok := request.UserFrom(r.Request.Context())
​
    if !ok {
        response.WriteEntity([]interface{}{})
        return
    }
    
    result, err := h.tenant.ListClusters(user)
    
    if err != nil {
        klog.Error(err)
        if errors.IsNotFound(err) {
            api.HandleNotFound(response, r, err)
            return
        }
        api.HandleInternalError(response, r, err)
        return
    }
    
    response.WriteEntity(result)
​
}
​
//调用 models中的函数h.tenant.ListClusters(user)
//pkg/models/tenant/tenant.go
//集群与命名空间资源因为租户关系 比较复杂
type tenantOperator struct {
    am             am.AccessManagementInterface
    authorizer     authorizer.Authorizer
    k8sclient      kubernetes.Interface
    ksclient       kubesphere.Interface
    resourceGetter *resourcesv1alpha3.ResourceGetter
    events         events.Interface
    lo             logging.LoggingOperator
    auditing       auditing.Interface
    mo             monitoring.MonitoringOperator
    opRelease      openpitrix.ReleaseInterface
}
​
func (t *tenantOperator) ListClusters(user user.Info) (*api.ListResult, error) {
​
    listClustersInGlobalScope := authorizer.AttributesRecord{
        User:            user,
        Verb:            "list",
        Resource:        "clusters",
        ResourceScope:   request.GlobalScope,
        ResourceRequest: true,
    }
    
    //授权检查
    allowedListClustersInGlobalScope, _, err := t.authorizer.Authorize(listClustersInGlobalScope)
    
    if err != nil {
        klog.Error(err)
        return nil, err
    }
    ...
    
    //允许集群和工作空间
    if allowedListClustersInGlobalScope == authorizer.DecisionAllow ||
        allowedListWorkspacesInGlobalScope == authorizer.DecisionAllow {
        //查找资源
        result, err := t.resourceGetter.List(clusterv1alpha1.ResourcesPluralCluster, "", query.New())
        if err != nil {
            klog.Error(err)
            return nil, err
        }
        return result, nil
    }
    //查询角色
    workspaceRoleBindings, err := t.am.ListWorkspaceRoleBindings(user.GetName(), user.GetGroups(), "")
    
    if err != nil {
        klog.Error(err)
        return nil, err
    }
    
    clusters := map[string]*clusterv1alpha1.Cluster{}
    //企业空间的角色绑定,过滤数据
    for _, roleBinding := range workspaceRoleBindings {
        workspaceName := roleBinding.Labels[tenantv1alpha1.WorkspaceLabel]
        workspace, err := t.DescribeWorkspace(workspaceName)
        if err != nil {
            klog.Error(err)
            return nil, err
        }
    
        for _, grantedCluster := range workspace.Spec.Placement.Clusters {
            // skip if cluster exist
            if clusters[grantedCluster.Name] != nil {
                continue
            }
            obj, err := t.resourceGetter.Get(clusterv1alpha1.ResourcesPluralCluster, "", grantedCluster.Name)
            if err != nil {
                klog.Error(err)
                if errors.IsNotFound(err) {
                    continue
                }
                return nil, err
            }
            cluster := obj.(*clusterv1alpha1.Cluster)
            clusters[cluster.Name] = cluster
        }
    }
    
    items := make([]interface{}, 0)
    for _, cluster := range clusters {
        items = append(items, cluster)
    }
    
    return &api.ListResult{Items: items, TotalItems: len(items)}, nil
​
}


最终通过informers/Lister来获取

4.->写数据

func (h *tenantHandler) CreateWorkspace(request *restful.Request, response *restful.Response) {
var workspace tenantv1alpha2.WorkspaceTemplate
​
err := request.ReadEntity(&workspace)

if err != nil {
klog.Error(err)
api.HandleBadRequest(response, request, err)
return
}

created, err := h.tenant.CreateWorkspace(&workspace)

if err != nil {
klog.Error(err)
if errors.IsNotFound(err) {
api.HandleNotFound(response, request, err)
return
}
api.HandleBadRequest(response, request, err)
return
}

response.WriteEntity(created)
​
}
​
pkg/models/tenant/tenant.go
//直接调用client写数据
func (t *tenantOperator) CreateWorkspace(workspace *tenantv1alpha2.WorkspaceTemplate) (*tenantv1alpha2.WorkspaceTemplate, error) {
return t.ksclient.TenantV1alpha2().WorkspaceTemplates().Create(context.Background(), workspace, metav1.CreateOptions{})
}

3、Controller Run

 

五、Kubesphere认证与授权体系

KS的认证与授权体系实现与k8s类似,通过APIServer中进行认证和授权,用户、角色和授权相关信息统一通过CRD与k8s交互,保存在Etcd中。

关键的对象有: Role、User、Group、RoleBinding

角色是根据资源层级进行划分的,cluster role、workspace role、namespace role 不同层级的角色定义了该角色在当前层级可以访问的资源。

KubeSphere 权限控制的核心是 RBAC 基于角色的访问控制

1、整体架构

KS APIServer在过滤处理链中有三个与认证授权相关的组件。

  • Authentication:认证,支持基于用户名密码的BasicAuthen和jwtToken的TokenAuthen

  • Authorization:授权,支持特殊Path的PathAuthor和RBAC的RBACAuthor

  • Adminssion:准入控制器,在认证和授权后进行准入限制,比如webhook。

在Models中定义了三个相关的Operator,对用户、角色和用户组进行数据操作

  • IMOperator:定义了用户信息管理的接口和实现

  • AMOperator:定义了角色与角色绑定的接口与实现,涉及cluster/workspace/namespace/devops等角色

  • GroupOperator:定义了分组的接口与实现

在KS ControllerManager中,Controller主要处理user/role/rolebinding等Crd对象的状态更新。比较特殊的是UserController,

它需要处理Etcd和OpenLdap数据同步。

OpenLdap主要存储用户账号相关信息。

2、代码实现分析

1.->models/iam/im/im.go中定义了用户账号的管理方法接口和实现

type IdentityManagementInterface interface {
    CreateUser(user *iamv1alpha2.User) (*iamv1alpha2.User, error)
    ListUsers(query *query.Query) (*api.ListResult, error)
    DeleteUser(username string) error
    UpdateUser(user *iamv1alpha2.User) (*iamv1alpha2.User, error)
    DescribeUser(username string) (*iamv1alpha2.User, error)
    ModifyPassword(username string, password string) error
    ListLoginRecords(username string, query *query.Query) (*api.ListResult, error)
    PasswordVerify(username string, password string) error
}
​
type imOperator struct {
    ksClient          kubesphere.Interface
    userGetter        resources.Interface
    loginRecordGetter resources.Interface
    options           *authoptions.AuthenticationOptions
}

models/iam/am/am.go中定义了KS中所有的角色和角色绑定的函数

type AccessManagementInterface interface {
    GetGlobalRoleOfUser(username string) (*iamv1alpha2.GlobalRole, error)
    GetWorkspaceRoleOfUser(username string, groups []string, workspace string) ([]*iamv1alpha2.WorkspaceRole, error)
    GetClusterRoleOfUser(username string) (*rbacv1.ClusterRole, error)
    GetNamespaceRoleOfUser(username string, groups []string, namespace string) ([]*rbacv1.Role, error)
    ListRoles(namespace string, query *query.Query) (*api.ListResult, error)
    CreateGlobalRoleBinding(username string, globalRole string) error
    CreateOrUpdateWorkspaceRole(workspace string, workspaceRole *iamv1alpha2.WorkspaceRole) (*iamv1alpha2.WorkspaceRole, error)
    PatchWorkspaceRole(workspace string, workspaceRole *iamv1alpha2.WorkspaceRole) (*iamv1alpha2.WorkspaceRole, error)
    CreateOrUpdateGlobalRole(globalRole *iamv1alpha2.GlobalRole) (*iamv1alpha2.GlobalRole, error)
    PatchGlobalRole(globalRole *iamv1alpha2.GlobalRole) (*iamv1alpha2.GlobalRole, error)
    DeleteWorkspaceRole(workspace string, name string) error
    DeleteGlobalRole(name string) error
    ...
}
type amOperator struct {
    globalRoleBindingGetter    resourcev1alpha3.Interface
    workspaceRoleBindingGetter resourcev1alpha3.Interface
    clusterRoleBindingGetter   resourcev1alpha3.Interface
    roleBindingGetter          resourcev1alpha3.Interface
    globalRoleGetter           resourcev1alpha3.Interface
    workspaceRoleGetter        resourcev1alpha3.Interface
    clusterRoleGetter          resourcev1alpha3.Interface
    roleGetter                 resourcev1alpha3.Interface
    devopsProjectLister        devopslisters.DevOpsProjectLister
    namespaceLister            listersv1.NamespaceLister
    ksclient                   kubesphere.Interface
    k8sclient                  kubernetes.Interface
}
 

2.->pkg/apiserver/server.go

//注册与Authen和author相关的API
func (s *APIServer) installKubeSphereAPIs() {
//创建用户、角色和授权相关的资源操作对象
imOperator := im.NewOperator(s.KubernetesClient.KubeSphere(),
user.New(s.InformerFactory.KubeSphereSharedInformerFactory(),
s.InformerFactory.KubernetesSharedInformerFactory()),
loginrecord.New(s.InformerFactory.KubeSphereSharedInformerFactory()),
s.Config.AuthenticationOptions)
amOperator := am.NewOperator(s.KubernetesClient.KubeSphere(),
s.KubernetesClient.Kubernetes(),
s.InformerFactory)
rbacAuthorizer := rbac.NewRBACAuthorizer(amOperator)

​
urlruntime.Must(iamapi.AddToContainer(s.container, imOperator, amOperator,
group.New(s.InformerFactory, s.KubernetesClient.KubeSphere(), s.KubernetesClient.Kubernetes()),
rbacAuthorizer))
//注册oauth相关API
urlruntime.Must(oauth.AddToContainer(s.container, imOperator,
auth.NewTokenOperator(
s.CacheClient,
s.Config.AuthenticationOptions),
auth.NewPasswordAuthenticator(
s.KubernetesClient.KubeSphere(),
s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister(),
s.Config.AuthenticationOptions),
auth.NewOAuthAuthenticator(s.KubernetesClient.KubeSphere(),
s.InformerFactory.KubeSphereSharedInformerFactory(),
s.Config.AuthenticationOptions),
auth.NewLoginRecorder(s.KubernetesClient.KubeSphere(),
s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister()),
s.Config.AuthenticationOptions))
//注册租户(workspace)相关API
urlruntime.Must(tenantv1alpha2.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.Kubernetes(),
s.KubernetesClient.KubeSphere(), s.EventsClient, s.LoggingClient, s.AuditingClient, amOperator, rbacAuthorizer, s.MonitoringClient, s.RuntimeCache, s.Config.MeteringOptions))
//
urlruntime.Must(iamapi.AddToContainer(s.container, imOperator, amOperator,
group.New(s.InformerFactory, s.KubernetesClient.KubeSphere(), s.KubernetesClient.Kubernetes()),
rbacAuthorizer))
​
}

3.->pkg/apiserver/server.go //处理链中添加认证授权和准入相关的处理器

func (s *APIServer) buildHandlerChain(stopCh <-chan struct{}) {
​
var authorizers authorizer.Authorizer
//根据配置实例化授权器
switch s.Config.AuthorizationOptions.Mode {
case authorizationoptions.AlwaysAllow:
authorizers = authorizerfactory.NewAlwaysAllowAuthorizer()
case authorizationoptions.AlwaysDeny:
authorizers = authorizerfactory.NewAlwaysDenyAuthorizer()
default:
fallthrough
case authorizationoptions.RBAC://一般采用该模式
//排除以下API不进行授权验证
excludedPaths := []string{"/oauth/*", "/kapis/config.kubesphere.io/*", "/kapis/version", "/kapis/metrics"}
pathAuthorizer, _ := path.NewAuthorizer(excludedPaths)
amOperator := am.NewReadOnlyOperator(s.InformerFactory)
//联合,放一个数组里面
authorizers = unionauthorizer.New(pathAuthorizer, rbac.NewRBACAuthorizer(amOperator))
}
//授权验证
handler = filters.WithAuthorization(handler, authorizers)
if s.Config.MultiClusterOptions.Enable {
clusterDispatcher := dispatch.NewClusterDispatch(s.InformerFactory.KubeSphereSharedInformerFactory().Cluster().V1alpha1().Clusters())
handler = filters.WithMultipleClusterDispatcher(handler, clusterDispatcher)
}
//创建登录记录器,记录登录信息
loginRecorder := auth.NewLoginRecorder(s.KubernetesClient.KubeSphere(),
s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister())
// authenticators are unordered
//把支持的登录验证器加到数组
//BasicAuthenticator,用于用户名密码验证,
//TokenAuthenticator, jwtToken验证
authn := unionauth.New(anonymous.NewAuthenticator(),
basictoken.New(basic.NewBasicAuthenticator(auth.NewPasswordAuthenticator(s.KubernetesClient.KubeSphere(),
s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister(),
s.Config.AuthenticationOptions), loginRecorder)),
bearertoken.New(jwttoken.NewTokenAuthenticator(auth.NewTokenOperator(s.CacheClient,
s.Config.AuthenticationOptions),
s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister())))
//加入到过滤链中
handler = filters.WithAuthentication(handler, authn)
​
​
}

 

 

4.->pkg/controller/user/user_controller.go reconcile中实现了Etcd中用户信息变化后,controller处理相关的所有逻辑,比如用户删除、用户修改密码、同步OpenLdap等

func (c *userController) reconcile(key string) error {
// Get the user with this name
user, err := c.userLister.Get(key)
if err != nil {
// The user may no longer exist, in which case we stop
// processing.
if errors.IsNotFound(err) {
utilruntime.HandleError(fmt.Errorf("user '%s' in work queue no longer exists", key))
return nil
}
klog.Error(err)
return err
}
//用户没有删除,加入Finalizers,更新用户状态
if user.ObjectMeta.DeletionTimestamp.IsZero() {
// The object is not being deleted, so if it does not have our finalizer,
// then lets add the finalizer and update the object.
if !sliceutil.HasString(user.Finalizers, finalizer) {
user.ObjectMeta.Finalizers = append(user.ObjectMeta.Finalizers, finalizer)
if user, err = c.ksClient.IamV1alpha2().Users().Update(context.Background(), user, metav1.UpdateOptions{}); err != nil {
klog.Error(err)
return err
}
}
} else {
// The object is being deleted
​
if sliceutil.HasString(user.ObjectMeta.Finalizers, finalizer) {
// we do not need to delete the user from ldapServer when ldapClient is nil
// 如果用户已经删除,则从ldap删除,并发布事件
if c.ldapClient != nil {
if err = c.waitForDeleteFromLDAP(key); err != nil {
// ignore timeout error
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
}
}
//删除角色绑定并发布事件
if err = c.deleteRoleBindings(user); err != nil {
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
return err
}
//删除组绑定并发布事件
if err = c.deleteGroupBindings(user); err != nil {
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
return err
}
//删除DevOps角色绑定并发布事件
if c.devopsClient != nil {
// unassign jenkins role, unassign multiple times is allowed
if err = c.waitForUnassignDevOpsAdminRole(user); err != nil {
// ignore timeout error
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
}
}
//删除登录记录并发布事件
if err = c.deleteLoginRecords(user); err != nil {
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
return err
}

// remove our finalizer from the list and update it.
user.Finalizers = sliceutil.RemoveString(user.ObjectMeta.Finalizers, func(item string) bool {
return item == finalizer
})
// 更新用户状态
if user, err = c.ksClient.IamV1alpha2().Users().Update(context.Background(), user, metav1.UpdateOptions{}); err != nil {
klog.Error(err)
return err
}
}

// Our finalizer has finished, so the reconciler can do nothing.
return nil
}

// we do not need to sync ldap info when ldapClient is nil
// 用户信息同步给OpenLdap
if c.ldapClient != nil {
// ignore errors if timeout
if err = c.waitForSyncToLDAP(user); err != nil {
// ignore timeout error
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
}
}

//密码加密
if user, err = c.encryptPassword(user); err != nil {
klog.Error(err)
return err
}
//同步用户状态
if user, err = c.syncUserStatus(user); err != nil {
klog.Error(err)
return err
}

if c.kubeconfig != nil {
// ensure user kubeconfig configmap is created
if err = c.kubeconfig.CreateKubeConfig(user); err != nil {
klog.Error(err)
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
return err
}
}

if c.devopsClient != nil {
// assign jenkins role after user create, assign multiple times is allowed
// used as logged-in users can do anything
if err = c.waitForAssignDevOpsAdminRole(user); err != nil {
// ignore timeout error
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
}
}

// synchronization through kubefed-controller when multi cluster is enabled
if c.multiClusterEnabled {
if err = c.multiClusterSync(user); err != nil {
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
return err
}
}

c.recorder.Event(user, corev1.EventTypeNormal, successSynced, messageResourceSynced)
return nil
​
}
文章来自个人专栏
容器
13 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0