searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

client-go源码分析

2023-04-24 13:05:32
29
0

运行图

下图展示client-go中各组件如何运行,以及它与Custom Controller的交互点。

源码结构与简要剖析

基于release-1.22。

mac:client-go caiyueyang$ tree . -L 1

.
├── CHANGELOG.md
├── CONTRIBUTING.md
├── INSTALL.md
├── LICENSE
├── OWNERS
├── README.md
├── SECURITY_CONTACTS
├── applyconfigurations
├── code-of-conduct.md
├── rest        提供RestClient客户端,对K8S API Server执行Restful操作
├── discovery      提供DiscoveryClient 发现客户端,分装RestClient
├── kubernetes     提供ClientSet客户端,封装RestClient,对官方的资源对象进行操作
├── dynamic       提供DynamicClient 动态客户端,封装RestClient,对unstructed资源对象进行操作
├── tools        提供常用工具,例如Reflector、DealtFIFO、ShareInformer以及Indexers,提供Client查询和缓存机制
├── examples
├── go.mod
├── go.sum
├── informers      K8S资源的Informer实现,主要为官方的资源对象。操作CRD的Informer在dynamic目录下
├── listers       为K8S资源提供Lister实现,该实现对Get和List请求提供只读的缓存数据
├── kubernetes_test
├── metadata
├── pkg
├── plugin       提供OpenStack、GCP和Azure等云服务商授权插件
├── restmapper
├── scale        提供ScalClient客户端,用于扩容和所容Deployment、SatefulSet等资源对象
├── testing
├── third_party
├── transport      提供TCP连接,支持HTTP Stream(用于exec、attach操作)
└── util        提供常用的方法,包括WorkQueue和证书操作等



client-go客户端

client-go提供了数种客户端,他们用于完成和API Server的List & Watch的交互

RestClient

RestClient,最基础的客户端,对HTTP Request进行封装了,实现与Kubernetes REST apis交互。

  • rest/client.go
type Interface interface {
GetRateLimiter() flowcontrol.RateLimiter
Verb(verb string) *Request
Post() *Request
Put() *Request
Patch(pt types.PatchType) *Request
Get() *Request
Delete() *Request
APIVersion() schema.GroupVersion
}

type RESTClient struct {
...
   
// 本质上是 HTTP 客户端
Client *http.Client
}
  • rest/request.go
type Request struct {
c *RESTClient

...

// 叠加了K8S的一些概念
namespace    string
namespaceSet bool
resource     string
resourceName string
subresource  string

...
}
GET /api/v1/namespaces/test/pods
---
200 OK
Content-Type: application/json

{
  "kind": "PodList",
  "apiVersion": "v1",
  "metadata": {"resourceVersion":"10245"},
  "items": [...]
}
  • 使用案例

package main

import (
 "fmt"

 corev1 "k8s.io/api/core/v1"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/client-go/kubernetes/scheme"
 "k8s.io/client-go/rest"
 "k8s.io/client-go/tools/clientcmd"
)

func main() {
 config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
 if err != nil {
  panic(err.Error())
 }

 config.APIPath = "api"
 config.GroupVersion = &corev1.SchemeGroupVersion
 config.NegotiatedSerializer = scheme.Codecs

 restClient, err := rest.RESTClientFor(config)
 if err != nil {
  panic(err.Error())
 }

 result := &corev1.NodeList{}
 err = restClient.Get().Namespace("").Resource("nodes").VersionedParams(&metav1.ListOptions{Limit: 100}, scheme.ParameterCodec).Do().Into(result)
 if err != nil {
  panic(err)
 }

 for _, d := range result.Items {
  fmt.Printf("Node Name %v \n", d.Name)
 }
}

DiscoveryClient

Kubernetes有Group(/api,/apis等)、Version(v1,betav1等)和Resouce等信息,而DiscoveryClient正是用于获取组、版本、资源等信息。

  • discovery/discovery_client.go
// DiscoveryInterface holds the methods that discover server-supported API groups,
// versions and resources.
type DiscoveryInterface interface {
RESTClient() restclient.Interface
ServerGroupsInterface // go接口定义的方式之一
ServerResourcesInterface
ServerVersionInterface
OpenAPISchemaInterface
}

// ServerGroupsInterface has methods for obtaining supported groups on the API server
type ServerGroupsInterface interface {
// ServerGroups returns the supported groups, with information like supported versions and the
// preferred version.
ServerGroups() (*metav1.APIGroupList, error)
}
  • discovery/cached/disk/cached_discovery.go 和 discovery/cached/memory/memcache.go 是具体的实现
  • 使用案例
package main

import (
"fmt"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/tools/clientcmd"
)

func main()  {
config, err := clientcmd.BuildConfigFromFlags("","/root/.kube/config")
if err != nil {
panic(err.Error())
}

discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
panic(err.Error())
}

_, APIResourceList, err := discoveryClient.ServerGroupsAndResources()
if err != nil {
panic(err.Error())
}
for _, list := range APIResourceList {
gv, err := schema.ParseGroupVersion(list.GroupVersion)
if err != nil {
panic(err.Error())
}
for _, resource := range list.APIResources {
fmt.Printf("name: %v, group: %v, version %v\n", resource.Name, gv.Group, gv.Version)
}
}
}

 

预期结果

go run main.go
name: bindings, group: , version v1
name: componentstatuses, group: , version v1
name: configmaps, group: , version v1
name: endpoints, group: , version v1
...



ClientSet

ClientSet,用于实例化官方资源的客户端(集),他对K8S的资源对象进行了Group Version维度的分组式封装。该实现的缺点时不能操作CRD。

  • kubernetes/clientset.go
type Interface interface {
Discovery() discovery.DiscoveryInterface
...
AppsV1() appsv1.AppsV1Interface
AppsV1beta1() appsv1beta1.AppsV1beta1Interface
AppsV1beta2() appsv1beta2.AppsV1beta2Interface
...
CoreV1() corev1.CoreV1Interface
...
}

type Clientset struct {
*discovery.DiscoveryClient
...
appsV1                       *appsv1.AppsV1Client
appsV1beta1                  *appsv1beta1.AppsV1beta1Client
appsV1beta2                  *appsv1beta2.AppsV1beta2Client
...
coreV1                       *corev1.CoreV1Client
...
}

 

  • 具体实现的目录结构

mac:kubernetes caiyueyang$ tree . -L 4

.
├── clientset.go
├── ...
└── typed
    ├── apps
    │   ├── v1
    │   │   └── ...
    │   ├── v1beta1
    │   │   └── ...
    │   └── v1beta2
    │       └── ...
    ├── core
    │   └── v1
    │       ├── core_client.go
    │       ├── pod.go
    │       └── ...
    └── ...

 

  • 以kubernetes/typed/core/v1/core_client.go为例
type CoreV1Interface interface {
RESTClient() rest.Interface
PodsGetter
...
}

// CoreV1Client is used to interact with features provided by the  group.
type CoreV1Client struct {
restClient rest.Interface
}

// NewForConfig creates a new CoreV1Client for the given config.
func NewForConfig(c *rest.Config) (*CoreV1Client, error) {
config := *c
if err := setConfigDefaults(&config); err != nil {
return nil, err
}
client, err := rest.RESTClientFor(&config)
if err != nil {
return nil, err
}
return &CoreV1Client{client}, nil
}

 

  • 以PodsGetter为例
// PodsGetter has a method to return a PodInterface.
// A group's client should implement this interface.
type PodsGetter interface {
Pods(namespace string) PodInterface
}

// PodInterface has methods to work with Pod resources.
type PodInterface interface {
Create(ctx context.Context, pod *v1.Pod, opts metav1.CreateOptions) (*v1.Pod, error)
Update(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
UpdateStatus(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Pod, error)
List(ctx context.Context, opts metav1.ListOptions) (*v1.PodList, error)
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Pod, err error)
Apply(ctx context.Context, pod *corev1.PodApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Pod, err error)
ApplyStatus(ctx context.Context, pod *corev1.PodApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Pod, err error)
UpdateEphemeralContainers(ctx context.Context, podName string, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)

PodExpansion
}

// 使用RESTClient进行操作
// Get takes name of the pod, and returns the corresponding pod object, and an error if there is any.
func (c *pods) Get(ctx context.Context, name string, options metav1.GetOptions) (result *v1.Pod, err error) {
result = &v1.Pod{}
err = c.client.Get().
Namespace(c.ns).
Resource("pods").
Name(name).
VersionedParams(&options, scheme.ParameterCodec).
Do(ctx).
Into(result)
return
}
  • 使用案例
package main

import (
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)

func main() {
config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
if err != nil {
panic(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}

podClient := clientset.CoreV1().Pods(apiv1.NamespaceDefault)

list, err := podClient.List(metav1.ListOptions{Limit: 500})
if err != nil {
panic(err)
}
for _, d := range list.Items {
if d.Name == "" {
}
// fmt.Printf("NAME:%v \t NAME:%v \t STATUS: %+v\n ", d.Namespace, d.Name, d.Status)
}

//请求namespace为default下的deploy
deploymentClient := clientset.AppsV1().Deployments(apiv1.NamespaceDefault)
deployList, err2 := deploymentClient.List(metav1.ListOptions{Limit: 500})
if err2 != nil {
panic(err2)
}
for _, d := range deployList.Items {
if d.Name == "" {

}
// fmt.Printf("NAME:%v \t NAME:%v \t STATUS: %+v\n ", d.Namespace, d.Name, d.Status)
}

// 请求ds资源 todo  有兴趣可以尝试下
// clientset.AppsV1().DaemonSets()

}



DynimicClient

DynimicClient是一个动态的客户端,可以对K8S的任意资源进行操作,包括CRD。可以理解这是一组操作非结构化资源的API,这是与ClientSet的最大不同。

  • dynamic/interface.go

type Interface interface {
Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface
}

type ResourceInterface interface {
Create(ctx context.Context, obj *unstructured.Unstructured, options metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error)
Update(ctx context.Context, obj *unstructured.Unstructured, options metav1.UpdateOptions, subresources ...string) (*unstructured.Unstructured, error)
UpdateStatus(ctx context.Context, obj *unstructured.Unstructured, options metav1.UpdateOptions) (*unstructured.Unstructured, error)
Delete(ctx context.Context, name string, options metav1.DeleteOptions, subresources ...string) error
DeleteCollection(ctx context.Context, options metav1.DeleteOptions, listOptions metav1.ListOptions) error
Get(ctx context.Context, name string, options metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error)
List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error)
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, options metav1.PatchOptions, subresources ...string) (*unstructured.Unstructured, error)
}

type NamespaceableResourceInterface interface {
Namespace(string) ResourceInterface
ResourceInterface
}

type Unstructured struct {
// Object is a JSON compatible map with string, float, int, bool, []interface{}, or
// map[string]interface{}
// children.
Object map[string]interface{}
}



  • dynamic/simple.go
type dynamicClient struct {
client *rest.RESTClient
}

type dynamicResourceClient struct {
client    *dynamicClient
namespace string
resource  schema.GroupVersionResource
}

// 创建一个dynamicResourceClient
func (c *dynamicClient) Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface {
return &dynamicResourceClient{client: c, resource: resource}
}

// 创建CR,底层也是使用RESTClient
func (c *dynamicResourceClient) Create(ctx context.Context, obj *unstructured.Unstructured, opts metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error) {
...
result := c.client.client.
Post().
AbsPath(append(c.makeURLSegments(name), subresources...)...).
Body(outBytes).
SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).
Do(ctx)
...
return uncastObj.(*unstructured.Unstructured), nil
}
// 使用 Group Version Resource组装URL
func (c *dynamicResourceClient) makeURLSegments(name string) []string {
url := []string{}
if len(c.resource.Group) == 0 {
url = append(url, "api")
} else {
url = append(url, "apis", c.resource.Group)
}
url = append(url, c.resource.Version)

if len(c.namespace) > 0 {
url = append(url, "namespaces", c.namespace)
}
url = append(url, c.resource.Resource)

if len(name) > 0 {
url = append(url, name)
}

return url
}



  • 使用案例
package main

import (
"fmt"

apiv1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
)

func main() {
config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
if err != nil {
panic(err)
}

dymaicClient, err := dynamic.NewForConfig(config)
checkErr(err)
//map[string]interface{}

    //TODO 获取CRD资源 这里是获取了TIDB的CRD资源
// gvr := schema.GroupVersionResource{Version: "v1alpha1", Resource: "tidbclusters", Group: "pingcap.com"}
// unstructObj, err := dymaicClient.Resource(gvr).Namespace("tidb-cluster").List(metav1.ListOptions{Limit: 500})
// checkErr(err)
// fmt.Println(unstructObj)

gvr := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
unstructObj, err := dymaicClient.Resource(gvr).Namespace(apiv1.NamespaceDefault).List(metav1.ListOptions{Limit: 500})
checkErr(err)
// fmt.Println(unstructObj)
podList := &corev1.PodList{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj.UnstructuredContent(), podList)
checkErr(err)
for _, d := range podList.Items {
fmt.Printf("NAME:%v \t NAME:%v \t STATUS: %+v\n ", d.Namespace, d.Name, d.Status)
}

}

func checkErr(err error) {
if err != nil {
panic(err)
}
}



Reflector

Reflector,反射器,做两件事

1、启动时从K8S API Server List资源,后继Watch资源的变化(增删改),完成1)List & Watch

2、将对象压入Delta FIFO Queue中,完成2)Add Object。

Reflector为Controller的属性,在Controoler.Run中启动Run

  • tools/cache/reflector.go
type Reflector struct {
    ...
    // 一个Reflector一个类型
expectedType reflect.Type
   
    // 底层就通过客户端实现,例如informers/core/v1/pod.go NewFilteredPodInformer
    // &cache.ListWatch{
//  ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
//   ...
//   return client.CoreV1().Pods(namespace).List(context.TODO(), options)
//  },
//  WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
//   ...
//   return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
//  },
// },
listerWatcher ListerWatcher
   
    // Deleta FIFO Queue,下面会讲
store Store
...
}

// 启动的入口,触发执行ListAndWatch,从API Server拉取资源对象并监听资源对象的变化
// Controller.Run中调用
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

// 完成1)List & Watch
// 完成2)Add Object
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
...

    // List资源对象
if err := func() error {
...
        // 初始化一个处理List的管道,启动一个协程,并等待完成
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
go func() {
defer func() {
if r := recover(); r != nil {
panicCh <- r
}
}()
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response.
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
...
// list资源对象
list, paginatedResult, err = pager.List(context.Background(), options)
...
            // 退出
close(listCh)
}()
select {
case <-stopCh:
return nil
case r := <-panicCh:
panic(r)
case <-listCh:
}
if err != nil {
return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
}

...
        // 提取资源,调用DeltaFIFO的Replace方法将资源加入Delta FIFO Queue
        // 完成2)Add Object
initTrace.Step("Objects extracted")
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("unable to sync list result: %v", err)
}
initTrace.Step("SyncWith done")
r.setLastSyncResourceVersion(resourceVersion)
initTrace.Step("Resource version updated")
return nil
}(); err != nil {
return err
}

    ...

    // Watch资源变化
for {
...
w, err := r.listerWatcher.Watch(options)
...
// 监听资源,将资源加入Delta FIFO Queue
        // 完成2)Add Object
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
...
}
}
}



Delta FIFO Queue

上节Reflector提到的store,具体实现就为Delta FIFO Queue,它干一件事情:

1、接收Reflector压入的资源对象,完事后广播,触发完成3)Pop Object。

Delta FIFO Queue为Controller.config的属性,Reflector压入资源对象之后广报,Delta FIFO Queue收到广报解除阻塞,并Pop资源对象给PopProcessFunc(实际为ShareInformer的HandleDeltas)。

  • tools/cache/delta_fifo.go
type DeltaFIFO struct {
...
// 存储的是以资源对象ID为key的事件列表
items map[string]Deltas

// 存储了资源对象ID
queue []string
...
}

// Reflector调用该方法
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
...
    // 同步或替换item
// Add Sync/Replaced action for each new item.
for _, item := range list {
...
if err := f.queueActionLocked(action, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
...
}
// 压入资源对象并触发广报
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
...
    //去重
newDeltas = dedupDeltas(newDeltas)

if len(newDeltas) > 0 {
...
        // 广播,完成3)Pop Object
f.cond.Broadcast()
    }
    ...
}

// Delta FIFO Queue收到广报解除阻塞,并Pop资源对象给开发人员定义的Handler函数进行处理
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
        // 阻塞,直到调用了f.cond.Broadcast()
for len(f.queue) == 0 {
...
f.cond.Wait()
}
        // 取出头一个元素
id := f.queue[0]
f.queue = f.queue[1:]
...
item, ok := f.items[id]
...
delete(f.items, id)
        // 执行由Controller提供的PopProcessFunc
        // (实际为ShareInformer的HandleDeltas)
        // 入口controller.go中
err := process(item)
...
}
}



Informer

SharedInformer

Informer主要用来处理Pop的Object,然后干两件事情:

1、完成4)Add Object 到Indexer,5)Store Object & Key &Indexer 到Thread Safe Map

2、完成6)Dispatch Event Handler functions(Send Object to Custom Controller),将资源对象发送给自定义Controller。

client-go有ClientSet和DynimicClient两种客户端,对应的InformerFactory也有两种。

  • tools/cache/shared_informer.go
type sharedIndexInformer struct {
indexer    Indexer
controller Controller

processor             *sharedProcessor
cacheMutationDetector MutationDetector

listerWatcher ListerWatcher

// 一个资源对象类型一个Informer
objectType runtime.Object

...
}

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// Delta FIFO Queue 通过 controller.Config 与Informer关联
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects:          s.indexer,
EmitDeltaTypeReplaced: true,
})

    // 在controller的processLoop中会用到Queue,执行Pop Object
    // Process(HandleDeltas)执行同步资源对象到Indexer和分发给listener,listener再调用用户定义的Handler
cfg := &Config{
Queue:            fifo,
ListerWatcher:    s.listerWatcher,
ObjectType:       s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError:     false,
ShouldResync:     s.processor.shouldResync,
Process:           s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}

func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()

s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
...
    // controller 入口,Run的时候触发Reflector.Run
s.controller.Run(stopCh)
}

// 处理资源对象事件
// 与Indexer交互,完成4)Add Object
// 并分发给listener(processor.distribute -> add -> pop -> run -> 业务的eventHandler),
// 完成6)Dispatch Event Handler functions(Send Object to Custom Controller)
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()

// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil {
return err
}
...
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
                // 完成4)Add Object
if err := s.indexer.Add(d.Object); err != nil {
return err
}
                // 完成6)Dispatch Event Handler functions(Send Object to Custom Controller)
// 本质将资源对象分发给Listener
                s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}

// 增加事件处理器,这里就是自定义逻辑主要的切入点
// 注册事件处理程序 处理事件数据
// informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
//  AddFunc:    onAdd,
//  UpdateFunc: onUpdate,
//  DeleteFunc: onDelete,
// })
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
s.startedLock.Lock()
defer s.startedLock.Unlock()

...
   
//生产并增加监听器
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

if !s.started {
s.processor.addListener(listener)
return
}
...
}



NewSharedInformerFactory

NewSharedInformerFactory是SharedInformer的工厂,可以与ClientSet配合使用,这套组合可以看成是官方资源的特有组合,有“结构化”的方法使用,逻辑主要在informers目录中。

mac:kubernetes caiyueyang$ tree . -L 4

.
├── clientset.go
├── ...
└── typed
    ├── apps
    │   ├── v1
    │   │   └── ...
    │   ├── v1beta1
    │   │   └── ...
    │   └── v1beta2
    │       └── ...
    ├── core
    │   └── v1
    │       ├── core_client.go
    │       ├── pod.go
    │       └── ...
    └── ...


mac:informers caiyueyang$  tree . -L 4

.
├── apps
│   ├── interface.go
│   ├── v1
│   │   └── ...
│   ├── v1beta1
│   │   └── ...
│   └── v1beta2
│       └── ...
├── core
│   ├── interface.go
│   └── v1
│       ├── interface.go
│       ├── pod.go
│       └── ...
├── factory.go
└── ...



  • informers/factory.go
type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool

// ClientSet的Interface 是CoreV1,这里Core,Version接口在core子目录中
Core() core.Interface
...
}

// 创建 Informer,被资源Informer调用,并将资源的Informmer注册回工厂
// 以PodInformer为例:
// func (f *podInformer) Informer() cache.SharedIndexInformer {
// return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
// }
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()

informerType := reflect.TypeOf(obj)
informer, exists := f.informers[informerType]
if exists {
return informer
}

resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}

informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer

return informer
}

// 1、调用各个资源的sharedIndexInformer.Run,
// 2、Run中创建Config,Config包含Delta FIFO Queue
// 3、使用Config 创建Controller,调用Controller.Run
// 4、Controller.Run中创建Reflector,使用Delta FIFO Queue作为store
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()

for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
go informer.Run(stopCh)
f.startedInformers[informerType] = true
}
}
}

// 等待所有Informer同步资源对象
// WaitForCacheSync waits for all started informers' cache were synced.
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
informers := func() map[reflect.Type]cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()

informers := map[reflect.Type]cache.SharedIndexInformer{}
for informerType, informer := range f.informers {
if f.startedInformers[informerType] {
informers[informerType] = informer
}
}
return informers
}()

res := map[reflect.Type]bool{}
for informType, informer := range informers {
res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
}
return res
}
  • informers/core/interface.go
// Interface provides access to each of this group's versions.
type Interface interface {
// V1 provides access to shared informers for resources in V1.
V1() v1.Interface
}



  • informers/core/v1/interface.go
// Interface provides access to all the informers in this group version.
type Interface interface {
...
// Pods returns a PodInformer.
Pods() PodInformer
}



  • informers/core/v1/pod.go

 

type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
}

type podInformer struct {
factory          internalinterfaces.SharedInformerFactory
tweakListOptions internalinterfaces.TweakListOptionsFunc
namespace        string
}

func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
},
},
&corev1.Pod{},
resyncPeriod,
indexers,
)
}

func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func (f *podInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}



Controller

Controller是连接处理逻辑的关键角色

func (c *controller) Run(stopCh <-chan struct{}) {
...
    // 创建Reflector
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
...
    // 执行Reflector.Run
wg.StartWithChannel(stopCh, r.Run)
   
    wait.Until(c.proccessLoop, time.Second, stopCh)
    ...
}

func (c *controller) processLoop() {
for {
        // 关键的一个连接点
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}

示例一:官方资源对象Controller

package main

import (
"flag"
"fmt"
"path/filepath"
"time"

v1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)

func main() {
var err error
var config *rest.Config

var kubeconfig *string

if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "[可选] kubeconfig 绝对路径")
} else {
kubeconfig = flag.String("kubeconfig", filepath.Join("/tmp", "config"), "kubeconfig 绝对路径")
}
// 初始化 rest.Config 对象
if config, err = rest.InClusterConfig(); err != nil {
if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
panic(err.Error())
}
}
// 创建 Clientset 对象
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
// 初始化一个 SharedInformerFactory 设置resync为60秒一次,会触发UpdateFunc
informerFactory := informers.NewSharedInformerFactory(clientset, time.Second*60)
// 对 Deployment 监听
//这里如果获取v1betav1的deployment的资源
// informerFactory.Apps().V1beta1().Deployments()
deployInformer := informerFactory.Apps().V1().Deployments()
// 创建 Informer(相当于注册到工厂中去,这样下面启动的时候就会去 List & Watch 对应的资源)
informer := deployInformer.Informer()
// 创建 deployment的 Lister
deployLister := deployInformer.Lister()
// 注册事件处理程序 处理事件数据
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc:    onAdd,
UpdateFunc: onUpdate,
DeleteFunc: onDelete,
})

stopper := make(chan struct{})
defer close(stopper)

informerFactory.Start(stopper)
informerFactory.WaitForCacheSync(stopper)

// 从本地缓存中获取 default 命名空间中的所有 deployment 列表
deployments, err := deployLister.Deployments("default").List(labels.Everything())
if err != nil {
panic(err)
}
for idx, deploy := range deployments {
fmt.Printf("%d -> %s\n", idx+1, deploy.Name)
}

<-stopper
}

func onAdd(obj interface{}) {
deploy := obj.(*v1.Deployment)
fmt.Println("add a deployment:", deploy.Name)
}

func onUpdate(old, new interface{}) {
oldDeploy := old.(*v1.Deployment)
newDeploy := new.(*v1.Deployment)
fmt.Println("update deployment:", oldDeploy.Name, newDeploy.Name)
}

func onDelete(obj interface{}) {
deploy := obj.(*v1.Deployment)
fmt.Println("delete a deployment:", deploy.Name)
}



示例二:CRD Controller

 

...
func main() {
var err error
var config *rest.Config

var kubeconfig *string

if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "[可选] kubeconfig 绝对路径")
} else {
kubeconfig = flag.String("kubeconfig", filepath.Join("/tmp", "config"), "kubeconfig 绝对路径")
}
// 初始化 rest.Config 对象
if config, err = rest.InClusterConfig(); err != nil {
if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
panic(err.Error())
}
}
// 创建 dymaicClient 对象
dymaicClient, err := dynamic.NewForConfig(config)
if err != nil {
panic(err.Error())
}
    gvr := schema.GroupVersionResource{Version: "v1beta1", Resource: "deployments"}
   
// 初始化一个 DynamicSharedInformerFactory 设置resync为60秒一次,会触发UpdateFunc
informerFactory := informers.NewDynamicSharedInformerFactory(dymaicClient, time.Second*60)
// 对 Deployment 监听
    informerListerForGvr := informerFactory.ForResource(gvr)
// 创建 Informer(相当于注册到工厂中去,这样下面启动的时候就会去 List & Watch 对应的资源)
informer := informerListerForGvr.Informer()
// 创建 deployment的 Lister
deployLister := deployInformer.Lister(gvr)
// 注册事件处理程序 处理事件数据
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc:    onAdd,
UpdateFunc: onUpdate,
DeleteFunc: onDelete,
})
...
}
...
0条评论
作者已关闭评论
Darren
11文章数
0粉丝数
Darren
11 文章 | 0 粉丝
Darren
11文章数
0粉丝数
Darren
11 文章 | 0 粉丝
原创

client-go源码分析

2023-04-24 13:05:32
29
0

运行图

下图展示client-go中各组件如何运行,以及它与Custom Controller的交互点。

源码结构与简要剖析

基于release-1.22。

mac:client-go caiyueyang$ tree . -L 1

.
├── CHANGELOG.md
├── CONTRIBUTING.md
├── INSTALL.md
├── LICENSE
├── OWNERS
├── README.md
├── SECURITY_CONTACTS
├── applyconfigurations
├── code-of-conduct.md
├── rest        提供RestClient客户端,对K8S API Server执行Restful操作
├── discovery      提供DiscoveryClient 发现客户端,分装RestClient
├── kubernetes     提供ClientSet客户端,封装RestClient,对官方的资源对象进行操作
├── dynamic       提供DynamicClient 动态客户端,封装RestClient,对unstructed资源对象进行操作
├── tools        提供常用工具,例如Reflector、DealtFIFO、ShareInformer以及Indexers,提供Client查询和缓存机制
├── examples
├── go.mod
├── go.sum
├── informers      K8S资源的Informer实现,主要为官方的资源对象。操作CRD的Informer在dynamic目录下
├── listers       为K8S资源提供Lister实现,该实现对Get和List请求提供只读的缓存数据
├── kubernetes_test
├── metadata
├── pkg
├── plugin       提供OpenStack、GCP和Azure等云服务商授权插件
├── restmapper
├── scale        提供ScalClient客户端,用于扩容和所容Deployment、SatefulSet等资源对象
├── testing
├── third_party
├── transport      提供TCP连接,支持HTTP Stream(用于exec、attach操作)
└── util        提供常用的方法,包括WorkQueue和证书操作等



client-go客户端

client-go提供了数种客户端,他们用于完成和API Server的List & Watch的交互

RestClient

RestClient,最基础的客户端,对HTTP Request进行封装了,实现与Kubernetes REST apis交互。

  • rest/client.go
type Interface interface {
GetRateLimiter() flowcontrol.RateLimiter
Verb(verb string) *Request
Post() *Request
Put() *Request
Patch(pt types.PatchType) *Request
Get() *Request
Delete() *Request
APIVersion() schema.GroupVersion
}

type RESTClient struct {
...
   
// 本质上是 HTTP 客户端
Client *http.Client
}
  • rest/request.go
type Request struct {
c *RESTClient

...

// 叠加了K8S的一些概念
namespace    string
namespaceSet bool
resource     string
resourceName string
subresource  string

...
}
GET /api/v1/namespaces/test/pods
---
200 OK
Content-Type: application/json

{
  "kind": "PodList",
  "apiVersion": "v1",
  "metadata": {"resourceVersion":"10245"},
  "items": [...]
}
  • 使用案例

package main

import (
 "fmt"

 corev1 "k8s.io/api/core/v1"
 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/client-go/kubernetes/scheme"
 "k8s.io/client-go/rest"
 "k8s.io/client-go/tools/clientcmd"
)

func main() {
 config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
 if err != nil {
  panic(err.Error())
 }

 config.APIPath = "api"
 config.GroupVersion = &corev1.SchemeGroupVersion
 config.NegotiatedSerializer = scheme.Codecs

 restClient, err := rest.RESTClientFor(config)
 if err != nil {
  panic(err.Error())
 }

 result := &corev1.NodeList{}
 err = restClient.Get().Namespace("").Resource("nodes").VersionedParams(&metav1.ListOptions{Limit: 100}, scheme.ParameterCodec).Do().Into(result)
 if err != nil {
  panic(err)
 }

 for _, d := range result.Items {
  fmt.Printf("Node Name %v \n", d.Name)
 }
}

DiscoveryClient

Kubernetes有Group(/api,/apis等)、Version(v1,betav1等)和Resouce等信息,而DiscoveryClient正是用于获取组、版本、资源等信息。

  • discovery/discovery_client.go
// DiscoveryInterface holds the methods that discover server-supported API groups,
// versions and resources.
type DiscoveryInterface interface {
RESTClient() restclient.Interface
ServerGroupsInterface // go接口定义的方式之一
ServerResourcesInterface
ServerVersionInterface
OpenAPISchemaInterface
}

// ServerGroupsInterface has methods for obtaining supported groups on the API server
type ServerGroupsInterface interface {
// ServerGroups returns the supported groups, with information like supported versions and the
// preferred version.
ServerGroups() (*metav1.APIGroupList, error)
}
  • discovery/cached/disk/cached_discovery.go 和 discovery/cached/memory/memcache.go 是具体的实现
  • 使用案例
package main

import (
"fmt"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/tools/clientcmd"
)

func main()  {
config, err := clientcmd.BuildConfigFromFlags("","/root/.kube/config")
if err != nil {
panic(err.Error())
}

discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
panic(err.Error())
}

_, APIResourceList, err := discoveryClient.ServerGroupsAndResources()
if err != nil {
panic(err.Error())
}
for _, list := range APIResourceList {
gv, err := schema.ParseGroupVersion(list.GroupVersion)
if err != nil {
panic(err.Error())
}
for _, resource := range list.APIResources {
fmt.Printf("name: %v, group: %v, version %v\n", resource.Name, gv.Group, gv.Version)
}
}
}

 

预期结果

go run main.go
name: bindings, group: , version v1
name: componentstatuses, group: , version v1
name: configmaps, group: , version v1
name: endpoints, group: , version v1
...



ClientSet

ClientSet,用于实例化官方资源的客户端(集),他对K8S的资源对象进行了Group Version维度的分组式封装。该实现的缺点时不能操作CRD。

  • kubernetes/clientset.go
type Interface interface {
Discovery() discovery.DiscoveryInterface
...
AppsV1() appsv1.AppsV1Interface
AppsV1beta1() appsv1beta1.AppsV1beta1Interface
AppsV1beta2() appsv1beta2.AppsV1beta2Interface
...
CoreV1() corev1.CoreV1Interface
...
}

type Clientset struct {
*discovery.DiscoveryClient
...
appsV1                       *appsv1.AppsV1Client
appsV1beta1                  *appsv1beta1.AppsV1beta1Client
appsV1beta2                  *appsv1beta2.AppsV1beta2Client
...
coreV1                       *corev1.CoreV1Client
...
}

 

  • 具体实现的目录结构

mac:kubernetes caiyueyang$ tree . -L 4

.
├── clientset.go
├── ...
└── typed
    ├── apps
    │   ├── v1
    │   │   └── ...
    │   ├── v1beta1
    │   │   └── ...
    │   └── v1beta2
    │       └── ...
    ├── core
    │   └── v1
    │       ├── core_client.go
    │       ├── pod.go
    │       └── ...
    └── ...

 

  • 以kubernetes/typed/core/v1/core_client.go为例
type CoreV1Interface interface {
RESTClient() rest.Interface
PodsGetter
...
}

// CoreV1Client is used to interact with features provided by the  group.
type CoreV1Client struct {
restClient rest.Interface
}

// NewForConfig creates a new CoreV1Client for the given config.
func NewForConfig(c *rest.Config) (*CoreV1Client, error) {
config := *c
if err := setConfigDefaults(&config); err != nil {
return nil, err
}
client, err := rest.RESTClientFor(&config)
if err != nil {
return nil, err
}
return &CoreV1Client{client}, nil
}

 

  • 以PodsGetter为例
// PodsGetter has a method to return a PodInterface.
// A group's client should implement this interface.
type PodsGetter interface {
Pods(namespace string) PodInterface
}

// PodInterface has methods to work with Pod resources.
type PodInterface interface {
Create(ctx context.Context, pod *v1.Pod, opts metav1.CreateOptions) (*v1.Pod, error)
Update(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
UpdateStatus(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Pod, error)
List(ctx context.Context, opts metav1.ListOptions) (*v1.PodList, error)
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Pod, err error)
Apply(ctx context.Context, pod *corev1.PodApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Pod, err error)
ApplyStatus(ctx context.Context, pod *corev1.PodApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Pod, err error)
UpdateEphemeralContainers(ctx context.Context, podName string, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)

PodExpansion
}

// 使用RESTClient进行操作
// Get takes name of the pod, and returns the corresponding pod object, and an error if there is any.
func (c *pods) Get(ctx context.Context, name string, options metav1.GetOptions) (result *v1.Pod, err error) {
result = &v1.Pod{}
err = c.client.Get().
Namespace(c.ns).
Resource("pods").
Name(name).
VersionedParams(&options, scheme.ParameterCodec).
Do(ctx).
Into(result)
return
}
  • 使用案例
package main

import (
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)

func main() {
config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
if err != nil {
panic(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err)
}

podClient := clientset.CoreV1().Pods(apiv1.NamespaceDefault)

list, err := podClient.List(metav1.ListOptions{Limit: 500})
if err != nil {
panic(err)
}
for _, d := range list.Items {
if d.Name == "" {
}
// fmt.Printf("NAME:%v \t NAME:%v \t STATUS: %+v\n ", d.Namespace, d.Name, d.Status)
}

//请求namespace为default下的deploy
deploymentClient := clientset.AppsV1().Deployments(apiv1.NamespaceDefault)
deployList, err2 := deploymentClient.List(metav1.ListOptions{Limit: 500})
if err2 != nil {
panic(err2)
}
for _, d := range deployList.Items {
if d.Name == "" {

}
// fmt.Printf("NAME:%v \t NAME:%v \t STATUS: %+v\n ", d.Namespace, d.Name, d.Status)
}

// 请求ds资源 todo  有兴趣可以尝试下
// clientset.AppsV1().DaemonSets()

}



DynimicClient

DynimicClient是一个动态的客户端,可以对K8S的任意资源进行操作,包括CRD。可以理解这是一组操作非结构化资源的API,这是与ClientSet的最大不同。

  • dynamic/interface.go

type Interface interface {
Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface
}

type ResourceInterface interface {
Create(ctx context.Context, obj *unstructured.Unstructured, options metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error)
Update(ctx context.Context, obj *unstructured.Unstructured, options metav1.UpdateOptions, subresources ...string) (*unstructured.Unstructured, error)
UpdateStatus(ctx context.Context, obj *unstructured.Unstructured, options metav1.UpdateOptions) (*unstructured.Unstructured, error)
Delete(ctx context.Context, name string, options metav1.DeleteOptions, subresources ...string) error
DeleteCollection(ctx context.Context, options metav1.DeleteOptions, listOptions metav1.ListOptions) error
Get(ctx context.Context, name string, options metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error)
List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error)
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, options metav1.PatchOptions, subresources ...string) (*unstructured.Unstructured, error)
}

type NamespaceableResourceInterface interface {
Namespace(string) ResourceInterface
ResourceInterface
}

type Unstructured struct {
// Object is a JSON compatible map with string, float, int, bool, []interface{}, or
// map[string]interface{}
// children.
Object map[string]interface{}
}



  • dynamic/simple.go
type dynamicClient struct {
client *rest.RESTClient
}

type dynamicResourceClient struct {
client    *dynamicClient
namespace string
resource  schema.GroupVersionResource
}

// 创建一个dynamicResourceClient
func (c *dynamicClient) Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface {
return &dynamicResourceClient{client: c, resource: resource}
}

// 创建CR,底层也是使用RESTClient
func (c *dynamicResourceClient) Create(ctx context.Context, obj *unstructured.Unstructured, opts metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error) {
...
result := c.client.client.
Post().
AbsPath(append(c.makeURLSegments(name), subresources...)...).
Body(outBytes).
SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).
Do(ctx)
...
return uncastObj.(*unstructured.Unstructured), nil
}
// 使用 Group Version Resource组装URL
func (c *dynamicResourceClient) makeURLSegments(name string) []string {
url := []string{}
if len(c.resource.Group) == 0 {
url = append(url, "api")
} else {
url = append(url, "apis", c.resource.Group)
}
url = append(url, c.resource.Version)

if len(c.namespace) > 0 {
url = append(url, "namespaces", c.namespace)
}
url = append(url, c.resource.Resource)

if len(name) > 0 {
url = append(url, name)
}

return url
}



  • 使用案例
package main

import (
"fmt"

apiv1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
)

func main() {
config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config")
if err != nil {
panic(err)
}

dymaicClient, err := dynamic.NewForConfig(config)
checkErr(err)
//map[string]interface{}

    //TODO 获取CRD资源 这里是获取了TIDB的CRD资源
// gvr := schema.GroupVersionResource{Version: "v1alpha1", Resource: "tidbclusters", Group: "pingcap.com"}
// unstructObj, err := dymaicClient.Resource(gvr).Namespace("tidb-cluster").List(metav1.ListOptions{Limit: 500})
// checkErr(err)
// fmt.Println(unstructObj)

gvr := schema.GroupVersionResource{Version: "v1", Resource: "pods"}
unstructObj, err := dymaicClient.Resource(gvr).Namespace(apiv1.NamespaceDefault).List(metav1.ListOptions{Limit: 500})
checkErr(err)
// fmt.Println(unstructObj)
podList := &corev1.PodList{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj.UnstructuredContent(), podList)
checkErr(err)
for _, d := range podList.Items {
fmt.Printf("NAME:%v \t NAME:%v \t STATUS: %+v\n ", d.Namespace, d.Name, d.Status)
}

}

func checkErr(err error) {
if err != nil {
panic(err)
}
}



Reflector

Reflector,反射器,做两件事

1、启动时从K8S API Server List资源,后继Watch资源的变化(增删改),完成1)List & Watch

2、将对象压入Delta FIFO Queue中,完成2)Add Object。

Reflector为Controller的属性,在Controoler.Run中启动Run

  • tools/cache/reflector.go
type Reflector struct {
    ...
    // 一个Reflector一个类型
expectedType reflect.Type
   
    // 底层就通过客户端实现,例如informers/core/v1/pod.go NewFilteredPodInformer
    // &cache.ListWatch{
//  ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
//   ...
//   return client.CoreV1().Pods(namespace).List(context.TODO(), options)
//  },
//  WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
//   ...
//   return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
//  },
// },
listerWatcher ListerWatcher
   
    // Deleta FIFO Queue,下面会讲
store Store
...
}

// 启动的入口,触发执行ListAndWatch,从API Server拉取资源对象并监听资源对象的变化
// Controller.Run中调用
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.BackoffUntil(func() {
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}

// 完成1)List & Watch
// 完成2)Add Object
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
...

    // List资源对象
if err := func() error {
...
        // 初始化一个处理List的管道,启动一个协程,并等待完成
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
go func() {
defer func() {
if r := recover(); r != nil {
panicCh <- r
}
}()
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response.
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
...
// list资源对象
list, paginatedResult, err = pager.List(context.Background(), options)
...
            // 退出
close(listCh)
}()
select {
case <-stopCh:
return nil
case r := <-panicCh:
panic(r)
case <-listCh:
}
if err != nil {
return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err)
}

...
        // 提取资源,调用DeltaFIFO的Replace方法将资源加入Delta FIFO Queue
        // 完成2)Add Object
initTrace.Step("Objects extracted")
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("unable to sync list result: %v", err)
}
initTrace.Step("SyncWith done")
r.setLastSyncResourceVersion(resourceVersion)
initTrace.Step("Resource version updated")
return nil
}(); err != nil {
return err
}

    ...

    // Watch资源变化
for {
...
w, err := r.listerWatcher.Watch(options)
...
// 监听资源,将资源加入Delta FIFO Queue
        // 完成2)Add Object
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
...
}
}
}



Delta FIFO Queue

上节Reflector提到的store,具体实现就为Delta FIFO Queue,它干一件事情:

1、接收Reflector压入的资源对象,完事后广播,触发完成3)Pop Object。

Delta FIFO Queue为Controller.config的属性,Reflector压入资源对象之后广报,Delta FIFO Queue收到广报解除阻塞,并Pop资源对象给PopProcessFunc(实际为ShareInformer的HandleDeltas)。

  • tools/cache/delta_fifo.go
type DeltaFIFO struct {
...
// 存储的是以资源对象ID为key的事件列表
items map[string]Deltas

// 存储了资源对象ID
queue []string
...
}

// Reflector调用该方法
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
...
    // 同步或替换item
// Add Sync/Replaced action for each new item.
for _, item := range list {
...
if err := f.queueActionLocked(action, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
...
}
// 压入资源对象并触发广报
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
...
    //去重
newDeltas = dedupDeltas(newDeltas)

if len(newDeltas) > 0 {
...
        // 广播,完成3)Pop Object
f.cond.Broadcast()
    }
    ...
}

// Delta FIFO Queue收到广报解除阻塞,并Pop资源对象给开发人员定义的Handler函数进行处理
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
        // 阻塞,直到调用了f.cond.Broadcast()
for len(f.queue) == 0 {
...
f.cond.Wait()
}
        // 取出头一个元素
id := f.queue[0]
f.queue = f.queue[1:]
...
item, ok := f.items[id]
...
delete(f.items, id)
        // 执行由Controller提供的PopProcessFunc
        // (实际为ShareInformer的HandleDeltas)
        // 入口controller.go中
err := process(item)
...
}
}



Informer

SharedInformer

Informer主要用来处理Pop的Object,然后干两件事情:

1、完成4)Add Object 到Indexer,5)Store Object & Key &Indexer 到Thread Safe Map

2、完成6)Dispatch Event Handler functions(Send Object to Custom Controller),将资源对象发送给自定义Controller。

client-go有ClientSet和DynimicClient两种客户端,对应的InformerFactory也有两种。

  • tools/cache/shared_informer.go
type sharedIndexInformer struct {
indexer    Indexer
controller Controller

processor             *sharedProcessor
cacheMutationDetector MutationDetector

listerWatcher ListerWatcher

// 一个资源对象类型一个Informer
objectType runtime.Object

...
}

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// Delta FIFO Queue 通过 controller.Config 与Informer关联
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects:          s.indexer,
EmitDeltaTypeReplaced: true,
})

    // 在controller的processLoop中会用到Queue,执行Pop Object
    // Process(HandleDeltas)执行同步资源对象到Indexer和分发给listener,listener再调用用户定义的Handler
cfg := &Config{
Queue:            fifo,
ListerWatcher:    s.listerWatcher,
ObjectType:       s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError:     false,
ShouldResync:     s.processor.shouldResync,
Process:           s.HandleDeltas,
WatchErrorHandler: s.watchErrorHandler,
}

func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()

s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
...
    // controller 入口,Run的时候触发Reflector.Run
s.controller.Run(stopCh)
}

// 处理资源对象事件
// 与Indexer交互,完成4)Add Object
// 并分发给listener(processor.distribute -> add -> pop -> run -> 业务的eventHandler),
// 完成6)Dispatch Event Handler functions(Send Object to Custom Controller)
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()

// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil {
return err
}
...
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
                // 完成4)Add Object
if err := s.indexer.Add(d.Object); err != nil {
return err
}
                // 完成6)Dispatch Event Handler functions(Send Object to Custom Controller)
// 本质将资源对象分发给Listener
                s.processor.distribute(addNotification{newObj: d.Object}, false)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}

// 增加事件处理器,这里就是自定义逻辑主要的切入点
// 注册事件处理程序 处理事件数据
// informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
//  AddFunc:    onAdd,
//  UpdateFunc: onUpdate,
//  DeleteFunc: onDelete,
// })
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}

func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
s.startedLock.Lock()
defer s.startedLock.Unlock()

...
   
//生产并增加监听器
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

if !s.started {
s.processor.addListener(listener)
return
}
...
}



NewSharedInformerFactory

NewSharedInformerFactory是SharedInformer的工厂,可以与ClientSet配合使用,这套组合可以看成是官方资源的特有组合,有“结构化”的方法使用,逻辑主要在informers目录中。

mac:kubernetes caiyueyang$ tree . -L 4

.
├── clientset.go
├── ...
└── typed
    ├── apps
    │   ├── v1
    │   │   └── ...
    │   ├── v1beta1
    │   │   └── ...
    │   └── v1beta2
    │       └── ...
    ├── core
    │   └── v1
    │       ├── core_client.go
    │       ├── pod.go
    │       └── ...
    └── ...


mac:informers caiyueyang$  tree . -L 4

.
├── apps
│   ├── interface.go
│   ├── v1
│   │   └── ...
│   ├── v1beta1
│   │   └── ...
│   └── v1beta2
│       └── ...
├── core
│   ├── interface.go
│   └── v1
│       ├── interface.go
│       ├── pod.go
│       └── ...
├── factory.go
└── ...



  • informers/factory.go
type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool

// ClientSet的Interface 是CoreV1,这里Core,Version接口在core子目录中
Core() core.Interface
...
}

// 创建 Informer,被资源Informer调用,并将资源的Informmer注册回工厂
// 以PodInformer为例:
// func (f *podInformer) Informer() cache.SharedIndexInformer {
// return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
// }
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()

informerType := reflect.TypeOf(obj)
informer, exists := f.informers[informerType]
if exists {
return informer
}

resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}

informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer

return informer
}

// 1、调用各个资源的sharedIndexInformer.Run,
// 2、Run中创建Config,Config包含Delta FIFO Queue
// 3、使用Config 创建Controller,调用Controller.Run
// 4、Controller.Run中创建Reflector,使用Delta FIFO Queue作为store
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()

for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
go informer.Run(stopCh)
f.startedInformers[informerType] = true
}
}
}

// 等待所有Informer同步资源对象
// WaitForCacheSync waits for all started informers' cache were synced.
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
informers := func() map[reflect.Type]cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()

informers := map[reflect.Type]cache.SharedIndexInformer{}
for informerType, informer := range f.informers {
if f.startedInformers[informerType] {
informers[informerType] = informer
}
}
return informers
}()

res := map[reflect.Type]bool{}
for informType, informer := range informers {
res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
}
return res
}
  • informers/core/interface.go
// Interface provides access to each of this group's versions.
type Interface interface {
// V1 provides access to shared informers for resources in V1.
V1() v1.Interface
}



  • informers/core/v1/interface.go
// Interface provides access to all the informers in this group version.
type Interface interface {
...
// Pods returns a PodInformer.
Pods() PodInformer
}



  • informers/core/v1/pod.go

 

type PodInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.PodLister
}

type podInformer struct {
factory          internalinterfaces.SharedInformerFactory
tweakListOptions internalinterfaces.TweakListOptionsFunc
namespace        string
}

func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
},
},
&corev1.Pod{},
resyncPeriod,
indexers,
)
}

func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}

func (f *podInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}



Controller

Controller是连接处理逻辑的关键角色

func (c *controller) Run(stopCh <-chan struct{}) {
...
    // 创建Reflector
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
...
    // 执行Reflector.Run
wg.StartWithChannel(stopCh, r.Run)
   
    wait.Until(c.proccessLoop, time.Second, stopCh)
    ...
}

func (c *controller) processLoop() {
for {
        // 关键的一个连接点
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}

示例一:官方资源对象Controller

package main

import (
"flag"
"fmt"
"path/filepath"
"time"

v1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)

func main() {
var err error
var config *rest.Config

var kubeconfig *string

if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "[可选] kubeconfig 绝对路径")
} else {
kubeconfig = flag.String("kubeconfig", filepath.Join("/tmp", "config"), "kubeconfig 绝对路径")
}
// 初始化 rest.Config 对象
if config, err = rest.InClusterConfig(); err != nil {
if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
panic(err.Error())
}
}
// 创建 Clientset 对象
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
// 初始化一个 SharedInformerFactory 设置resync为60秒一次,会触发UpdateFunc
informerFactory := informers.NewSharedInformerFactory(clientset, time.Second*60)
// 对 Deployment 监听
//这里如果获取v1betav1的deployment的资源
// informerFactory.Apps().V1beta1().Deployments()
deployInformer := informerFactory.Apps().V1().Deployments()
// 创建 Informer(相当于注册到工厂中去,这样下面启动的时候就会去 List & Watch 对应的资源)
informer := deployInformer.Informer()
// 创建 deployment的 Lister
deployLister := deployInformer.Lister()
// 注册事件处理程序 处理事件数据
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc:    onAdd,
UpdateFunc: onUpdate,
DeleteFunc: onDelete,
})

stopper := make(chan struct{})
defer close(stopper)

informerFactory.Start(stopper)
informerFactory.WaitForCacheSync(stopper)

// 从本地缓存中获取 default 命名空间中的所有 deployment 列表
deployments, err := deployLister.Deployments("default").List(labels.Everything())
if err != nil {
panic(err)
}
for idx, deploy := range deployments {
fmt.Printf("%d -> %s\n", idx+1, deploy.Name)
}

<-stopper
}

func onAdd(obj interface{}) {
deploy := obj.(*v1.Deployment)
fmt.Println("add a deployment:", deploy.Name)
}

func onUpdate(old, new interface{}) {
oldDeploy := old.(*v1.Deployment)
newDeploy := new.(*v1.Deployment)
fmt.Println("update deployment:", oldDeploy.Name, newDeploy.Name)
}

func onDelete(obj interface{}) {
deploy := obj.(*v1.Deployment)
fmt.Println("delete a deployment:", deploy.Name)
}



示例二:CRD Controller

 

...
func main() {
var err error
var config *rest.Config

var kubeconfig *string

if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "[可选] kubeconfig 绝对路径")
} else {
kubeconfig = flag.String("kubeconfig", filepath.Join("/tmp", "config"), "kubeconfig 绝对路径")
}
// 初始化 rest.Config 对象
if config, err = rest.InClusterConfig(); err != nil {
if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {
panic(err.Error())
}
}
// 创建 dymaicClient 对象
dymaicClient, err := dynamic.NewForConfig(config)
if err != nil {
panic(err.Error())
}
    gvr := schema.GroupVersionResource{Version: "v1beta1", Resource: "deployments"}
   
// 初始化一个 DynamicSharedInformerFactory 设置resync为60秒一次,会触发UpdateFunc
informerFactory := informers.NewDynamicSharedInformerFactory(dymaicClient, time.Second*60)
// 对 Deployment 监听
    informerListerForGvr := informerFactory.ForResource(gvr)
// 创建 Informer(相当于注册到工厂中去,这样下面启动的时候就会去 List & Watch 对应的资源)
informer := informerListerForGvr.Informer()
// 创建 deployment的 Lister
deployLister := deployInformer.Lister(gvr)
// 注册事件处理程序 处理事件数据
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc:    onAdd,
UpdateFunc: onUpdate,
DeleteFunc: onDelete,
})
...
}
...
文章来自个人专栏
Darren的容器专栏
11 文章 | 1 订阅
0条评论
作者已关闭评论
作者已关闭评论
0
0