Kubesphere是国内开源的容器云平台,相比Openshift、Rancher等业界优秀产品,Kubesphere功能丰富度上也不相上下。Kubesphere的技术架构也是基于k8s可编程框架来实现的,本文主要介绍KS的核心组件和核心的机制实现原理,主要包括APIServer、ControllerManager、请求处理过程和认证和授权四个部分,基于KS 3.1.1源码版本。通过研究Kubesphere来理解容器云产品的技术架构和实现有一定参考意义。
一、Kubesphere核心组件介绍
1、整体架构
KubeSphere 官方的核心架构如图所示
核心组件主要有三个:
-
ks-console 前端服务组件
-
ks-apiserver 后端服务组件
-
ks-controller-manager 资源状态维护组件
在安装好的KS的服务组件里面也可以看到核心的服务组件
除了上面三个组件,还多了OpenLDAP,用于存储用户账号;以及MinIO,用于存储非结构化数据,如Helm charts。
KubeSphere 的后端设计中沿用了 K8s 声明式 API 的风格,所有可操作的资源都尽可能的抽象成为 CustomResource。所以要熟悉KubeSphere的代码实现,
首先需要熟悉K8S的声明式API的核心APIServer/ControllerManager/Client-go的实现。
2、KS源码目录与功能
二、Kubesphere APIServer实现分析
ks-apiserver 的开发使用了 go-restful 框架,可以在请求链路中增加多个 Filter 用于动态的拦截请求和响应,实现认证、鉴权、审计逻辑转发和反向代理功能。可以认为,ks-apiserver实现了类似微服务网关的功能。
1、整体架构分析
从架构图上可以看出,KS APIServer对不同类型资源对象的API作了不同的分发和路由处理:
-
kubernetes原生的对象,由ks-apiserver连接k8s-apiserver,直接获取、更改etcd中kubernetes的原始数据(origin data)即可,操作的对象即kubernetes原生的configmap、deployment等对象。
-
KS的CRD对象,ks-controller-manager的封装功能逻辑以crd对象的方式表现在etcd中,ks-apiserver通过连接k8s-apiserver操作etcd中的crd数据(crd data)即可,操作 ks-controller-manager 扩展的逻辑功能。
-
第三方的operator对象,如prometheus-operator等第三方完成的模块以operator的方式运行在系统中,其功能对应的对象也以crd的形式存放在etcd中,ks-apiserver也是通过和k8s-apiserver交互操作对应的crd完成。
-
普通的服务对象,如jenkins、sonarqube等以普通服务的方式运行在系统中,ks-apiserver直接通过网络调用和此类对象交互。
-
多集群模式的API对象,通过tower分发到Member集群处理,如阿里云,腾讯云K8S集群。 以上,ks-apiserver通过内部API(inner API aggregate)聚合内部功能 ,并对外提供统一的API,即外部API(out API aggregate)。
2、代码实现分析
ks-apiserver整体启动流程如下图,提供Restful API和k8s一样主要通过go-restful这个轻量级框架实现,
代码入口:1.cmd/ks-apiserver/apiserver.go
// Build the start-up command with the cobra library and execute it.
cmd:=app.NewAPIServerCommand()
cmd.Execute()
2.->cmd/ks-apiserver/app/server.go
// Abridged excerpt of NewAPIServerCommand: only the key statements are shown.
func NewAPIServerCommand() *cobra.Command
s := options.NewServerRunOptions()
// Read the KubeSphere configuration through the viper library, from the given directory path or from environment variables.
conf, err := apiserverconfig.TryLoadFromDisk()
// Create the cobra command; Run is invoked from inside that command.
// Run creates an APIServer from the given options, prepares it (API
// registration and handler chain) and then runs it with ctx.
//
// The first error encountered in any of the three steps is returned.
func Run(s *options.ServerRunOptions, ctx context.Context) error {
	// Create an APIServer from the options.
	apiserver, err := s.NewAPIServer(ctx.Done())
	if err != nil {
		return err
	}

	// Register the APIs and build the handler chain. The error must be
	// propagated: returning nil here would report success to the caller
	// although the server was never started.
	if err = apiserver.PrepareRun(ctx.Done()); err != nil {
		return err
	}

	// Run the APIServer.
	return apiserver.Run(ctx)
}
3.->cmd/ks-apiserver/app/options/options.go 通过给定的options创建ks-apiserver实例:先创建k8s的客户端kubernetesClient(Kubernetes、KubeSphere、Istio、ApiExtensions、Prometheus),这部分对应可以通过informers管理的CRD资源;外部的组件则通过http等公共协议远程调用,对应通过服务接口直接提供的资源,如MetricsClient、LoggingClient、S3Client、devopsClient、SonarClient、eventsClient、CacheClient、auditingClient、alertingClient。这些客户端实例化后都挂在APIServer上。
// NewAPIServer builds an APIServer from the run options. This excerpt shows
// only the creation of the informer factories from the Kubernetes client set
// and their attachment to the server; the rest of the body is elided ("...").
func (s *ServerRunOptions) NewAPIServer(stopCh <-chan struct{}) (*apiserver.APIServer, error){
...
// One factory bundle covering the Kubernetes, KubeSphere, Istio, Snapshot, ApiExtensions and Prometheus clients.
informerFactory := informers.NewInformerFactories(kubernetesClient.Kubernetes(), kubernetesClient.KubeSphere(),
kubernetesClient.Istio(), kubernetesClient.Snapshot(), kubernetesClient.ApiExtensions(), kubernetesClient.Prometheus())
apiServer.InformerFactory = informerFactory
...
}
4.->pkg/apiserver/server.go 定义APIServer,注册API,创建处理链(审计、授权)
// PrepareRun creates the go-restful container, installs the request filters
// and the KubeSphere and metrics APIs on it, and finally builds the handler
// chain on top of the container. It always returns nil.
func (s *APIServer) PrepareRun(stopCh <-chan struct{}) error {
	container := restful.NewContainer()
	s.container = container

	container.Filter(logRequestAndResponse)
	container.Router(restful.CurlyRouter{})
	container.RecoverHandler(func(reason interface{}, w http.ResponseWriter) {
		logStackOnRecover(reason, w)
	})

	// Register the KubeSphere APIs, then the metrics (metering) API.
	s.installKubeSphereAPIs()
	s.installMetricsAPI()
	container.Filter(monitorRequest)

	// Log the root path of every registered web service at verbosity 2.
	services := container.RegisteredWebServices()
	for i := range services {
		klog.V(2).Infof("%s", services[i].RootPath())
	}

	// The container becomes the innermost handler of the chain.
	s.Server.Handler = container
	s.buildHandlerChain(stopCh)
	return nil
}
5.->pkg/apiserver/server.go 接口注册,添加到go-restful的container并与授权认证关联
// installKubeSphereAPIs registers the KubeSphere API groups on the go-restful
// container. Abridged listing: elided lines are marked "..." / "....".
func (s *APIServer) installKubeSphereAPIs() {
// Identity operator built from the KubeSphere client, the user and login-record getters and the authentication options.
imOperator := im.NewOperator(s.KubernetesClient.KubeSphere(),
user.New(s.InformerFactory.KubeSphereSharedInformerFactory(),
s.InformerFactory.KubernetesSharedInformerFactory()),
loginrecord.New(s.InformerFactory.KubeSphereSharedInformerFactory()),
s.Config.AuthenticationOptions)
// Access operator built from the KubeSphere and Kubernetes clients and the informer factories.
amOperator := am.NewOperator(s.KubernetesClient.KubeSphere(),
s.KubernetesClient.Kubernetes(),
s.InformerFactory)
// RBAC authorizer backed by amOperator.
rbacAuthorizer := rbac.NewRBACAuthorizer(amOperator)
...
// Each AddToContainer result is passed to urlruntime.Must.
// NOTE(review): presumably Must panics on a non-nil error — confirm.
urlruntime.Must(configv1alpha2.AddToContainer(s.container, s.Config))
urlruntime.Must(resourcev1alpha3.AddToContainer(s.container, s.InformerFactory, s.RuntimeCache))
....
urlruntime.Must(devopsv1alpha2.AddToContainer(s.container, s.Config.DevopsOptions.Endpoint))
}
5.->pkg/apiserver/server.go 接口注册,添加到go-restful的container并与授权认证关联。其中像devops这类已经独立出去的外部服务,则通过代理方式实现注册转发。
// installKubeSphereAPIs registers the KubeSphere API groups on the go-restful
// container. Abridged listing: elided lines are marked "....".
func (s *APIServer) installKubeSphereAPIs() {
// Identity operator built from the KubeSphere client, the user and login-record getters and the authentication options.
imOperator := im.NewOperator(s.KubernetesClient.KubeSphere(),
user.New(s.InformerFactory.KubeSphereSharedInformerFactory(),
s.InformerFactory.KubernetesSharedInformerFactory()),
loginrecord.New(s.InformerFactory.KubeSphereSharedInformerFactory()),
s.Config.AuthenticationOptions)
// Access operator built from the KubeSphere and Kubernetes clients and the informer factories.
amOperator := am.NewOperator(s.KubernetesClient.KubeSphere(),
s.KubernetesClient.Kubernetes(),
s.InformerFactory)
// RBAC authorizer backed by amOperator.
rbacAuthorizer := rbac.NewRBACAuthorizer(amOperator)
// Direct registration: these groups receive the container plus the configuration or informer factories.
urlruntime.Must(configv1alpha2.AddToContainer(s.container, s.Config))
urlruntime.Must(resourcev1alpha3.AddToContainer(s.container, s.InformerFactory, s.RuntimeCache))
....
// The devops group only receives the configured endpoint string instead of clients or informers.
urlruntime.Must(devopsv1alpha2.AddToContainer(s.container, s.Config.DevopsOptions.Endpoint))
}
// pkg/kapis/config/v1alpha2/register.go
// Direct registration.
//
// AddToContainer adds the configuration web service to the container. It
// exposes two GET routes, /configs/oauth and /configs/configz, and always
// returns nil.
func AddToContainer(c *restful.Container, config *kubesphereconfig.Config) error {
	ws := runtime.NewWebService(GroupVersion)

	// Writes the OAuth options taken from the authentication options.
	oauthConfig := func(_ *restful.Request, resp *restful.Response) {
		resp.WriteEntity(config.AuthenticationOptions.OAuthOptions)
	}
	// Writes the configuration, converted to a map, as JSON.
	serverConfig := func(_ *restful.Request, resp *restful.Response) {
		resp.WriteAsJson(config.ToMap())
	}

	oauthRoute := ws.GET("/configs/oauth").
		Doc("Information about the authorization server are published.").
		To(oauthConfig)
	ws.Route(oauthRoute)

	configzRoute := ws.GET("/configs/configz").
		Doc("Information about the server configuration").
		To(serverConfig)
	ws.Route(configzRoute)

	c.Add(ws)
	return nil
}
// Registration through a proxy: groups such as devops are forwarded to the
// ks-devops service.
//
// AddToContainer creates a generic proxy for the endpoint under this
// package's group and version and registers it on the container. It returns
// the proxy construction error, or otherwise the registration result.
func AddToContainer(container *restful.Container, endpoint string) error {
	forwarder, err := generic.NewGenericProxy(endpoint, GroupVersion.Group, GroupVersion.Version)
	if err == nil {
		err = forwarder.AddToContainer(container)
	}
	return err
}
// pkg/kapis/generic/generic.go
//
// NewGenericProxy parses endpoint as a URL and returns a proxy description
// for the given API group and version. It returns an error when endpoint
// cannot be parsed.
func NewGenericProxy(endpoint string, groupName string, version string) (*genericProxy, error) {
	target, err := url.Parse(endpoint)
	if err != nil {
		return nil, err
	}

	// strings.Trim strips "/" from both ends of the path, not only the
	// trailing one.
	target.Path = strings.Trim(target.Path, "/")

	p := &genericProxy{Endpoint: target, GroupName: groupName, Version: version}
	return p, nil
}
// AddToContainer adds the proxy web service to the go-restful container.
// According to the original note it supports GET/PUT/POST/DELETE/PATCH; only
// the GET route is shown here, the remaining routes are elided ("...").
func (g *genericProxy) AddToContainer(container *restful.Container) error {
// The web service is created for the proxy's own group and version.
webservice := runtime.NewWebService(schema.GroupVersion{
Group: g.GroupName,
Version: g.Version,
})
// Wildcard route: every GET path under this web service is handled by g.handler.
webservice.Route(webservice.GET("/{path:*}").
To(g.handler).
Returns(http.StatusOK, api.StatusOK, nil))
...
container.Add(webservice)
return nil
}
// handler forwards the incoming request to the proxy's endpoint: it builds
// the target URL with makeURL and serves the request through an
// upgrade-aware proxy handler that uses http.DefaultTransport.
func (g *genericProxy) handler(request *restful.Request, response *restful.Response) {
	target := g.makeURL(request)
	forwarder := proxy.NewUpgradeAwareHandler(target, http.DefaultTransport, false, false, &errorResponder{})
	forwarder.ServeHTTP(response, request.Request)
}
6.->pkg/apiserver/server.go //创建过滤处理链,处理审计、授权和认证
// buildHandlerChain wraps s.Server.Handler with the request filters
// (Kubernetes API server pass-through, auditing, authorization, multi-cluster
// dispatch, authentication and request-info resolution) and stores the
// resulting handler back on the server. Each filter wraps the handler built
// so far, so the filter added last is the outermost one.
func (s *APIServer) buildHandlerChain(stopCh <-chan struct{}) {
// Resolver configuration: the recognised API prefixes and the resources treated as global.
requestInfoResolver := &request.RequestInfoFactory{
APIPrefixes: sets.NewString("api", "apis", "kapis", "kapi"),
GrouplessAPIPrefixes: sets.NewString("api", "kapi"),
GlobalResources: []schema.GroupResource{
iamv1alpha2.Resource(iamv1alpha2.ResourcesPluralUser),
iamv1alpha2.Resource(iamv1alpha2.ResourcesPluralGlobalRole),
iamv1alpha2.Resource(iamv1alpha2.ResourcesPluralGlobalRoleBinding),
tenantv1alpha1.Resource(tenantv1alpha1.ResourcePluralWorkspace),
tenantv1alpha2.Resource(tenantv1alpha1.ResourcePluralWorkspace),
tenantv1alpha2.Resource(clusterv1alpha1.ResourcesPluralCluster),
clusterv1alpha1.Resource(clusterv1alpha1.ResourcesPluralCluster),
resourcev1alpha3.Resource(clusterv1alpha1.ResourcesPluralCluster),
notificationv2beta1.Resource(v2beta1.ResourcesPluralConfig),
notificationv2beta1.Resource(v2beta1.ResourcesPluralReceiver),
},
}
handler := s.Server.Handler
handler = filters.WithKubeAPIServer(handler, s.KubernetesClient.Config(), &errorResponder{})
// Enable auditing only when it is switched on in the configuration.
if s.Config.AuditingOptions.Enable {
handler = filters.WithAuditing(handler,
audit.NewAuditing(s.InformerFactory, s.Config.AuditingOptions, stopCh))
}
var authorizers authorizer.Authorizer
// Authorization: choose the authorizer from the configured mode.
switch s.Config.AuthorizationOptions.Mode {
case authorizationoptions.AlwaysAllow:
authorizers = authorizerfactory.NewAlwaysAllowAuthorizer()
case authorizationoptions.AlwaysDeny:
authorizers = authorizerfactory.NewAlwaysDenyAuthorizer()
default:
// Any other mode falls through to the RBAC case below.
fallthrough
case authorizationoptions.RBAC:
excludedPaths := []string{"/oauth/*", "/kapis/config.kubesphere.io/*", "/kapis/version", "/kapis/metrics"}
// NOTE(review): the error from path.NewAuthorizer is discarded — confirm it cannot fail for this fixed path list.
pathAuthorizer, _ := path.NewAuthorizer(excludedPaths)
amOperator := am.NewReadOnlyOperator(s.InformerFactory)
// Union of the path authorizer and the RBAC authorizer.
authorizers = unionauthorizer.New(pathAuthorizer, rbac.NewRBACAuthorizer(amOperator))
}
handler = filters.WithAuthorization(handler, authorizers)
// Multi-cluster mode: add the cluster dispatcher filter.
if s.Config.MultiClusterOptions.Enable {
clusterDispatcher := dispatch.NewClusterDispatch(s.InformerFactory.KubeSphereSharedInformerFactory().Cluster().V1alpha1().Clusters())
handler = filters.WithMultipleClusterDispatcher(handler, clusterDispatcher)
}
loginRecorder := auth.NewLoginRecorder(s.KubernetesClient.KubeSphere(),
s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister())
// authenticators are unordered
// Like Kubernetes, several authentication modes are supported: anonymous, basic (password) and bearer (JWT) token.
authn := unionauth.New(anonymous.NewAuthenticator(),
basictoken.New(basic.NewBasicAuthenticator(auth.NewPasswordAuthenticator(s.KubernetesClient.KubeSphere(),
s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister(),
s.Config.AuthenticationOptions), loginRecorder)),
bearertoken.New(jwttoken.NewTokenAuthenticator(auth.NewTokenOperator(s.CacheClient,
s.Config.AuthenticationOptions),
s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister())))
handler = filters.WithAuthentication(handler, authn)
handler = filters.WithRequestInfo(handler, requestInfoResolver)
s.Server.Handler = handler
}
7.->pkg/apiserver/server.go
// Run waits for the resource caches to sync and then serves HTTP, or HTTPS
// when a TLS configuration is set, on s.Server.Addr. A background goroutine
// calls Shutdown on the server once ctx is done. The error returned by the
// listener (or by the cache sync) is handed back to the caller.
func (s *APIServer) Run(ctx context.Context) (err error) {
	// Wait for the resources to be synced before accepting traffic.
	if err = s.waitForResourceSync(ctx); err != nil {
		return err
	}

	// NOTE(review): cancel runs as soon as the listener returns, which may
	// cut an in-flight Shutdown short — confirm this is intended.
	shutdownCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Shut the server down when the caller's context ends.
	go func() {
		<-ctx.Done()
		_ = s.Server.Shutdown(shutdownCtx)
	}()

	// Start the http/https service.
	klog.V(0).Infof("Start listening on %s", s.Server.Addr)
	if s.Server.TLSConfig == nil {
		err = s.Server.ListenAndServe()
		return err
	}
	err = s.Server.ListenAndServeTLS("", "")
	return err
}
// waitForResourceSync connects to the Kubernetes API server, discovers the
// available group/version/resources, then starts the informer factories and
// waits for their caches to sync. Abridged listing: elided parts are marked
// "...", "...." or ".....". It returns the first discovery or informer error.
func (s *APIServer) waitForResourceSync(ctx context.Context) error {
klog.V(0).Info("Start cache objects")
stopCh := ctx.Done()
// Service discovery: list the API groups and resources served by the cluster.
discoveryClient := s.KubernetesClient.Kubernetes().Discovery()
_, apiResourcesList, err := discoveryClient.ServerGroupsAndResources()
if err != nil {
return err
}
// isResourceExists reports whether the discovered resource list contains the
// given resource under exactly the same group/version.
isResourceExists := func(resource schema.GroupVersionResource) bool {
for _, apiResource := range apiResourcesList {
if apiResource.GroupVersion == resource.GroupVersion().String() {
for _, rsc := range apiResource.APIResources {
if rsc.Name == resource.Resource {
return true
}
}
}
}
return false
}
// resources we have to create informer first
// Native Kubernetes resources.
k8sGVRs := []schema.GroupVersionResource{
{Group: "", Version: "v1", Resource: "namespaces"},
{Group: "", Version: "v1", Resource: "nodes"},
{Group: "", Version: "v1", Resource: "resourcequotas"},
{Group: "", Version: "v1", Resource: "pods"},
{Group: "", Version: "v1", Resource: "services"},
{Group: "", Version: "v1", Resource: "persistentvolumeclaims"},
{Group: "", Version: "v1", Resource: "secrets"},
{Group: "", Version: "v1", Resource: "configmaps"},
{Group: "", Version: "v1", Resource: "serviceaccounts"},
.....
}
...
// Start the informer factory for the Kubernetes resources and wait for its cache to sync.
s.InformerFactory.KubernetesSharedInformerFactory().Start(stopCh)
s.InformerFactory.KubernetesSharedInformerFactory().WaitForCacheSync(stopCh)
ksInformerFactory := s.InformerFactory.KubeSphereSharedInformerFactory()
// KubeSphere resources.
ksGVRs := []schema.GroupVersionResource{
{Group: "tenant.kubesphere.io", Version: "v1alpha1", Resource: "workspaces"},
{Group: "tenant.kubesphere.io", Version: "v1alpha2", Resource: "workspacetemplates"},
...
{Group: "iam.kubesphere.io", Version: "v1alpha2", Resource: "loginrecords"},
{Group: "cluster.kubesphere.io", Version: "v1alpha1", Resource: "clusters"},
{Group: "devops.kubesphere.io", Version: "v1alpha3", Resource: "devopsprojects"},
}
// DevOps resources.
devopsGVRs := []schema.GroupVersionResource{
{Group: "devops.kubesphere.io", Version: "v1alpha1", Resource: "s2ibinaries"},
...
}
// Service-mesh resources.
servicemeshGVRs := []schema.GroupVersionResource{
{Group: "servicemesh.kubesphere.io", Version: "v1alpha2", Resource: "strategies"},
{Group: "servicemesh.kubesphere.io", Version: "v1alpha2", Resource: "servicepolicies"},
}
// federated resources on cached in multi cluster setup
// Federated resources for multi-cluster setups.
// NOTE(review): the "BindingBinding" suffix in the second entry looks like a typo — verify against the typesv1beta1 package.
federatedResourceGVRs := []schema.GroupVersionResource{
typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedClusterRole),
typesv1beta1.SchemeGroupVersion.WithResource(typesv1beta1.ResourcePluralFederatedClusterRoleBindingBinding),
...
}
// Start the KubeSphere informer factory and wait for its cache to sync.
ksInformerFactory.Start(stopCh)
ksInformerFactory.WaitForCacheSync(stopCh)
....
// Prometheus resources, handled only when a Prometheus informer factory is available.
if promFactory := s.InformerFactory.PrometheusSharedInformerFactory(); promFactory != nil {
prometheusGVRs := []schema.GroupVersionResource{
{Group: "monitoring.coreos.com", Version: "v1", Resource: "prometheuses"},
{Group: "monitoring.coreos.com", Version: "v1", Resource: "prometheusrules"},
{Group: "monitoring.coreos.com", Version: "v1", Resource: "thanosrulers"},
}
// Create an informer only for resources the cluster actually serves; a missing resource is logged as a warning, not treated as an error.
for _, gvr := range prometheusGVRs {
if isResourceExists(gvr) {
_, err = promFactory.ForResource(gvr)
if err != nil {
return err
}
} else {
klog.Warningf("resource %s not exists in the cluster", gvr)
}
}
promFactory.Start(stopCh)
promFactory.WaitForCacheSync(stopCh)
}
// controller runtime cache for resources
// Started in its own goroutine; WaitForCacheSync is then called on it.
go s.RuntimeCache.Start(ctx)
s.RuntimeCache.WaitForCacheSync(ctx)
...
return nil
}
至此,APIServer相关流程处理完成
三、KubeSphere Controller-manager实现分析
ks-controller-manager主要是kubesphere的基本系统功能集合,并将其提供的功能抽象为CRD注册在kubernetes中对ks-apiserver提供使用入口。而监控告警等第三方不断扩展的组件也采用同样的方式部署在kubernetes上,并由ks-apiserver最终统一聚合。做到功能模块分离微服务化,交互入口中心统一化。
1、KS Controller-manager启动流程分析
2、代码实现分析
1.代码入口:cmd/controller-manager/controller-manager.go
// As with the APIServer, build the start-up command with the cobra library and execute it.
command := app.NewControllerManagerCommand()
command.Execute()
2.->cmd/controller-manager/app/options/options.go
// NewKubeSphereControllerManagerOptions aggregates the options of all
// components into one value. Leader election is disabled, and the webhook
// certificate directory and the application selector are empty strings.
func NewKubeSphereControllerManagerOptions() *KubeSphereControllerManagerOptions {
	// Leader-election timings: 30s lease, 15s renew deadline, 5s retry period.
	election := &leaderelection.LeaderElectionConfig{
		LeaseDuration: 30 * time.Second,
		RenewDeadline: 15 * time.Second,
		RetryPeriod:   5 * time.Second,
	}

	return &KubeSphereControllerManagerOptions{
		KubernetesOptions:     k8s.NewKubernetesOptions(),
		DevopsOptions:         jenkins.NewDevopsOptions(),
		S3Options:             s3.NewS3Options(),
		LdapOptions:           ldapclient.NewOptions(),
		OpenPitrixOptions:     openpitrix.NewOptions(),
		NetworkOptions:        network.NewNetworkOptions(),
		MultiClusterOptions:   multicluster.NewOptions(),
		ServiceMeshOptions:    servicemesh.NewServiceMeshOptions(),
		AuthenticationOptions: authoptions.NewAuthenticateOptions(),
		LeaderElection:        election,
		LeaderElect:           false,
		WebhookCertDir:        "",
		ApplicationSelector:   "",
	}
}
3.->cmd/controller-manager/app/server.go
// NewControllerManagerCommand builds the cobra command that starts the
// controller manager. The component options come from the configuration on
// disk; the leader-election and webhook settings are kept from the defaults.
// The process exits when the configuration cannot be loaded.
//
// Abridged listing: the excerpt stops after the cobra.Command literal.
func NewControllerManagerCommand() *cobra.Command {
	// Start from the default options.
	s := options.NewKubeSphereControllerManagerOptions()

	// Read the configuration through the viper library.
	conf, err := controllerconfig.TryLoadFromDisk()
	if err != nil {
		// klog.Fatal formats its arguments like fmt.Sprint, which would fuse
		// the message and the error text together; Fatalf gives an explicit
		// separator.
		klog.Fatalf("Failed to load configuration from disk: %v", err)
	}

	// make sure LeaderElection is not nil
	s = &options.KubeSphereControllerManagerOptions{
		KubernetesOptions:     conf.KubernetesOptions,
		DevopsOptions:         conf.DevopsOptions,
		S3Options:             conf.S3Options,
		AuthenticationOptions: conf.AuthenticationOptions,
		LdapOptions:           conf.LdapOptions,
		OpenPitrixOptions:     conf.OpenPitrixOptions,
		NetworkOptions:        conf.NetworkOptions,
		MultiClusterOptions:   conf.MultiClusterOptions,
		ServiceMeshOptions:    conf.ServiceMeshOptions,
		LeaderElection:        s.LeaderElection,
		LeaderElect:           s.LeaderElect,
		WebhookCertDir:        s.WebhookCertDir,
	}

	cmd := &cobra.Command{
		Use:  "controller-manager",
		Long: `KubeSphere controller manager is a daemon that`,
		Run: func(cmd *cobra.Command, args []string) {
			// Invalid options or a failed run are logged and end the process with status 1.
			if errs := s.Validate(); len(errs) != 0 {
				klog.Error(utilerrors.NewAggregate(errs))
				os.Exit(1)
			}
			if err = run(s, signals.SetupSignalHandler()); err != nil {
				klog.Error(err)
				os.Exit(1)
			}
		},
		SilenceUsage: true,
	}
// run wires up and starts the controller manager. controller-runtime is used
// to help manage the controllers. According to the original note, the
// controller manager is a stateful component and therefore needs leader
// election, whereas the APIServer is stateless and does not.
//
// Abridged listing: elided parts are marked "..." / "....". Errors from
// client and manager construction are returned to the caller; a failure to
// set up the workspace template controller or to start the manager is fatal.
func run(s *options.KubeSphereControllerManagerOptions, ctx context.Context) error {
	// Create the Kubernetes client set. The error must be checked here,
	// because err is reused by the statements below.
	kubernetesClient, err := k8s.NewKubernetesClient(s.KubernetesOptions)
	if err != nil {
		return fmt.Errorf("creating kubernetes client: %w", err)
	}

	// Load the other clients: devopsClient/ldapClient/s3Client....
	var devopsClient devops.Interface
	if s.DevopsOptions != nil && len(s.DevopsOptions.Host) != 0 {
		devopsClient, err = jenkins.NewDevopsClient(s.DevopsOptions)
		if err != nil {
			return fmt.Errorf("failed to connect jenkins, please check jenkins status, error: %v", err)
		}
	}
....
	// Leader-election configuration of the manager.
	if s.LeaderElect {
		mgrOptions = manager.Options{
			CertDir:                 s.WebhookCertDir,
			Port:                    8443,
			LeaderElection:          s.LeaderElect,
			LeaderElectionNamespace: "kubesphere-system",
			LeaderElectionID:        "ks-controller-manager-leader-election",
			LeaseDuration:           &s.LeaderElection.LeaseDuration,
			RetryPeriod:             &s.LeaderElection.RetryPeriod,
			RenewDeadline:           &s.LeaderElection.RenewDeadline,
		}
	}

	// Create the manager; it is used right below, so a construction failure
	// has to stop here.
	mgr, err := manager.New(kubernetesClient.Config(), mgrOptions)
	if err != nil {
		return fmt.Errorf("setting up controller manager: %w", err)
	}

	informerFactory := informers.NewInformerFactories(
		kubernetesClient.Kubernetes(),
		kubernetesClient.KubeSphere(),
		kubernetesClient.Istio(),
		kubernetesClient.Snapshot(),
		kubernetesClient.ApiExtensions(),
		kubernetesClient.Prometheus())

	// See pkg/controller/workspacetemplate/workspacetemplate_controller.go.
	workspaceTemplateReconciler := &workspacetemplate.Reconciler{MultiClusterEnabled: s.MultiClusterOptions.Enable}
	if err = workspaceTemplateReconciler.SetupWithManager(mgr); err != nil {
		klog.Fatalf("Unable to create workspace template controller: %v", err)
	}
	// The remaining controllers are instantiated here.
...
	// Register the webhooks, which rely on the Kubernetes admission mechanism.
	hookServer := mgr.GetWebhookServer()
	klog.V(2).Info("registering webhooks to the webhook server")
	hookServer.Register("/validate-email-iam-kubesphere-io-v1alpha2", &webhook.Admission{Handler: &user.EmailValidator{Client: mgr.GetClient()}})
	hookServer.Register("/validate-network-kubesphere-io-v1alpha1", &webhook.Admission{Handler: &webhooks.ValidatingHandler{C: mgr.GetClient()}})
	hookServer.Register("/mutate-network-kubesphere-io-v1alpha1", &webhook.Admission{Handler: &webhooks.MutatingHandler{C: mgr.GetClient()}})
...
	// Start the controllers and enter the controller loop.
	if err = mgr.Start(ctx); err != nil {
		klog.Fatalf("unable to run the manager: %v", err)
	}
	return nil
}
至此,controller-manager组件处理完成。
四、KubeSphere API请求处理流程分析
KS的API请求处理流程与一般的Restful Web框架处理差不多,比如SpringMVC,KS这里用的Restful框架和k8s一样是go-restful。区别主要在于informer二级缓存机制,对于资源对象的读通过informer执行,减少对K8S APIServer的请求,从而降低对Etcd的压力。
1、整体处理流程
2、代码分析
1.->pkg/kapis/tenant/v1alpha2/register.go
// AddToContainer registers the tenant API routes. According to the original
// note it is called when the APIServer starts, and the parameter list differs
// between API groups depending on the components their resources rely on.
// Abridged listing: only the /clusters route is shown, the rest is elided ("...").
func AddToContainer(c *restful.Container, factory informers.InformerFactory, k8sclient kubernetes.Interface,
ksclient kubesphere.Interface, evtsClient events.Client, loggingClient logging.Client,
auditingclient auditing.Client, am am.AccessManagementInterface, authorizer authorizer.Authorizer,
monitoringclient monitoringclient.Interface, cache cache.Cache, meteringOptions *meteringclient.Options) error {
// MIME types for patch requests; not used in the part of the body shown here.
mimePatch := []string{restful.MIME_JSON, runtime.MimeMergePatchJson, runtime.MimeJsonPatchJson}
ws := runtime.NewWebService(GroupVersion)
// The handler receives all clients plus a resource getter built from the informer factory and the cache.
handler := newTenantHandler(factory, k8sclient, ksclient, evtsClient, loggingClient, auditingclient, am, authorizer, monitoringclient, resourcev1alpha3.NewResourceGetter(factory, cache), meteringOptions)
// GET /clusters is served by handler.ListClusters.
ws.Route(ws.GET("/clusters").
To(handler.ListClusters).
Doc("List clusters available to users").
Returns(http.StatusOK, api.StatusOK, api.ListResult{}).
Metadata(restfulspec.KeyOpenAPITags, []string{constants.UserResourceTag}))
...
}
2.->pkg/kapis/tenant/v1alpha2/handler.go
// newTenantHandler wires the tenant handler: it builds the tenant operator
// from the given clients and stores it together with the metering options.
// When meteringOptions is nil or has an empty RetentionDay, the default
// metering option is used instead.
func newTenantHandler(factory informers.InformerFactory, k8sclient kubernetes.Interface, ksclient kubesphere.Interface,
	evtsClient events.Client, loggingClient logging.Client, auditingclient auditing.Client,
	am am.AccessManagementInterface, authorizer authorizer.Authorizer,
	monitoringclient monitoringclient.Interface, resourceGetter *resourcev1alpha3.ResourceGetter,
	meteringOptions *meteringclient.Options) *tenantHandler {

	effective := meteringOptions
	if effective == nil || effective.RetentionDay == "" {
		effective = &meteringclient.DefaultMeteringOption
	}

	operator := tenant.New(factory, k8sclient, ksclient, evtsClient, loggingClient, auditingclient, am, authorizer, monitoringclient, resourceGetter)

	h := &tenantHandler{}
	h.tenant = operator
	h.meteringOptions = effective
	return h
}
3.->读数据
// ListClusters is the handler behind the cluster listing route. It writes an
// empty list when the request context carries no user; otherwise it writes
// the clusters returned by the tenant operator. Errors are logged and
// answered through the not-found or internal-error helper.
func (h *tenantHandler) ListClusters(r *restful.Request, response *restful.Response) {
	requester, found := request.UserFrom(r.Request.Context())
	if !found {
		response.WriteEntity([]interface{}{})
		return
	}

	clusters, err := h.tenant.ListClusters(requester)
	if err == nil {
		response.WriteEntity(clusters)
		return
	}

	klog.Error(err)
	switch {
	case errors.IsNotFound(err):
		api.HandleNotFound(response, r, err)
	default:
		api.HandleInternalError(response, r, err)
	}
}
// The handler calls the models-layer function h.tenant.ListClusters(user).
// pkg/models/tenant/tenant.go
// Cluster and namespace resources are comparatively complex because of the tenant relationships.
//
// tenantOperator holds the clients and helpers used by the tenant operations.
type tenantOperator struct {
// am is used to list workspace role bindings.
am am.AccessManagementInterface
// authorizer is used for the authorization checks.
authorizer authorizer.Authorizer
k8sclient kubernetes.Interface
// ksclient is the KubeSphere client used for writes.
ksclient kubesphere.Interface
// resourceGetter is used to list and get resources.
resourceGetter *resourcesv1alpha3.ResourceGetter
events events.Interface
lo logging.LoggingOperator
auditing auditing.Interface
mo monitoring.MonitoringOperator
opRelease openpitrix.ReleaseInterface
}
// ListClusters returns the clusters the user may see. A user allowed to list
// clusters or workspaces in the global scope gets every cluster; any other
// user gets the clusters granted to the workspaces they hold a role binding
// in. Abridged listing: the workspace-scope authorization check is elided ("...").
func (t *tenantOperator) ListClusters(user user.Info) (*api.ListResult, error) {
// Attributes of the "list clusters in global scope" request to be authorized.
listClustersInGlobalScope := authorizer.AttributesRecord{
User: user,
Verb: "list",
Resource: "clusters",
ResourceScope: request.GlobalScope,
ResourceRequest: true,
}
// Authorization check.
allowedListClustersInGlobalScope, _, err := t.authorizer.Authorize(listClustersInGlobalScope)
if err != nil {
klog.Error(err)
return nil, err
}
...
// Allowed to list clusters or workspaces globally: return every cluster.
if allowedListClustersInGlobalScope == authorizer.DecisionAllow ||
allowedListWorkspacesInGlobalScope == authorizer.DecisionAllow {
// Look the resources up through the resource getter.
result, err := t.resourceGetter.List(clusterv1alpha1.ResourcesPluralCluster, "", query.New())
if err != nil {
klog.Error(err)
return nil, err
}
return result, nil
}
// Otherwise look up the user's workspace role bindings.
workspaceRoleBindings, err := t.am.ListWorkspaceRoleBindings(user.GetName(), user.GetGroups(), "")
if err != nil {
klog.Error(err)
return nil, err
}
// Keyed by cluster name so that a cluster granted to several workspaces is collected only once.
clusters := map[string]*clusterv1alpha1.Cluster{}
// Filter the data by the workspace role bindings.
for _, roleBinding := range workspaceRoleBindings {
workspaceName := roleBinding.Labels[tenantv1alpha1.WorkspaceLabel]
workspace, err := t.DescribeWorkspace(workspaceName)
if err != nil {
klog.Error(err)
return nil, err
}
for _, grantedCluster := range workspace.Spec.Placement.Clusters {
// skip if cluster exist
if clusters[grantedCluster.Name] != nil {
continue
}
obj, err := t.resourceGetter.Get(clusterv1alpha1.ResourcesPluralCluster, "", grantedCluster.Name)
if err != nil {
klog.Error(err)
// A granted cluster that is not found is skipped instead of failing the whole listing.
if errors.IsNotFound(err) {
continue
}
return nil, err
}
cluster := obj.(*clusterv1alpha1.Cluster)
clusters[cluster.Name] = cluster
}
}
// Flatten the map into the list result.
items := make([]interface{}, 0)
for _, cluster := range clusters {
items = append(items, cluster)
}
return &api.ListResult{Items: items, TotalItems: len(items)}, nil
}
最终通过informers/Lister来获取
4.->写数据
// CreateWorkspace decodes a workspace template from the request body, asks
// the tenant operator to create it and writes the created object to the
// response. A body that cannot be read is answered as a bad request; a
// creation error is answered as not found when it is a not-found error and
// as a bad request otherwise. Every error is logged.
func (h *tenantHandler) CreateWorkspace(request *restful.Request, response *restful.Response) {
	template := new(tenantv1alpha2.WorkspaceTemplate)
	if err := request.ReadEntity(template); err != nil {
		klog.Error(err)
		api.HandleBadRequest(response, request, err)
		return
	}

	created, err := h.tenant.CreateWorkspace(template)
	switch {
	case err == nil:
		response.WriteEntity(created)
	case errors.IsNotFound(err):
		klog.Error(err)
		api.HandleNotFound(response, request, err)
	default:
		klog.Error(err)
		api.HandleBadRequest(response, request, err)
	}
}
pkg/models/tenant/tenant.go
// CreateWorkspace writes the data by calling the KubeSphere client directly:
// it creates the workspace template with default create options and a
// background context, and returns the client's result unchanged.
func (t *tenantOperator) CreateWorkspace(workspace *tenantv1alpha2.WorkspaceTemplate) (*tenantv1alpha2.WorkspaceTemplate, error) {
	templates := t.ksclient.TenantV1alpha2().WorkspaceTemplates()
	return templates.Create(context.Background(), workspace, metav1.CreateOptions{})
}
3、Controller Run
五、Kubesphere认证与授权体系
KS的认证与授权体系实现与k8s类似,在APIServer中进行认证和授权,用户、角色和授权相关信息统一通过CRD与k8s交互,保存在Etcd中。
关键的对象有: Role、User、Group、RoleBinding
角色是根据资源层级进行划分的,cluster role、workspace role、namespace role 不同层级的角色定义了该角色在当前层级可以访问的资源。
KubeSphere 权限控制的核心是 RBAC 基于角色的访问控制
1、整体架构
KS APIServer在过滤处理链中有三个与认证授权相关的组件。
-
Authentication:认证,支持基于用户名密码的BasicAuthen和jwtToken的TokenAuthen
-
Authorization:授权,支持特殊Path的PathAuthor和RBAC的RBACAuthor
-
Admission:准入控制器,在认证和授权后进行准入限制,比如webhook。
在Models中定义了三个相关的Operator,对用户、角色和用户组进行数据操作
-
IMOperator:定义了用户信息管理的接口和实现
-
AMOperator:定义了角色与角色绑定的接口与实现,涉及cluster/workspace/namespace/devops等角色
-
GroupOperator:定义了分组的接口与实现
在KS ControllerManager中,Controller主要处理user/role/rolebinding等Crd对象的状态更新。比较特殊的是UserController,
它需要处理Etcd和OpenLdap数据同步。
OpenLdap主要存储用户账号相关信息。
2、代码实现分析
1.->models/iam/im/im.go中定义了用户账号的管理方法接口和实现
// IdentityManagementInterface is the user-account management interface:
// user create/list/delete/update/describe, password modification and
// verification, and listing of a user's login records.
type IdentityManagementInterface interface {
CreateUser(user *iamv1alpha2.User) (*iamv1alpha2.User, error)
ListUsers(query *query.Query) (*api.ListResult, error)
DeleteUser(username string) error
UpdateUser(user *iamv1alpha2.User) (*iamv1alpha2.User, error)
DescribeUser(username string) (*iamv1alpha2.User, error)
ModifyPassword(username string, password string) error
ListLoginRecords(username string, query *query.Query) (*api.ListResult, error)
PasswordVerify(username string, password string) error
}
// imOperator carries the dependencies of the identity-management
// implementation: the KubeSphere client, getters for users and login
// records, and the authentication options.
type imOperator struct {
ksClient kubesphere.Interface
userGetter resources.Interface
loginRecordGetter resources.Interface
options *authoptions.AuthenticationOptions
}
models/iam/am/am.go中定义了KS中所有的角色和角色绑定的函数
// AccessManagementInterface declares the role and role-binding operations of
// KubeSphere at the global, workspace, cluster and namespace levels.
// Abridged listing: further methods are elided ("...").
type AccessManagementInterface interface {
GetGlobalRoleOfUser(username string) (*iamv1alpha2.GlobalRole, error)
GetWorkspaceRoleOfUser(username string, groups []string, workspace string) ([]*iamv1alpha2.WorkspaceRole, error)
GetClusterRoleOfUser(username string) (*rbacv1.ClusterRole, error)
GetNamespaceRoleOfUser(username string, groups []string, namespace string) ([]*rbacv1.Role, error)
ListRoles(namespace string, query *query.Query) (*api.ListResult, error)
CreateGlobalRoleBinding(username string, globalRole string) error
CreateOrUpdateWorkspaceRole(workspace string, workspaceRole *iamv1alpha2.WorkspaceRole) (*iamv1alpha2.WorkspaceRole, error)
PatchWorkspaceRole(workspace string, workspaceRole *iamv1alpha2.WorkspaceRole) (*iamv1alpha2.WorkspaceRole, error)
CreateOrUpdateGlobalRole(globalRole *iamv1alpha2.GlobalRole) (*iamv1alpha2.GlobalRole, error)
PatchGlobalRole(globalRole *iamv1alpha2.GlobalRole) (*iamv1alpha2.GlobalRole, error)
DeleteWorkspaceRole(workspace string, name string) error
DeleteGlobalRole(name string) error
...
}
// amOperator carries the dependencies of the access-management
// implementation: one resource getter per role and role-binding kind, listers
// for DevOps projects and namespaces, and the KubeSphere and Kubernetes clients.
type amOperator struct {
// Getters for role bindings at the global, workspace, cluster and namespace levels.
globalRoleBindingGetter resourcev1alpha3.Interface
workspaceRoleBindingGetter resourcev1alpha3.Interface
clusterRoleBindingGetter resourcev1alpha3.Interface
roleBindingGetter resourcev1alpha3.Interface
// Getters for roles at the global, workspace, cluster and namespace levels.
globalRoleGetter resourcev1alpha3.Interface
workspaceRoleGetter resourcev1alpha3.Interface
clusterRoleGetter resourcev1alpha3.Interface
roleGetter resourcev1alpha3.Interface
devopsProjectLister devopslisters.DevOpsProjectLister
namespaceLister listersv1.NamespaceLister
ksclient kubesphere.Interface
k8sclient kubernetes.Interface
}
2.->pkg/apiserver/server.go
// installKubeSphereAPIs, as excerpted here, registers the authentication-
// and authorization-related API groups (IAM, OAuth and tenant) on the
// go-restful container.
func (s *APIServer) installKubeSphereAPIs() {
	// Operators for the user, role and authorization resources.
	imOperator := im.NewOperator(s.KubernetesClient.KubeSphere(),
		user.New(s.InformerFactory.KubeSphereSharedInformerFactory(),
			s.InformerFactory.KubernetesSharedInformerFactory()),
		loginrecord.New(s.InformerFactory.KubeSphereSharedInformerFactory()),
		s.Config.AuthenticationOptions)
	amOperator := am.NewOperator(s.KubernetesClient.KubeSphere(),
		s.KubernetesClient.Kubernetes(),
		s.InformerFactory)
	rbacAuthorizer := rbac.NewRBACAuthorizer(amOperator)

	// Register the IAM API exactly once: go-restful does not accept a second
	// web service with the same root path on one container, so this
	// registration must not be repeated further down.
	urlruntime.Must(iamapi.AddToContainer(s.container, imOperator, amOperator,
		group.New(s.InformerFactory, s.KubernetesClient.KubeSphere(), s.KubernetesClient.Kubernetes()),
		rbacAuthorizer))

	// Register the OAuth-related API.
	urlruntime.Must(oauth.AddToContainer(s.container, imOperator,
		auth.NewTokenOperator(
			s.CacheClient,
			s.Config.AuthenticationOptions),
		auth.NewPasswordAuthenticator(
			s.KubernetesClient.KubeSphere(),
			s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister(),
			s.Config.AuthenticationOptions),
		auth.NewOAuthAuthenticator(s.KubernetesClient.KubeSphere(),
			s.InformerFactory.KubeSphereSharedInformerFactory(),
			s.Config.AuthenticationOptions),
		auth.NewLoginRecorder(s.KubernetesClient.KubeSphere(),
			s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister()),
		s.Config.AuthenticationOptions))

	// Register the tenant (workspace) API.
	urlruntime.Must(tenantv1alpha2.AddToContainer(s.container, s.InformerFactory, s.KubernetesClient.Kubernetes(),
		s.KubernetesClient.KubeSphere(), s.EventsClient, s.LoggingClient, s.AuditingClient, amOperator, rbacAuthorizer, s.MonitoringClient, s.RuntimeCache, s.Config.MeteringOptions))
}
3.->pkg/apiserver/server.go //处理链中添加认证授权和准入相关的处理器
// buildHandlerChain, reduced to its authorization and authentication steps.
// The handler variable is not declared inside this excerpt; in the full
// listing of this function it is initialised from s.Server.Handler.
func (s *APIServer) buildHandlerChain(stopCh <-chan struct{}) {
var authorizers authorizer.Authorizer
// Instantiate the authorizer according to the configured mode.
switch s.Config.AuthorizationOptions.Mode {
case authorizationoptions.AlwaysAllow:
authorizers = authorizerfactory.NewAlwaysAllowAuthorizer()
case authorizationoptions.AlwaysDeny:
authorizers = authorizerfactory.NewAlwaysDenyAuthorizer()
default:
// Any other mode falls through to the RBAC case below.
fallthrough
case authorizationoptions.RBAC:// this mode is the usual choice
// The following API paths are excluded from authorization checks.
excludedPaths := []string{"/oauth/*", "/kapis/config.kubesphere.io/*", "/kapis/version", "/kapis/metrics"}
// NOTE(review): the error from path.NewAuthorizer is discarded — confirm it cannot fail for this fixed path list.
pathAuthorizer, _ := path.NewAuthorizer(excludedPaths)
amOperator := am.NewReadOnlyOperator(s.InformerFactory)
// Combine the path authorizer and the RBAC authorizer into one union authorizer.
authorizers = unionauthorizer.New(pathAuthorizer, rbac.NewRBACAuthorizer(amOperator))
}
// Authorization check.
handler = filters.WithAuthorization(handler, authorizers)
if s.Config.MultiClusterOptions.Enable {
clusterDispatcher := dispatch.NewClusterDispatch(s.InformerFactory.KubeSphereSharedInformerFactory().Cluster().V1alpha1().Clusters())
handler = filters.WithMultipleClusterDispatcher(handler, clusterDispatcher)
}
// Create the login recorder, which records login information.
loginRecorder := auth.NewLoginRecorder(s.KubernetesClient.KubeSphere(),
s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister())
// authenticators are unordered
// Collect the supported authenticators into one union authenticator:
// BasicAuthenticator for username/password verification,
// TokenAuthenticator for JWT token verification.
authn := unionauth.New(anonymous.NewAuthenticator(),
basictoken.New(basic.NewBasicAuthenticator(auth.NewPasswordAuthenticator(s.KubernetesClient.KubeSphere(),
s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister(),
s.Config.AuthenticationOptions), loginRecorder)),
bearertoken.New(jwttoken.NewTokenAuthenticator(auth.NewTokenOperator(s.CacheClient,
s.Config.AuthenticationOptions),
s.InformerFactory.KubeSphereSharedInformerFactory().Iam().V1alpha2().Users().Lister())))
// Add authentication to the filter chain.
handler = filters.WithAuthentication(handler, authn)
}
4.->pkg/controller/user/user_controller.go reconcile中实现了Etcd中用户信息变化后,controller处理相关的所有逻辑,比如用户删除、用户修改密码、同步OpenLdap等
// reconcile processes one user key from the work queue.
//
// For a user that is not being deleted it makes sure the finalizer is set,
// syncs the user to LDAP when an LDAP client is configured, encrypts the
// password, syncs the user status, creates the kubeconfig, assigns the DevOps
// role and, in multi-cluster mode, runs the multi-cluster sync. For a user
// that is being deleted it cleans up the LDAP entry, role bindings, group
// bindings, DevOps role and login records, and then removes the finalizer.
//
// It returns nil when the user no longer exists; errors from the LDAP and
// DevOps steps are only recorded as warning events, the other errors are returned.
func (c *userController) reconcile(key string) error {
// Get the user with this name
user, err := c.userLister.Get(key)
if err != nil {
// The user may no longer exist, in which case we stop
// processing.
if errors.IsNotFound(err) {
utilruntime.HandleError(fmt.Errorf("user '%s' in work queue no longer exists", key))
return nil
}
klog.Error(err)
return err
}
// The user is not being deleted: add the finalizer if needed and update the user.
if user.ObjectMeta.DeletionTimestamp.IsZero() {
// The object is not being deleted, so if it does not have our finalizer,
// then lets add the finalizer and update the object.
if !sliceutil.HasString(user.Finalizers, finalizer) {
user.ObjectMeta.Finalizers = append(user.ObjectMeta.Finalizers, finalizer)
if user, err = c.ksClient.IamV1alpha2().Users().Update(context.Background(), user, metav1.UpdateOptions{}); err != nil {
klog.Error(err)
return err
}
}
} else {
// The object is being deleted
if sliceutil.HasString(user.ObjectMeta.Finalizers, finalizer) {
// we do not need to delete the user from ldapServer when ldapClient is nil
// The user is being deleted: remove it from LDAP and publish an event on failure.
if c.ldapClient != nil {
if err = c.waitForDeleteFromLDAP(key); err != nil {
// ignore timeout error
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
}
}
// Delete the role bindings; publish an event and stop on failure.
if err = c.deleteRoleBindings(user); err != nil {
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
return err
}
// Delete the group bindings; publish an event and stop on failure.
if err = c.deleteGroupBindings(user); err != nil {
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
return err
}
// Remove the DevOps role assignment; publish an event on failure.
if c.devopsClient != nil {
// unassign jenkins role, unassign multiple times is allowed
if err = c.waitForUnassignDevOpsAdminRole(user); err != nil {
// ignore timeout error
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
}
}
// Delete the login records; publish an event and stop on failure.
if err = c.deleteLoginRecords(user); err != nil {
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
return err
}
// remove our finalizer from the list and update it.
user.Finalizers = sliceutil.RemoveString(user.ObjectMeta.Finalizers, func(item string) bool {
return item == finalizer
})
// Persist the user without the finalizer.
if user, err = c.ksClient.IamV1alpha2().Users().Update(context.Background(), user, metav1.UpdateOptions{}); err != nil {
klog.Error(err)
return err
}
}
// Our finalizer has finished, so the reconciler can do nothing.
return nil
}
// we do not need to sync ldap info when ldapClient is nil
// Sync the user information to OpenLDAP.
if c.ldapClient != nil {
// ignore errors if timeout
if err = c.waitForSyncToLDAP(user); err != nil {
// ignore timeout error
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
}
}
// Encrypt the password.
if user, err = c.encryptPassword(user); err != nil {
klog.Error(err)
return err
}
// Sync the user status.
if user, err = c.syncUserStatus(user); err != nil {
klog.Error(err)
return err
}
if c.kubeconfig != nil {
// ensure user kubeconfig configmap is created
if err = c.kubeconfig.CreateKubeConfig(user); err != nil {
klog.Error(err)
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
return err
}
}
if c.devopsClient != nil {
// assign jenkins role after user create, assign multiple times is allowed
// used as logged-in users can do anything
if err = c.waitForAssignDevOpsAdminRole(user); err != nil {
// ignore timeout error
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
}
}
// synchronization through kubefed-controller when multi cluster is enabled
if c.multiClusterEnabled {
if err = c.multiClusterSync(user); err != nil {
c.recorder.Event(user, corev1.EventTypeWarning, controller.FailedSynced, fmt.Sprintf(syncFailMessage, err))
return err
}
}
// Everything succeeded: record a normal "synced" event.
c.recorder.Event(user, corev1.EventTypeNormal, successSynced, messageResourceSynced)
return nil
}