searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Kubernetes与etcd:获取CRD资源的最佳实践

2024-06-17 09:26:25
21
0

为什么要讨论kubernetes与etcd?

在与msap、服务治理进行接口联调中,我们遇到分页获取k8s资源问题。

  1. list请求什么情况下会打穿到etcd?

  2. list请求只有limit和Continue参数,Continue控制分页。Continue是什么?

  3. 我们该用什么方法来进行k8s资源的拉取和分页呢?

为了规范日后服务网格的拉取资源的方式,得搞清楚k8s与etcd。

 

什么是etcd?

etcd 是一个开源的分布式键值存储数据库,它提供了简单且可靠的分布式数据存储解决方案。etcd 使用 Raft 一致性算法来确保数据的一致性和可靠性,并且支持事务操作。

etcd如何读取数据?

etcd提供MVCC读,读过程如下所示:

  1. etcd 首先根据用户请求的key,从TreeIndex中查找到key对应的keyIndex。

  2. 根据用户请求的reversion版本号,从keyIndex中找到小于等于reversion的generation。

    注:generation值{8,0}表示主版本号为8,事务内的子版本号为0。子版本号从0开始随事务内put/delete操作递增。

  3. 根据获取的中的主版本和子版本号序列化后获得boltDB种的key,根据key再到boltDB中读取数据。

 

 

etcd的数据结构

TreeIndex:采用b树结构。key是用户查询的key,对应的val是keyIndex结构。其中包含了该key的更新时间,创建时间以及历史版本号信息。

k8s中的etcd TreeIndex存储pod的key为/registry/pods/default/reviews-v2-5b667bcbf8-5zgwm,所以k8s的通用key格式是/registry/{resourceKind}/{namespace}/{resourceName}。

 

BoltDb:采用b+树结构,在这个结构中,键是主版本号+子版本号序列化后的key,而相应的值则是用户需要的数据值。这种设计使得用户能够根据特定版本号检索相应的数据。

 

 

K8s如何与etcd交互?

K8s架构

  1. kube-apiserver:负责对外提供集群各类资源的增删改查及 Watch 接口,它是 Kubernetes 集群中各组件数据交互和通信的枢纽。kube-apiserver 在设计上可水平扩展,高可用 Kubernetes 集群中一般多副本部署。当收到一个创建 Pod 写请求时,它的基本流程是对请求进行认证、限速、授权、准入机制等检查后,写入到 etcd 即可。

  2. kube-scheduler:是调度器组件,负责集群 Pod 的调度。基本原理是通过监听 kube-apiserver 获取待调度的 Pod,然后基于一系列筛选和评优算法,为 Pod 分配最佳的 Node 节点。

  3. kube-controller-manager :包含一系列的控制器组件,比如 Deployment、StatefulSet 等控制器。控制器的核心思想是监听、比较资源实际状态与期望状态是否一致,若不一致则进行协调工作使其最终一致。

  4. etcd 组件:Kubernetes 的元数据存储。

kube-apiserver 是唯一直接与 etcd 打交道的组件,各组件都通过 kube-apiserver 实现数据交互,它们极度依赖 kube-apiserver 提供的资源变化监听机制

 

apiserver如何处理list请求?

让我们先看看apiserver中对于list请求中涉及到的类

  • Cacher:缓存请求处理类

    • Cacher类会根据请求的参数来判断从缓存中处理数据还是直接从etcd中处理数据。

  • Store:Etcd客户端类

    • 当Cacher决定从etcd读取数据时,则会调用Store类的方法。

    • Store类根据list中的Continue、Limit以及selector等参数控制实现etcd的访问逻辑。

  • WatchCache:Cacher缓存类

    • 当Cacher决定从缓存中读取数据时,则会调用WatchCache的方法。

    • Cacher类watch etcd的资源,并更新自己内部的threadSafeStore。

 

Cacher的List方法

Cacher类的List方法,主要实现了不同参数值情况下使用缓存还是访问etcd的逻辑。

源码

func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
 pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
 //要求分页并且有continue字段,毕竟只有etcd支持continue
 hasContinuation := pagingEnabled && len(pred.Continue) > 0
 //要求分页,还有limit,并且resourceVersion不为0。
 //todo 为0的时候会发生什么呢?limits会被忽略
 hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
 if resourceVersion == "" || hasContinuation || hasLimit {
  return c.storage.List(ctx, key, resourceVersion, pred, listObj)
 }

 //resourceVersion转为etcd的version
 listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
 if err != nil {
  return err
 }
 //检查版本号为0和cache没有ready
 if listRV == 0 && !c.ready.check() {
  // If Cacher is not yet initialized and we don't require any specific
  // minimal resource version, simply forward the request to storage.
  return c.storage.List(ctx, key, resourceVersion, pred, listObj)
 }

 trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
 defer trace.LogIfLong(500 * time.Millisecond)
 //为什么上面已经ready.check了,这里还是要wait呢?不是一个条件
 c.ready.wait()
 trace.Step("Ready")

 // List elements with at least 'listRV' from cache.
 listPtr, err := meta.GetItemsPtr(listObj)
 if err != nil {
  return err
 }
 listVal, err := conversion.EnforcePtr(listPtr)
 if err != nil || listVal.Kind() != reflect.Slice {
  return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
 }
 //过滤函数,以key和selection函数
 filter := filterWithAttrsFunction(key, pred)
 //取出threadSafeMap里面的所有的数据,然后过滤 objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace)
 if err != nil {
  return err
 }
 trace.Step("Listed items from cache", utiltrace.Field{"count", len(objs)})
 if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
  // Resize the slice appropriately, since we already know that none
  // of the elements will be filtered out.
  listVal.Set(reflect.MakeSlice(reflect.SliceOf(c.objectType.Elem()), 0, len(objs)))
  trace.Step("Resized result")
 }
 for _, obj := range objs {
  elem, ok := obj.(*storeElement)
  if !ok {
   return fmt.Errorf("non *storeElement returned from storage: %v", obj)
  }
  //进行过滤
  if filter(elem.Key, elem.Labels, elem.Fields) {
   listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
  }
 }
 trace.Step("Filtered items", utiltrace.Field{"count", listVal.Len()})
 if c.versioner != nil {
  if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
   return err
  }
 }
 return nil
}

伪代码

1.//List代码
2.func (c *Cacher) List( )  {
3.    if版本号为空||有Continue字段||有limit并且 版本号不为0{
4.      访问store.List(),返回
5.    }
6.    if版本号为0,并且缓存没有准备好{
7.      访问store.List(),返回
8.     }
9.    根据pred中的label selector和filed selector创建过滤函数filter
10.    等待缓存中的版本号大于等于reversion,并拉取全量数据
11.     for 缓存数据 {
12.      如果通过filter过滤函数则添加到结果集中
13.    }
14.  返回结果
15.}

总结

根据上述代码总结出使用etcd和缓存的情况有以下几种

  • 使用etcd查询的情况

    • resourceVersion为空

    • 存在Continue字段

    • 存在limit字段,且resourceVersion!=“0”

    • resourceVersion为0,但是watchCache没有准备好

  • 使用cacher查询的情况(continue以及limit字段无效)

    • 只有resourceVersion

    • 有limit字段,resourceVersion=0。直接忽略limit

Store的List方法

Store类的List方法,主要实现List请求中Continue字段的解析、selector过滤逻辑以及limit的实现。

源码

func (s *store) List(ctx context.Context, key, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
 trace := utiltrace.New("List etcd3",
  utiltrace.Field{"key", key},
  utiltrace.Field{"resourceVersion", resourceVersion},
  utiltrace.Field{"limit", pred.Limit},
  utiltrace.Field{"continue", pred.Continue})
 defer trace.LogIfLong(500 * time.Millisecond)
 listPtr, err := meta.GetItemsPtr(listObj)
 if err != nil {
  return err
 }
 v, err := conversion.EnforcePtr(listPtr)
 if err != nil || v.Kind() != reflect.Slice {
  return fmt.Errorf("need ptr to slice: %v", err)
 }

 if s.pathPrefix != "" {
  key = path.Join(s.pathPrefix, key)
 }
 // We need to make sure the key ended with "/" so that we only get children "directories".
 // e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three,
 // while with prefix "/a/" will return only "/a/b" which is the correct answer.
 if !strings.HasSuffix(key, "/") {
  key += "/"
 }
 keyPrefix := key

 // set the appropriate clientv3 options to filter the returned data set
 var paging bool
 options := make([]clientv3.OpOption, 0, 4)
 //设置limit的限制
 if s.pagingEnabled && pred.Limit > 0 {
  paging = true
  options = append(options, clientv3.WithLimit(pred.Limit))
 }

 newItemFunc := getNewItemFunc(listObj, v)

 var returnedRV, continueRV, withRev int64
 var continueKey string
 switch {
 //分页并且有continue的字段
 //type continueToken struct {
 //   APIVersion      string `json:"v"`
 //   ResourceVersion int64  `json:"rv"`
        //   StartKey        string `json:"start"`
        //  }
 case s.pagingEnabled && len(pred.Continue) > 0:
  //key对应的应该是资源的名称,在treeIndex中的
  continueKey, continueRV, err = decodeContinue(pred.Continue, keyPrefix)
  if err != nil {
   return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
  }

  if len(resourceVersion) > 0 && resourceVersion != "0" {
   return apierrors.NewBadRequest("specifying resource version is not allowed when using continue")
  }
  //**也就是在字符串最后的位置加上1,从而实现[ContinueKey,keyPrefix+1)的范围查找**//
  //eg: keyprefix为/registry/pods/default,keyprefix+1位/registry/pods/defaulu
  rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
  options = append(options, clientv3.WithRange(rangeEnd))
  
  //这里的key是具体某个节点的key,比/registry/pods/default/reviewxxxx1
  key = continueKey

  //continueRV,在boltdb可直接找到数据
  // If continueRV > 0, the LIST request needs a specific resource version.
  // continueRV==0 is invalid.
  // If continueRV < 0, the request is for the latest resource version.
  if continueRV > 0 {
   withRev = continueRV
   returnedRV = continueRV
  }
 //分页,但是没有continue字段,有limit字段,可能是第一次查询?
 case s.pagingEnabled && pred.Limit > 0:
  //指定resource
  if len(resourceVersion) > 0 {
   fromRV, err := s.versioner.ParseResourceVersion(resourceVersion)
   if err != nil {
    return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
   }
   if fromRV > 0 {
    withRev = int64(fromRV)
   }
   returnedRV = int64(fromRV)
  }
  //同样根据key获取range
  rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
  options = append(options, clientv3.WithRange(rangeEnd))

 default:
  if len(resourceVersion) > 0 {
   fromRV, err := s.versioner.ParseResourceVersion(resourceVersion)
   if err != nil {
    return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
   }
   if fromRV > 0 {
    withRev = int64(fromRV)
   }
   returnedRV = int64(fromRV)
  }
  //根据查询的key来获得key+1
  options = append(options, clientv3.WithPrefix())
 }
 if withRev != 0 {
  options = append(options, clientv3.WithRev(withRev))
 }

 // loop until we have filled the requested limit from etcd or there are no more results
 var lastKey []byte
 var hasMore bool
 var getResp *clientv3.GetResponse
 //
 for {
  startTime := time.Now()
  getResp, err = s.client.KV.Get(ctx, key, options...)
  metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
  if err != nil {
   return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
  }
  hasMore = getResp.More

  if len(getResp.Kvs) == 0 && getResp.More {
   return fmt.Errorf("no results were found, but etcd indicated there were more values remaining")
  }

  // avoid small allocations for the result slice, since this can be called in many
  // different contexts and we don't know how significantly the result will be filtered
  if pred.Empty() {
   growSlice(v, len(getResp.Kvs))
  } else {
   growSlice(v, 2048, len(getResp.Kvs))
  }

  //遍历etcd查询到的数据,并且通过filter过滤后插入到返回的数组中,会判断数组元素个数是否超过limit大小
  //todo 如果是有filter函数的时候,如何获取还有多少满足条件的item个数呢
  for _, kv := range getResp.Kvs {
   //如果分页,并且v中的已有元素大于limit
   if paging && int64(v.Len()) >= pred.Limit {
    hasMore = true
    break
   }
   lastKey = kv.Key

   data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(kv.Key))
   if err != nil {
    return storage.NewInternalErrorf("unable to transform key %q: %v", kv.Key, err)
   }
   //使用pred.match来过滤从etcd获取的元素
   if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner, newItemFunc); err != nil {
    return err
   }
  }

  // indicate to the client which resource version was returned
  if returnedRV == 0 {
   //这个revision表示的是获取大于等于revision的数据
   returnedRV = getResp.Header.Revision
  }

  // no more results remain or we didn't request paging
  if !hasMore || !paging {
   break
  }
  // we're paging but we have filled our bucket
  if int64(v.Len()) >= pred.Limit {
   break
  }
  //对新的查询设置新的key
  key = string(lastKey) + "\\x00"
  if withRev == 0 {
   withRev = returnedRV
   options = append(options, clientv3.WithRev(withRev))
  }
 }

 // instruct the client to begin querying from immediately after the last key we returned
 // we never return a key that the client wouldn't be allowed to see
 if hasMore {
  // we want to start immediately after the last key
  //编写continue
  next, err := encodeContinue(string(lastKey)+"\\x00", keyPrefix, returnedRV)
  if err != nil {
   return err
  }
  var remainingItemCount *int64
  // 只有当空的pred时,才会返回保留的数
  //如果pred中label和Field筛选都是为空才会返回后面的数量。
  if utilfeature.DefaultFeatureGate.Enabled(features.RemainingItemCount) {
   if pred.Empty() {
    c := int64(getResp.Count - pred.Limit)
    remainingItemCount = &c
   }
  }
  return s.versioner.UpdateList(listObj, uint64(returnedRV), next, remainingItemCount)
 }

 // no continuation
 return s.versioner.UpdateList(listObj, uint64(returnedRV), "", nil)
}

Continue字段

Continue字段是实现滚动分页的关键参数,apiserver将continue结构体序列化后作为字段传递给用户。用户传入这个Continue字段,开启下一页的查询。

 

伪代码

// List implements storage.Interface.List.
func (s *store) List(ctx context.Context, key, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
 根据limit参数,设置etcd查询时的limit opts
 switch {
 case 存在Continue字段:
     解析Continue字段,获取Continue结构体
//type continueToken struct {
//   APIVersion      string `json:"v"`
//   ResourceVersion int64  `json:"rv"`
//   StartKey        string `json:"start"`
//  }
     根据StartKey和ResourceVersion,设置etcd查询时key为StartKey,版本号为ResourceVersion。
     设置[StartKey,keyPrefix+1)范围查找
     //eg: keyprefix为用户查询的key:/registry/pods/default,keyprefix+1位/registry/pods/defaulu
 case 参数中有limit
     设置查询版本号
     设置opts的WithRange参数
 default:
     设置查询版本号
     设置opts的WithPrefix
 }
 //循环处理,直到填满limit或没有更多结果
 for {
     访问etcd获取数据
     for etcd查询数据 {
         if 如果分页,并且结果集中的已有元素大于limit{
            break
         }
          进行filter过滤,插入结果集中
     }
     if etcd没有更多数据 || 没有分页需求 {
        break
     }
     if 满足limit大小 {
        break
     }
     设置新的查询key,进行新一轮查询
 }
 if 还有数据 {
    编写continue
    if pred中label和Field筛选都是为空{
       设置remainingItemCount字段
    }
 }
}

总结

  • store类借助Continue解析出startKey和reversion。可以快速通过etcd的索引树从上次遍历结束的节点开始新的一轮遍历。

  • store类获取的列表只能按照名称的字典序从小到大排序,无法实现按照创建时间等的排序

  • store类使用[startKey,key+1)的范围搜索,这可以获取命名空间下面的所有pod。如["/pods/default","/pods/defaulu")。

  • 当获取列表的请求中包含label以及filed selector时采用循环读取后过滤的方法,直到满足limit大小。

  • remainingItemCount只有在没有设置label 以及filed selector参数下才会返回剩余的元素个数。

实验与总结

为了验证解读代码的结论,需要设计实验来验证。

实验1

实验条件

使用k8s client go的list接口,

 pods, _ := clientset.CoreV1().Pods("default").List(context.TODO(), v1.ListOptions{
  ResourceVersion: "0",
 })

default环境中设置deployment的pod控制数为1000。生成1000个pod,以大小为10获取分页。分为两个实验

  1. resourceVersion是否为0的时候,来确认分别从etcd和缓存获取数据的时间。

  2. 使用limit和continue参数,验证采用内存分页和滚动分页的性能差别。

实验结果

 

  resourceVersion limit字段 Continue字段 实验结果 实验说明
实验1 0(访问apiserver缓存) 0.188687549 s  
实验2 不传入(访问etcd) 0.189417412 s 与实验1对比,在集群中crd资源较多的情况下。访问etcd获取数据方式更快
实验3 不传入(访问etcd) 10 0.035951045 s 与实验1与2对比,在从etcd中获取limit大小数据比从缓存或etcd获取全量数据更快。
实验4 不传入(访问etcd) 10 传入 0.030137118 s 与实验1与2对比,在使用limit和continue字段分页获取数据比从缓存或etcd获取全量数据更快。

 

实验2

在看源码的是否发现cacher使用的缓存为threadSafeMap,将所有的api资源都存放在里面。并且获取列表的时候直接遍历缓存中的所有元素,进行过滤。因此如果集群内的api资源足够多的时候,这种遍历的耗时可能是比etcd访问更多的。为了验证这个猜想设计了下面的实验。

实验条件

使用k8s client go的list接口,

 pods, _ := clientset.CoreV1().Pods("default").List(context.TODO(), v1.ListOptions{
  ResourceVersion: "0",
 })

default环境中设置deployment的控制数为3000。生成3000个pod,以大小为10获取分页。另一个命名空间生成1000个pod。应该分为两个实验

  1. resourceVersion是否为0的时候,来确认分别从etcd和缓存获取数据的时间。

  2. 使用limit和continue参数,验证采用内存分页和滚动分页的性能差别。

实验结果

  resourceVersion limit字段 Continue字段 实验结果 实验说明
实验1 0(访问apiserver缓存) 0.563518417 s  
实验2 不传入(访问etcd) 0.521672632 s 与实验1对比,在集群中crd资源较少的情况下。访问缓存获取数据方式更快
实验3 不传入(访问etcd) 10 0.02927998 s 与实验1与2对比,在从etcd中获取limit大小数据比从缓存或etcd获取全量数据更快。
实验4 不传入(访问etcd) 10 传入 0.031426884 s 与实验1与2对比,在使用limit和continue字段分页获取数据比从缓存或etcd获取全量数据更快。

 

使用建议

为避免过多的请求打穿到etcd,导致etcd性能的下降。对目前为获取资源列表可能使用到的方法进行分析其弊端并提出改进建议。

指定resourcVersion

在使用 page 或 offset 进行交互时,通常的实现方式是在内存中拉取所有数据然后进行分页。如果在查询的时候没有指定resourcVersion请求将会略过apiserver的cacher,直接访问etcd获取数据。为避免大量请求打穿到etcd,应该在请求的时候设置resourcVersion为“0”。但是当

 

采用滚动分页

页码分页方式每次切换页码都会请求一次apiserver。这种情况下无论是使用cacher还是打穿到etcd都将耗费大量的时间。因此建议采用滚动分页,滚动分页利用Continue大大缩减了查询的时间。下图是服务网格使用滚动分页的交互界面。

 

 

采用缓存

infomer

Informer主要有两个作用:

  1. 通过一种叫作 ListAndWatch 的方法,把 APIServer 中的 API 对象缓存在了本地,并负责更新和维护这个缓存。ListAndWatch通过 APIServer 的 LIST API“获取”所有最新版本的 API 对象;然后,再通过 WATCH API 来“监听”所有这些 API 对象的变化;

  2. 注册相应的事件,之后如果监听到的事件变化就会调用事件对应的EventHandler,实现回调。

 

 

我们建议采用k8s的informer机制来实现资源对象的缓存。informer机制会使用list和watch函数从etcd获取并监控资源,同时在内存中维护一个indexer,我们可以对这个indexer设置索引函数,比如按照命名空间建立索引缓存。informer机制还有事件通知机制,可以对资源对象的各类事件设置回调函数,进行资源外部存储的操作。

如果调用方位全国区的控制台需要考虑到多集群多api资源导致内存消耗过大问题。建议采用短时、小范围的资源缓存如redis。

如果调用方为集群内的,可以针对特定的API资源使用infromer机制建立缓存,并监听实时更新以保持缓存的有效性。建立缓存索引实现更快速地获取资源。可以参考各类controller实现的方式。

func (s podNamespaceLister) List(selector labels.Selector) (ret []*v1.Pod, err error) {
    err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) {
    ret = append(ret, m.(*v1.Pod))
    })
    return ret, err
}

 

informer类图,仅供参考

 

0条评论
0 / 1000
2****m
3文章数
0粉丝数
2****m
3 文章 | 0 粉丝
原创

Kubernetes与etcd:获取CRD资源的最佳实践

2024-06-17 09:26:25
21
0

为什么要讨论kubernetes与etcd?

在与msap、服务治理进行接口联调中,我们遇到分页获取k8s资源问题。

  1. list请求什么情况下会打穿到etcd?

  2. list请求只有limit和Continue参数,Continue控制分页。Continue是什么?

  3. 我们该用什么方法来进行k8s资源的拉取和分页呢?

为了规范日后服务网格的拉取资源的方式,得搞清楚k8s与etcd。

 

什么是etcd?

etcd 是一个开源的分布式键值存储数据库,它提供了简单且可靠的分布式数据存储解决方案。etcd 使用 Raft 一致性算法来确保数据的一致性和可靠性,并且支持事务操作。

etcd如何读取数据?

etcd提供MVCC读,读过程如下所示:

  1. etcd 首先根据用户请求的key,从TreeIndex中查找到key对应的keyIndex。

  2. 根据用户请求的reversion版本号,从keyIndex中找到小于等于reversion的generation。

    注:generation值{8,0}表示主版本号为8,事务内的子版本号为0。子版本号从0开始随事务内put/delete操作递增。

  3. 根据获取的中的主版本和子版本号序列化后获得boltDB种的key,根据key再到boltDB中读取数据。

 

 

etcd的数据结构

TreeIndex:采用b树结构。key是用户查询的key,对应的val是keyIndex结构。其中包含了该key的更新时间,创建时间以及历史版本号信息。

k8s中的etcd TreeIndex存储pod的key为/registry/pods/default/reviews-v2-5b667bcbf8-5zgwm,所以k8s的通用key格式是/registry/{resourceKind}/{namespace}/{resourceName}。

 

BoltDb:采用b+树结构,在这个结构中,键是主版本号+子版本号序列化后的key,而相应的值则是用户需要的数据值。这种设计使得用户能够根据特定版本号检索相应的数据。

 

 

K8s如何与etcd交互?

K8s架构

  1. kube-apiserver:负责对外提供集群各类资源的增删改查及 Watch 接口,它是 Kubernetes 集群中各组件数据交互和通信的枢纽。kube-apiserver 在设计上可水平扩展,高可用 Kubernetes 集群中一般多副本部署。当收到一个创建 Pod 写请求时,它的基本流程是对请求进行认证、限速、授权、准入机制等检查后,写入到 etcd 即可。

  2. kube-scheduler:是调度器组件,负责集群 Pod 的调度。基本原理是通过监听 kube-apiserver 获取待调度的 Pod,然后基于一系列筛选和评优算法,为 Pod 分配最佳的 Node 节点。

  3. kube-controller-manager :包含一系列的控制器组件,比如 Deployment、StatefulSet 等控制器。控制器的核心思想是监听、比较资源实际状态与期望状态是否一致,若不一致则进行协调工作使其最终一致。

  4. etcd 组件:Kubernetes 的元数据存储。

kube-apiserver 是唯一直接与 etcd 打交道的组件,各组件都通过 kube-apiserver 实现数据交互,它们极度依赖 kube-apiserver 提供的资源变化监听机制

 

apiserver如何处理list请求?

让我们先看看apiserver中对于list请求中涉及到的类

  • Cacher:缓存请求处理类

    • Cacher类会根据请求的参数来判断从缓存中处理数据还是直接从etcd中处理数据。

  • Store:Etcd客户端类

    • 当Cacher决定从etcd读取数据时,则会调用Store类的方法。

    • Store类根据list中的Continue、Limit以及selector等参数控制实现etcd的访问逻辑。

  • WatchCache:Cacher缓存类

    • 当Cacher决定从缓存中读取数据时,则会调用WatchCache的方法。

    • Cacher类watch etcd的资源,并更新自己内部的threadSafeStore。

 

Cacher的List方法

Cacher类的List方法,主要实现了不同参数值情况下使用缓存还是访问etcd的逻辑。

源码

func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
 pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
 //要求分页并且有continue字段,毕竟只有etcd支持continue
 hasContinuation := pagingEnabled && len(pred.Continue) > 0
 //要求分页,还有limit,并且resourceVersion不为0。
 //todo 为0的时候会发生什么呢?limits会被忽略
 hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
 if resourceVersion == "" || hasContinuation || hasLimit {
  return c.storage.List(ctx, key, resourceVersion, pred, listObj)
 }

 //resourceVersion转为etcd的version
 listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
 if err != nil {
  return err
 }
 //检查版本号为0和cache没有ready
 if listRV == 0 && !c.ready.check() {
  // If Cacher is not yet initialized and we don't require any specific
  // minimal resource version, simply forward the request to storage.
  return c.storage.List(ctx, key, resourceVersion, pred, listObj)
 }

 trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
 defer trace.LogIfLong(500 * time.Millisecond)
 //为什么上面已经ready.check了,这里还是要wait呢?不是一个条件
 c.ready.wait()
 trace.Step("Ready")

 // List elements with at least 'listRV' from cache.
 listPtr, err := meta.GetItemsPtr(listObj)
 if err != nil {
  return err
 }
 listVal, err := conversion.EnforcePtr(listPtr)
 if err != nil || listVal.Kind() != reflect.Slice {
  return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
 }
 //过滤函数,以key和selection函数
 filter := filterWithAttrsFunction(key, pred)
 //取出threadSafeMap里面的所有的数据,然后过滤 objs, readResourceVersion, err := c.watchCache.WaitUntilFreshAndList(listRV, trace)
 if err != nil {
  return err
 }
 trace.Step("Listed items from cache", utiltrace.Field{"count", len(objs)})
 if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
  // Resize the slice appropriately, since we already know that none
  // of the elements will be filtered out.
  listVal.Set(reflect.MakeSlice(reflect.SliceOf(c.objectType.Elem()), 0, len(objs)))
  trace.Step("Resized result")
 }
 for _, obj := range objs {
  elem, ok := obj.(*storeElement)
  if !ok {
   return fmt.Errorf("non *storeElement returned from storage: %v", obj)
  }
  //进行过滤
  if filter(elem.Key, elem.Labels, elem.Fields) {
   listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
  }
 }
 trace.Step("Filtered items", utiltrace.Field{"count", listVal.Len()})
 if c.versioner != nil {
  if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
   return err
  }
 }
 return nil
}

伪代码

1.//List代码
2.func (c *Cacher) List( )  {
3.    if版本号为空||有Continue字段||有limit并且 版本号不为0{
4.      访问store.List(),返回
5.    }
6.    if版本号为0,并且缓存没有准备好{
7.      访问store.List(),返回
8.     }
9.    根据pred中的label selector和filed selector创建过滤函数filter
10.    等待缓存中的版本号大于等于reversion,并拉取全量数据
11.     for 缓存数据 {
12.      如果通过filter过滤函数则添加到结果集中
13.    }
14.  返回结果
15.}

总结

根据上述代码总结出使用etcd和缓存的情况有以下几种

  • 使用etcd查询的情况

    • resourceVersion为空

    • 存在Continue字段

    • 存在limit字段,且resourceVersion!=“0”

    • resourceVersion为0,但是watchCache没有准备好

  • 使用cacher查询的情况(continue以及limit字段无效)

    • 只有resourceVersion

    • 有limit字段,resourceVersion=0。直接忽略limit

Store的List方法

Store类的List方法,主要实现List请求中Continue字段的解析、selector过滤逻辑以及limit的实现。

源码

func (s *store) List(ctx context.Context, key, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
 trace := utiltrace.New("List etcd3",
  utiltrace.Field{"key", key},
  utiltrace.Field{"resourceVersion", resourceVersion},
  utiltrace.Field{"limit", pred.Limit},
  utiltrace.Field{"continue", pred.Continue})
 defer trace.LogIfLong(500 * time.Millisecond)
 listPtr, err := meta.GetItemsPtr(listObj)
 if err != nil {
  return err
 }
 v, err := conversion.EnforcePtr(listPtr)
 if err != nil || v.Kind() != reflect.Slice {
  return fmt.Errorf("need ptr to slice: %v", err)
 }

 if s.pathPrefix != "" {
  key = path.Join(s.pathPrefix, key)
 }
 // We need to make sure the key ended with "/" so that we only get children "directories".
 // e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three,
 // while with prefix "/a/" will return only "/a/b" which is the correct answer.
 if !strings.HasSuffix(key, "/") {
  key += "/"
 }
 keyPrefix := key

 // set the appropriate clientv3 options to filter the returned data set
 var paging bool
 options := make([]clientv3.OpOption, 0, 4)
 //设置limit的限制
 if s.pagingEnabled && pred.Limit > 0 {
  paging = true
  options = append(options, clientv3.WithLimit(pred.Limit))
 }

 newItemFunc := getNewItemFunc(listObj, v)

 var returnedRV, continueRV, withRev int64
 var continueKey string
 switch {
 //分页并且有continue的字段
 //type continueToken struct {
 //   APIVersion      string `json:"v"`
 //   ResourceVersion int64  `json:"rv"`
        //   StartKey        string `json:"start"`
        //  }
 case s.pagingEnabled && len(pred.Continue) > 0:
  //key对应的应该是资源的名称,在treeIndex中的
  continueKey, continueRV, err = decodeContinue(pred.Continue, keyPrefix)
  if err != nil {
   return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
  }

  if len(resourceVersion) > 0 && resourceVersion != "0" {
   return apierrors.NewBadRequest("specifying resource version is not allowed when using continue")
  }
  //**也就是在字符串最后的位置加上1,从而实现[ContinueKey,keyPrefix+1)的范围查找**//
  //eg: keyprefix为/registry/pods/default,keyprefix+1位/registry/pods/defaulu
  rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
  options = append(options, clientv3.WithRange(rangeEnd))
  
  //这里的key是具体某个节点的key,比/registry/pods/default/reviewxxxx1
  key = continueKey

  //continueRV,在boltdb可直接找到数据
  // If continueRV > 0, the LIST request needs a specific resource version.
  // continueRV==0 is invalid.
  // If continueRV < 0, the request is for the latest resource version.
  if continueRV > 0 {
   withRev = continueRV
   returnedRV = continueRV
  }
 //分页,但是没有continue字段,有limit字段,可能是第一次查询?
 case s.pagingEnabled && pred.Limit > 0:
  //指定resource
  if len(resourceVersion) > 0 {
   fromRV, err := s.versioner.ParseResourceVersion(resourceVersion)
   if err != nil {
    return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
   }
   if fromRV > 0 {
    withRev = int64(fromRV)
   }
   returnedRV = int64(fromRV)
  }
  //同样根据key获取range
  rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
  options = append(options, clientv3.WithRange(rangeEnd))

 default:
  if len(resourceVersion) > 0 {
   fromRV, err := s.versioner.ParseResourceVersion(resourceVersion)
   if err != nil {
    return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
   }
   if fromRV > 0 {
    withRev = int64(fromRV)
   }
   returnedRV = int64(fromRV)
  }
  //根据查询的key来获得key+1
  options = append(options, clientv3.WithPrefix())
 }
 if withRev != 0 {
  options = append(options, clientv3.WithRev(withRev))
 }

 // loop until we have filled the requested limit from etcd or there are no more results
 var lastKey []byte
 var hasMore bool
 var getResp *clientv3.GetResponse
 //
 for {
  startTime := time.Now()
  getResp, err = s.client.KV.Get(ctx, key, options...)
  metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
  if err != nil {
   return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
  }
  hasMore = getResp.More

  if len(getResp.Kvs) == 0 && getResp.More {
   return fmt.Errorf("no results were found, but etcd indicated there were more values remaining")
  }

  // avoid small allocations for the result slice, since this can be called in many
  // different contexts and we don't know how significantly the result will be filtered
  if pred.Empty() {
   growSlice(v, len(getResp.Kvs))
  } else {
   growSlice(v, 2048, len(getResp.Kvs))
  }

  //遍历etcd查询到的数据,并且通过filter过滤后插入到返回的数组中,会判断数组元素个数是否超过limit大小
  //todo 如果是有filter函数的时候,如何获取还有多少满足条件的item个数呢
  for _, kv := range getResp.Kvs {
   //如果分页,并且v中的已有元素大于limit
   if paging && int64(v.Len()) >= pred.Limit {
    hasMore = true
    break
   }
   lastKey = kv.Key

   data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(kv.Key))
   if err != nil {
    return storage.NewInternalErrorf("unable to transform key %q: %v", kv.Key, err)
   }
   //使用pred.match来过滤从etcd获取的元素
   if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner, newItemFunc); err != nil {
    return err
   }
  }

  // indicate to the client which resource version was returned
  if returnedRV == 0 {
   //这个revision表示的是获取大于等于revision的数据
   returnedRV = getResp.Header.Revision
  }

  // no more results remain or we didn't request paging
  if !hasMore || !paging {
   break
  }
  // we're paging but we have filled our bucket
  if int64(v.Len()) >= pred.Limit {
   break
  }
  //对新的查询设置新的key
  key = string(lastKey) + "\\x00"
  if withRev == 0 {
   withRev = returnedRV
   options = append(options, clientv3.WithRev(withRev))
  }
 }

 // instruct the client to begin querying from immediately after the last key we returned
 // we never return a key that the client wouldn't be allowed to see
 if hasMore {
  // we want to start immediately after the last key
  //编写continue
  next, err := encodeContinue(string(lastKey)+"\\x00", keyPrefix, returnedRV)
  if err != nil {
   return err
  }
  var remainingItemCount *int64
  // 只有当空的pred时,才会返回保留的数
  //如果pred中label和Field筛选都是为空才会返回后面的数量。
  if utilfeature.DefaultFeatureGate.Enabled(features.RemainingItemCount) {
   if pred.Empty() {
    c := int64(getResp.Count - pred.Limit)
    remainingItemCount = &c
   }
  }
  return s.versioner.UpdateList(listObj, uint64(returnedRV), next, remainingItemCount)
 }

 // no continuation
 return s.versioner.UpdateList(listObj, uint64(returnedRV), "", nil)
}

Continue字段

Continue字段是实现滚动分页的关键参数,apiserver将continue结构体序列化后作为字段传递给用户。用户传入这个Continue字段,开启下一页的查询。

 

伪代码

// List implements storage.Interface.List.
func (s *store) List(ctx context.Context, key, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
 根据limit参数,设置etcd查询时的limit opts
 switch {
 case 存在Continue字段:
     解析Continue字段,获取Continue结构体
//type continueToken struct {
//   APIVersion      string `json:"v"`
//   ResourceVersion int64  `json:"rv"`
//   StartKey        string `json:"start"`
//  }
     根据StartKey和ResourceVersion,设置etcd查询时key为StartKey,版本号为ResourceVersion。
     设置[StartKey,keyPrefix+1)范围查找
     //eg: keyprefix为用户查询的key:/registry/pods/default,keyprefix+1位/registry/pods/defaulu
 case 参数中有limit
     设置查询版本号
     设置opts的WithRange参数
 default:
     设置查询版本号
     设置opts的WithPrefix
 }
 //循环处理,直到填满limit或没有更多结果
 for {
     访问etcd获取数据
     for etcd查询数据 {
         if 如果分页,并且结果集中的已有元素大于limit{
            break
         }
          进行filter过滤,插入结果集中
     }
     if etcd没有更多数据 || 没有分页需求 {
        break
     }
     if 满足limit大小 {
        break
     }
     设置新的查询key,进行新一轮查询
 }
 if 还有数据 {
    编写continue
    if pred中label和Field筛选都是为空{
       设置remainingItemCount字段
    }
 }
}

总结

  • store类借助Continue解析出startKey和reversion。可以快速通过etcd的索引树从上次遍历结束的节点开始新的一轮遍历。

  • store类获取的列表只能按照名称的字典序从小到大排序,无法实现按照创建时间等的排序

  • store类使用[startKey,key+1)的范围搜索,这可以获取命名空间下面的所有pod。如["/pods/default","/pods/defaulu")。

  • 当获取列表的请求中包含label以及filed selector时采用循环读取后过滤的方法,直到满足limit大小。

  • remainingItemCount只有在没有设置label 以及filed selector参数下才会返回剩余的元素个数。

实验与总结

为了验证解读代码的结论,需要设计实验来验证。

实验1

实验条件

使用k8s client go的list接口,

 pods, _ := clientset.CoreV1().Pods("default").List(context.TODO(), v1.ListOptions{
  ResourceVersion: "0",
 })

default环境中设置deployment的pod控制数为1000。生成1000个pod,以大小为10获取分页。分为两个实验

  1. resourceVersion是否为0的时候,来确认分别从etcd和缓存获取数据的时间。

  2. 使用limit和continue参数,验证采用内存分页和滚动分页的性能差别。

实验结果

 

  resourceVersion limit字段 Continue字段 实验结果 实验说明
实验1 0(访问apiserver缓存) 0.188687549 s  
实验2 不传入(访问etcd) 0.189417412 s 与实验1对比,在集群中crd资源较多的情况下。访问etcd获取数据方式更快
实验3 不传入(访问etcd) 10 0.035951045 s 与实验1与2对比,在从etcd中获取limit大小数据比从缓存或etcd获取全量数据更快。
实验4 不传入(访问etcd) 10 传入 0.030137118 s 与实验1与2对比,在使用limit和continue字段分页获取数据比从缓存或etcd获取全量数据更快。

 

实验2

在看源码的是否发现cacher使用的缓存为threadSafeMap,将所有的api资源都存放在里面。并且获取列表的时候直接遍历缓存中的所有元素,进行过滤。因此如果集群内的api资源足够多的时候,这种遍历的耗时可能是比etcd访问更多的。为了验证这个猜想设计了下面的实验。

实验条件

使用k8s client go的list接口,

 pods, _ := clientset.CoreV1().Pods("default").List(context.TODO(), v1.ListOptions{
  ResourceVersion: "0",
 })

default环境中设置deployment的控制数为3000。生成3000个pod,以大小为10获取分页。另一个命名空间生成1000个pod。应该分为两个实验

  1. resourceVersion是否为0的时候,来确认分别从etcd和缓存获取数据的时间。

  2. 使用limit和continue参数,验证采用内存分页和滚动分页的性能差别。

实验结果

  resourceVersion limit字段 Continue字段 实验结果 实验说明
实验1 0(访问apiserver缓存) 0.563518417 s  
实验2 不传入(访问etcd) 0.521672632 s 与实验1对比,在集群中crd资源较少的情况下。访问缓存获取数据方式更快
实验3 不传入(访问etcd) 10 0.02927998 s 与实验1与2对比,在从etcd中获取limit大小数据比从缓存或etcd获取全量数据更快。
实验4 不传入(访问etcd) 10 传入 0.031426884 s 与实验1与2对比,在使用limit和continue字段分页获取数据比从缓存或etcd获取全量数据更快。

 

使用建议

为避免过多的请求打穿到etcd,导致etcd性能的下降。对目前为获取资源列表可能使用到的方法进行分析其弊端并提出改进建议。

指定resourcVersion

在使用 page 或 offset 进行交互时,通常的实现方式是在内存中拉取所有数据然后进行分页。如果在查询的时候没有指定resourcVersion请求将会略过apiserver的cacher,直接访问etcd获取数据。为避免大量请求打穿到etcd,应该在请求的时候设置resourcVersion为“0”。但是当

 

采用滚动分页

页码分页方式每次切换页码都会请求一次apiserver。这种情况下无论是使用cacher还是打穿到etcd都将耗费大量的时间。因此建议采用滚动分页,滚动分页利用Continue大大缩减了查询的时间。下图是服务网格使用滚动分页的交互界面。

 

 

采用缓存

infomer

Informer主要有两个作用:

  1. 通过一种叫作 ListAndWatch 的方法,把 APIServer 中的 API 对象缓存在了本地,并负责更新和维护这个缓存。ListAndWatch通过 APIServer 的 LIST API“获取”所有最新版本的 API 对象;然后,再通过 WATCH API 来“监听”所有这些 API 对象的变化;

  2. 注册相应的事件,之后如果监听到的事件变化就会调用事件对应的EventHandler,实现回调。

 

 

我们建议采用k8s的informer机制来实现资源对象的缓存。informer机制会使用list和watch函数从etcd获取并监控资源,同时在内存中维护一个indexer,我们可以对这个indexer设置索引函数,比如按照命名空间建立索引缓存。informer机制还有事件通知机制,可以对资源对象的各类事件设置回调函数,进行资源外部存储的操作。

如果调用方位全国区的控制台需要考虑到多集群多api资源导致内存消耗过大问题。建议采用短时、小范围的资源缓存如redis。

如果调用方为集群内的,可以针对特定的API资源使用infromer机制建立缓存,并监听实时更新以保持缓存的有效性。建立缓存索引实现更快速地获取资源。可以参考各类controller实现的方式。

func (s podNamespaceLister) List(selector labels.Selector) (ret []*v1.Pod, err error) {
    err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) {
    ret = append(ret, m.(*v1.Pod))
    })
    return ret, err
}

 

informer类图,仅供参考

 

文章来自个人专栏
服务网格istio
3 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0