下图展示client-go中各组件如何运行,以及它与Custom Controller的交互点。
mac:client-go caiyueyang$ tree . -L 1
. ├── CHANGELOG.md ├── CONTRIBUTING.md ├── INSTALL.md ├── LICENSE ├── OWNERS ├── README.md ├── SECURITY_CONTACTS ├── applyconfigurations ├── code-of-conduct.md ├── rest 提供RestClient客户端,对K8S API Server执行Restful操作 ├── discovery 提供DiscoveryClient 发现客户端,分装RestClient ├── kubernetes 提供ClientSet客户端,封装RestClient,对官方的资源对象进行操作 ├── dynamic 提供DynamicClient 动态客户端,封装RestClient,对unstructed资源对象进行操作 ├── tools 提供常用工具,例如Reflector、DealtFIFO、ShareInformer以及Indexers,提供Client查询和缓存机制 ├── examples ├── go.mod ├── go.sum ├── informers K8S资源的Informer实现,主要为官方的资源对象。操作CRD的Informer在dynamic目录下 ├── listers 为K8S资源提供Lister实现,该实现对Get和List请求提供只读的缓存数据 ├── kubernetes_test ├── metadata ├── pkg ├── plugin 提供OpenStack、GCP和Azure等云服务商授权插件 ├── restmapper ├── scale 提供ScalClient客户端,用于扩容和所容Deployment、SatefulSet等资源对象 ├── testing ├── third_party ├── transport 提供TCP连接,支持HTTP Stream(用于exec、attach操作) └── util 提供常用的方法,包括WorkQueue和证书操作等 |
client-go提供了数种客户端,他们用于完成和API Server的List & Watch的交互
RestClient,最基础的客户端,对HTTP Request进行封装了,实现与Kubernetes REST apis交互。
- rest/client.go
type Interface interface { GetRateLimiter() flowcontrol.RateLimiter Verb(verb string) *Request Post() *Request Put() *Request Patch(pt types.PatchType) *Request Get() *Request Delete() *Request APIVersion() schema.GroupVersion } type RESTClient struct { ... // 本质上是 HTTP 客户端 Client *http.Client } |
- rest/request.go
type Request struct { c *RESTClient ... // 叠加了K8S的一些概念 namespace string namespaceSet bool resource string resourceName string subresource string ... } |
- 扩展
GET /api/v1/namespaces/test/pods --- 200 OK Content-Type: application/json { "kind": "PodList", "apiVersion": "v1", "metadata": {"resourceVersion":"10245"}, "items": [...] } |
- 使用案例
package main import ( corev1 "k8s.io/api/core/v1" func main() { config.APIPath = "api" restClient, err := rest.RESTClientFor(config) result := &corev1.NodeList{} for _, d := range result.Items { |
- discovery/discovery_client.go
// DiscoveryInterface holds the methods that discover server-supported API groups, // versions and resources. type DiscoveryInterface interface { RESTClient() restclient.Interface ServerGroupsInterface // go接口定义的方式之一 ServerResourcesInterface ServerVersionInterface OpenAPISchemaInterface } // ServerGroupsInterface has methods for obtaining supported groups on the API server type ServerGroupsInterface interface { // ServerGroups returns the supported groups, with information like supported versions and the // preferred version. ServerGroups() (*metav1.APIGroupList, error) } |
- discovery/cached/disk/cached_discovery.go 和 discovery/cached/memory/memcache.go 是具体的实现
- 使用案例
package main import ( "fmt" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery" "k8s.io/client-go/tools/clientcmd" ) func main() { config, err := clientcmd.BuildConfigFromFlags("","/root/.kube/config") if err != nil { panic(err.Error()) } discoveryClient, err := discovery.NewDiscoveryClientForConfig(config) if err != nil { panic(err.Error()) } _, APIResourceList, err := discoveryClient.ServerGroupsAndResources() if err != nil { panic(err.Error()) } for _, list := range APIResourceList { gv, err := schema.ParseGroupVersion(list.GroupVersion) if err != nil { panic(err.Error()) } for _, resource := range list.APIResources { fmt.Printf("name: %v, group: %v, version %v\n", resource.Name, gv.Group, gv.Version) } } } |
go run main.go name: bindings, group: , version v1 name: componentstatuses, group: , version v1 name: configmaps, group: , version v1 name: endpoints, group: , version v1 ... |
ClientSet,用于实例化官方资源的客户端(集),他对K8S的资源对象进行了Group Version维度的分组式封装。该实现的缺点时不能操作CRD。
- kubernetes/clientset.go
type Interface interface { Discovery() discovery.DiscoveryInterface ... AppsV1() appsv1.AppsV1Interface AppsV1beta1() appsv1beta1.AppsV1beta1Interface AppsV1beta2() appsv1beta2.AppsV1beta2Interface ... CoreV1() corev1.CoreV1Interface ... } type Clientset struct { *discovery.DiscoveryClient ... appsV1 *appsv1.AppsV1Client appsV1beta1 *appsv1beta1.AppsV1beta1Client appsV1beta2 *appsv1beta2.AppsV1beta2Client ... coreV1 *corev1.CoreV1Client ... } |
- 具体实现的目录结构
mac:kubernetes caiyueyang$ tree . -L 4
. ├── clientset.go ├── ... └── typed ├── apps │ ├── v1 │ │ └── ... │ ├── v1beta1 │ │ └── ... │ └── v1beta2 │ └── ... ├── core │ └── v1 │ ├── core_client.go │ ├── pod.go │ └── ... └── ... |
- 以kubernetes/typed/core/v1/core_client.go为例
type CoreV1Interface interface { RESTClient() rest.Interface PodsGetter ... } // CoreV1Client is used to interact with features provided by the group. type CoreV1Client struct { restClient rest.Interface } // NewForConfig creates a new CoreV1Client for the given config. func NewForConfig(c *rest.Config) (*CoreV1Client, error) { config := *c if err := setConfigDefaults(&config); err != nil { return nil, err } client, err := rest.RESTClientFor(&config) if err != nil { return nil, err } return &CoreV1Client{client}, nil } |
- 以PodsGetter为例
// PodsGetter has a method to return a PodInterface. // A group's client should implement this interface. type PodsGetter interface { Pods(namespace string) PodInterface } // PodInterface has methods to work with Pod resources. type PodInterface interface { Create(ctx context.Context, pod *v1.Pod, opts metav1.CreateOptions) (*v1.Pod, error) Update(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error) UpdateStatus(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Pod, error) List(ctx context.Context, opts metav1.ListOptions) (*v1.PodList, error) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Pod, err error) Apply(ctx context.Context, pod *corev1.PodApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Pod, err error) ApplyStatus(ctx context.Context, pod *corev1.PodApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Pod, err error) UpdateEphemeralContainers(ctx context.Context, podName string, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error) PodExpansion } // 使用RESTClient进行操作 // Get takes name of the pod, and returns the corresponding pod object, and an error if there is any. func (c *pods) Get(ctx context.Context, name string, options metav1.GetOptions) (result *v1.Pod, err error) { result = &v1.Pod{} err = c.client.Get(). Namespace(c.ns). Resource("pods"). Name(name). VersionedParams(&options, scheme.ParameterCodec). Do(ctx). Into(result) return } |
- 使用案例
package main import ( apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" ) func main() { config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") if err != nil { panic(err) } clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err) } podClient := clientset.CoreV1().Pods(apiv1.NamespaceDefault) list, err := podClient.List(metav1.ListOptions{Limit: 500}) if err != nil { panic(err) } for _, d := range list.Items { if d.Name == "" { } // fmt.Printf("NAME:%v \t NAME:%v \t STATUS: %+v\n ", d.Namespace, d.Name, d.Status) } //请求namespace为default下的deploy deploymentClient := clientset.AppsV1().Deployments(apiv1.NamespaceDefault) deployList, err2 := deploymentClient.List(metav1.ListOptions{Limit: 500}) if err2 != nil { panic(err2) } for _, d := range deployList.Items { if d.Name == "" { } // fmt.Printf("NAME:%v \t NAME:%v \t STATUS: %+v\n ", d.Namespace, d.Name, d.Status) } // 请求ds资源 todo 有兴趣可以尝试下 // clientset.AppsV1().DaemonSets() } |
- dynamic/interface.go
type Interface interface { type Unstructured struct { |
- dynamic/simple.go
type dynamicClient struct { client *rest.RESTClient } type dynamicResourceClient struct { client *dynamicClient namespace string resource schema.GroupVersionResource } // 创建一个dynamicResourceClient func (c *dynamicClient) Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface { return &dynamicResourceClient{client: c, resource: resource} } // 创建CR,底层也是使用RESTClient func (c *dynamicResourceClient) Create(ctx context.Context, obj *unstructured.Unstructured, opts metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error) { ... result := c.client.client. Post(). AbsPath(append(c.makeURLSegments(name), subresources...)...). Body(outBytes). SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1). Do(ctx) ... return uncastObj.(*unstructured.Unstructured), nil } // 使用 Group Version Resource组装URL func (c *dynamicResourceClient) makeURLSegments(name string) []string { url := []string{} if len(c.resource.Group) == 0 { url = append(url, "api") } else { url = append(url, "apis", c.resource.Group) } url = append(url, c.resource.Version) if len(c.namespace) > 0 { url = append(url, "namespaces", c.namespace) } url = append(url, c.resource.Resource) if len(name) > 0 { url = append(url, name) } return url } |
- 使用案例
package main import ( "fmt" apiv1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/tools/clientcmd" ) func main() { config, err := clientcmd.BuildConfigFromFlags("", "/root/.kube/config") if err != nil { panic(err) } dymaicClient, err := dynamic.NewForConfig(config) checkErr(err) //map[string]interface{} //TODO 获取CRD资源 这里是获取了TIDB的CRD资源 // gvr := schema.GroupVersionResource{Version: "v1alpha1", Resource: "tidbclusters", Group: "pingcap.com"} // unstructObj, err := dymaicClient.Resource(gvr).Namespace("tidb-cluster").List(metav1.ListOptions{Limit: 500}) // checkErr(err) // fmt.Println(unstructObj) gvr := schema.GroupVersionResource{Version: "v1", Resource: "pods"} unstructObj, err := dymaicClient.Resource(gvr).Namespace(apiv1.NamespaceDefault).List(metav1.ListOptions{Limit: 500}) checkErr(err) // fmt.Println(unstructObj) podList := &corev1.PodList{} err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj.UnstructuredContent(), podList) checkErr(err) for _, d := range podList.Items { fmt.Printf("NAME:%v \t NAME:%v \t STATUS: %+v\n ", d.Namespace, d.Name, d.Status) } } func checkErr(err error) { if err != nil { panic(err) } } |
1、启动时从K8S API Server List资源,后继Watch资源的变化(增删改),完成1)List & Watch
2、将对象压入Delta FIFO Queue中,完成2)Add Object。
- tools/cache/reflector.go
type Reflector struct { ... // 一个Reflector一个类型 expectedType reflect.Type // 底层就通过客户端实现,例如informers/core/v1/pod.go NewFilteredPodInformer // &cache.ListWatch{ // ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { // ... // return client.CoreV1().Pods(namespace).List(context.TODO(), options) // }, // WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { // ... // return client.CoreV1().Pods(namespace).Watch(context.TODO(), options) // }, // }, listerWatcher ListerWatcher // Deleta FIFO Queue,下面会讲 store Store ... } // 启动的入口,触发执行ListAndWatch,从API Server拉取资源对象并监听资源对象的变化 // Controller.Run中调用 func (r *Reflector) Run(stopCh <-chan struct{}) { klog.V(3).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) wait.BackoffUntil(func() { if err := r.ListAndWatch(stopCh); err != nil { r.watchErrorHandler(r, err) } }, r.backoffManager, true, stopCh) klog.V(3).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name) } // 完成1)List & Watch // 完成2)Add Object func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error { ... // List资源对象 if err := func() error { ... // 初始化一个处理List的管道,启动一个协程,并等待完成 listCh := make(chan struct{}, 1) panicCh := make(chan interface{}, 1) go func() { defer func() { if r := recover(); r != nil { panicCh <- r } }() // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first // list request will return the full response. pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) { return r.listerWatcher.List(opts) })) ... // list资源对象 list, paginatedResult, err = pager.List(context.Background(), options) ... // 退出 close(listCh) }() select { case <-stopCh: return nil case r := <-panicCh: panic(r) case <-listCh: } if err != nil { return fmt.Errorf("failed to list %v: %v", r.expectedTypeName, err) } ... // 提取资源,调用DeltaFIFO的Replace方法将资源加入Delta FIFO Queue // 完成2)Add Object initTrace.Step("Objects extracted") if err := r.syncWith(items, resourceVersion); err != nil { return fmt.Errorf("unable to sync list result: %v", err) } initTrace.Step("SyncWith done") r.setLastSyncResourceVersion(resourceVersion) initTrace.Step("Resource version updated") return nil }(); err != nil { return err } ... // Watch资源变化 for { ... w, err := r.listerWatcher.Watch(options) ... // 监听资源,将资源加入Delta FIFO Queue // 完成2)Add Object if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil { ... } } } |
Delta FIFO Queue
上节Reflector提到的store,具体实现就为Delta FIFO Queue,它干一件事情:
1、接收Reflector压入的资源对象,完事后广播,触发完成3)Pop Object。
Delta FIFO Queue为Controller.config的属性,Reflector压入资源对象之后广报,Delta FIFO Queue收到广报解除阻塞,并Pop资源对象给PopProcessFunc(实际为ShareInformer的HandleDeltas)。
- tools/cache/delta_fifo.go
type DeltaFIFO struct { ... // 存储的是以资源对象ID为key的事件列表 items map[string]Deltas // 存储了资源对象ID queue []string ... } // Reflector调用该方法 func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error { ... // 同步或替换item // Add Sync/Replaced action for each new item. for _, item := range list { ... if err := f.queueActionLocked(action, item); err != nil { return fmt.Errorf("couldn't enqueue object: %v", err) } } ... } // 压入资源对象并触发广报 func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error { ... //去重 newDeltas = dedupDeltas(newDeltas) if len(newDeltas) > 0 { ... // 广播,完成3)Pop Object f.cond.Broadcast() } ... } // Delta FIFO Queue收到广报解除阻塞,并Pop资源对象给开发人员定义的Handler函数进行处理 func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { // 阻塞,直到调用了f.cond.Broadcast() for len(f.queue) == 0 { ... f.cond.Wait() } // 取出头一个元素 id := f.queue[0] f.queue = f.queue[1:] ... item, ok := f.items[id] ... delete(f.items, id) // 执行由Controller提供的PopProcessFunc // (实际为ShareInformer的HandleDeltas) // 入口controller.go中 err := process(item) ... } } |
1、完成4)Add Object 到Indexer,5)Store Object & Key &Indexer 到Thread Safe Map
2、完成6)Dispatch Event Handler functions(Send Object to Custom Controller),将资源对象发送给自定义Controller。
- tools/cache/shared_informer.go
type sharedIndexInformer struct { indexer Indexer controller Controller processor *sharedProcessor cacheMutationDetector MutationDetector listerWatcher ListerWatcher // 一个资源对象类型一个Informer objectType runtime.Object ... } func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() // Delta FIFO Queue 通过 controller.Config 与Informer关联 fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{ KnownObjects: s.indexer, EmitDeltaTypeReplaced: true, }) // 在controller的processLoop中会用到Queue,执行Pop Object // Process(HandleDeltas)执行同步资源对象到Indexer和分发给listener,listener再调用用户定义的Handler cfg := &Config{ Queue: fifo, ListerWatcher: s.listerWatcher, ObjectType: s.objectType, FullResyncPeriod: s.resyncCheckPeriod, RetryOnError: false, ShouldResync: s.processor.shouldResync, Process: s.HandleDeltas, WatchErrorHandler: s.watchErrorHandler, } func() { s.startedLock.Lock() defer s.startedLock.Unlock() s.controller = New(cfg) s.controller.(*controller).clock = s.clock s.started = true }() ... // controller 入口,Run的时候触发Reflector.Run s.controller.Run(stopCh) } // 处理资源对象事件 // 与Indexer交互,完成4)Add Object // 并分发给listener(processor.distribute -> add -> pop -> run -> 业务的eventHandler), // 完成6)Dispatch Event Handler functions(Send Object to Custom Controller) func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { case Sync, Replaced, Added, Updated: s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } ... s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { // 完成4)Add Object if err := s.indexer.Add(d.Object); err != nil { return err } // 完成6)Dispatch Event Handler functions(Send Object to Custom Controller) // 本质将资源对象分发给Listener s.processor.distribute(addNotification{newObj: d.Object}, false) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil } // 增加事件处理器,这里就是自定义逻辑主要的切入点 // 注册事件处理程序 处理事件数据 // informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ // AddFunc: onAdd, // UpdateFunc: onUpdate, // DeleteFunc: onDelete, // }) func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) { s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod) } func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) { s.startedLock.Lock() defer s.startedLock.Unlock() ... //生产并增加监听器 listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize) if !s.started { s.processor.addListener(listener) return } ... } |
mac:kubernetes caiyueyang$ tree . -L 4
. ├── clientset.go ├── ... └── typed ├── apps │ ├── v1 │ │ └── ... │ ├── v1beta1 │ │ └── ... │ └── v1beta2 │ └── ... ├── core │ └── v1 │ ├── core_client.go │ ├── pod.go │ └── ... └── ... |
mac:informers caiyueyang$ tree . -L 4
. ├── apps │ ├── interface.go │ ├── v1 │ │ └── ... │ ├── v1beta1 │ │ └── ... │ └── v1beta2 │ └── ... ├── core │ ├── interface.go │ └── v1 │ ├── interface.go │ ├── pod.go │ └── ... ├── factory.go └── ... |
- informers/factory.go
type SharedInformerFactory interface { internalinterfaces.SharedInformerFactory ForResource(resource schema.GroupVersionResource) (GenericInformer, error) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool // ClientSet的Interface 是CoreV1,这里Core,Version接口在core子目录中 Core() core.Interface ... } // 创建 Informer,被资源Informer调用,并将资源的Informmer注册回工厂 // 以PodInformer为例: // func (f *podInformer) Informer() cache.SharedIndexInformer { // return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer) // } func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() informerType := reflect.TypeOf(obj) informer, exists := f.informers[informerType] if exists { return informer } resyncPeriod, exists := f.customResync[informerType] if !exists { resyncPeriod = f.defaultResync } informer = newFunc(f.client, resyncPeriod) f.informers[informerType] = informer return informer } // 1、调用各个资源的sharedIndexInformer.Run, // 2、Run中创建Config,Config包含Delta FIFO Queue // 3、使用Config 创建Controller,调用Controller.Run // 4、Controller.Run中创建Reflector,使用Delta FIFO Queue作为store // Start initializes all requested informers. func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) { f.lock.Lock() defer f.lock.Unlock() for informerType, informer := range f.informers { if !f.startedInformers[informerType] { go informer.Run(stopCh) f.startedInformers[informerType] = true } } } // 等待所有Informer同步资源对象 // WaitForCacheSync waits for all started informers' cache were synced. func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { informers := func() map[reflect.Type]cache.SharedIndexInformer { f.lock.Lock() defer f.lock.Unlock() informers := map[reflect.Type]cache.SharedIndexInformer{} for informerType, informer := range f.informers { if f.startedInformers[informerType] { informers[informerType] = informer } } return informers }() res := map[reflect.Type]bool{} for informType, informer := range informers { res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced) } return res } |
- informers/core/interface.go
// Interface provides access to each of this group's versions. type Interface interface { // V1 provides access to shared informers for resources in V1. V1() v1.Interface } |
- informers/core/v1/interface.go
// Interface provides access to all the informers in this group version. type Interface interface { ... // Pods returns a PodInformer. Pods() PodInformer } |
- informers/core/v1/pod.go
type PodInformer interface { Informer() cache.SharedIndexInformer Lister() v1.PodLister } type podInformer struct { factory internalinterfaces.SharedInformerFactory tweakListOptions internalinterfaces.TweakListOptionsFunc namespace string } func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { return cache.NewSharedIndexInformer( &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.CoreV1().Pods(namespace).List(context.TODO(), options) }, WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if tweakListOptions != nil { tweakListOptions(&options) } return client.CoreV1().Pods(namespace).Watch(context.TODO(), options) }, }, &corev1.Pod{}, resyncPeriod, indexers, ) } func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) } func (f *podInformer) Informer() cache.SharedIndexInformer { return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer) } |
func (c *controller) Run(stopCh <-chan struct{}) { ... // 创建Reflector r := NewReflector( c.config.ListerWatcher, c.config.ObjectType, c.config.Queue, c.config.FullResyncPeriod, ) ... // 执行Reflector.Run wg.StartWithChannel(stopCh, r.Run) wait.Until(c.proccessLoop, time.Second, stopCh) ... } func (c *controller) processLoop() { for { // 关键的一个连接点 obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == ErrFIFOClosed { return } if c.config.RetryOnError { // This is the safe way to re-enqueue. c.config.Queue.AddIfNotPresent(obj) } } } } |
package main import ( "flag" "fmt" "path/filepath" "time" v1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" ) func main() { var err error var config *rest.Config var kubeconfig *string if home := homedir.HomeDir(); home != "" { kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "[可选] kubeconfig 绝对路径") } else { kubeconfig = flag.String("kubeconfig", filepath.Join("/tmp", "config"), "kubeconfig 绝对路径") } // 初始化 rest.Config 对象 if config, err = rest.InClusterConfig(); err != nil { if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil { panic(err.Error()) } } // 创建 Clientset 对象 clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err.Error()) } // 初始化一个 SharedInformerFactory 设置resync为60秒一次,会触发UpdateFunc informerFactory := informers.NewSharedInformerFactory(clientset, time.Second*60) // 对 Deployment 监听 //这里如果获取v1betav1的deployment的资源 // informerFactory.Apps().V1beta1().Deployments() deployInformer := informerFactory.Apps().V1().Deployments() // 创建 Informer(相当于注册到工厂中去,这样下面启动的时候就会去 List & Watch 对应的资源) informer := deployInformer.Informer() // 创建 deployment的 Lister deployLister := deployInformer.Lister() // 注册事件处理程序 处理事件数据 informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: onAdd, UpdateFunc: onUpdate, DeleteFunc: onDelete, }) stopper := make(chan struct{}) defer close(stopper) informerFactory.Start(stopper) informerFactory.WaitForCacheSync(stopper) // 从本地缓存中获取 default 命名空间中的所有 deployment 列表 deployments, err := deployLister.Deployments("default").List(labels.Everything()) if err != nil { panic(err) } for idx, deploy := range deployments { fmt.Printf("%d -> %s\n", idx+1, deploy.Name) } <-stopper } func onAdd(obj interface{}) { deploy := obj.(*v1.Deployment) fmt.Println("add a deployment:", deploy.Name) } func onUpdate(old, new interface{}) { oldDeploy := old.(*v1.Deployment) newDeploy := new.(*v1.Deployment) fmt.Println("update deployment:", oldDeploy.Name, newDeploy.Name) } func onDelete(obj interface{}) { deploy := obj.(*v1.Deployment) fmt.Println("delete a deployment:", deploy.Name) } |
示例二:CRD Controller
... func main() { var err error var config *rest.Config var kubeconfig *string if home := homedir.HomeDir(); home != "" { kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "[可选] kubeconfig 绝对路径") } else { kubeconfig = flag.String("kubeconfig", filepath.Join("/tmp", "config"), "kubeconfig 绝对路径") } // 初始化 rest.Config 对象 if config, err = rest.InClusterConfig(); err != nil { if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil { panic(err.Error()) } } // 创建 dymaicClient 对象 dymaicClient, err := dynamic.NewForConfig(config) if err != nil { panic(err.Error()) } gvr := schema.GroupVersionResource{Version: "v1beta1", Resource: "deployments"} // 初始化一个 DynamicSharedInformerFactory 设置resync为60秒一次,会触发UpdateFunc informerFactory := informers.NewDynamicSharedInformerFactory(dymaicClient, time.Second*60) // 对 Deployment 监听 informerListerForGvr := informerFactory.ForResource(gvr) // 创建 Informer(相当于注册到工厂中去,这样下面启动的时候就会去 List & Watch 对应的资源) informer := informerListerForGvr.Informer() // 创建 deployment的 Lister deployLister := deployInformer.Lister(gvr) // 注册事件处理程序 处理事件数据 informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: onAdd, UpdateFunc: onUpdate, DeleteFunc: onDelete, }) ... } ... |