searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Istio配置管理机制解析

2023-07-06 06:12:53
37
0

一、背景

在 Istio 中,ConfigStoreController(配置存储控制器)是控制平面组件之一,其作用是管理和同步 Istio 配置存储的状态和变化。

具体而言,ConfigStoreController 的作用包括以下几个方面:

  • 配置存储的初始化:ConfigStoreController 负责初始化 Istio 的配置存储,包括加载和解析配置文件、读取和验证配置信息,并将其转化为内部数据结构。这些配置信息可以包括路由规则、策略配置、监控设置等。
  • 配置存储的更新和同步:ConfigStoreController 监听配置存储的变化,并负责将变化的配置信息更新到内部数据结构中,以确保控制平面使用的配置始终与存储的配置保持同步。当配置存储发生变化时,ConfigStoreController 将触发相应的更新操作,例如重新加载路由规则、更新策略配置等。
  • 配置的持久化和缓存:ConfigStoreController 将配置信息持久化到适当的存储介质中,以确保配置的可靠存储和持久性。它还维护一个本地缓存,用于快速访问配置信息,减少对底层存储的频繁访问。
  • 配置的验证和合规性检查:ConfigStoreController 对从配置存储加载的配置进行验证和合规性检查,确保配置的正确性和符合规范。它可以检查配置中的错误、冲突或不一致之类的问题,并提供相应的日志和警告信息。

ConfigStoreController 在 Istio 控制平面中起着重要的作用,负责管理和同步 Istio 的配置存储。它确保配置存储的状态始终与控制平面保持同步,并提供配置的加载、更新、持久化和合规性检查等功能。这有助于确保 Istio 控制平面使用正确的配置,并支持灵活的配置管理和更新能力。

二、配置存储控制器结构

在软件开发中,一个东西的结构和运行原理通常是先有结构,然后才有运行原理。

首先,设计和定义一个东西的结构是为了明确其组成部分、关系和功能。结构定义了该东西的静态属性和组织方式,可以是一种数据结构、类的定义、配置文件格式等。它描述了东西的整体框架和组织方式,以及各个组件之间的关系和交互。

一旦有了结构的定义,接下来就是理解和定义该东西的运行原理。运行原理涉及该东西的行为、工作方式和执行过程。它描述了如何使用该结构,以及在特定条件下该东西是如何运行的。运行原理可以包括算法、逻辑流程、工作流程等,它解释了东西是如何根据输入执行操作并产生输出的。

因此,通常是先定义一个东西的结构,然后再定义它的运行原理。结构提供了框架和组织方式,而运行原理定义了具体的行为和工作方式。这种顺序有助于在开发过程中明确需求、设计架构,并将其转化为可运行的实现。

ConfigStoreController是配置存储的本地完全复制缓存,具有额外的处理程序。控制器主动地将其本地状态与远程存储同步,并提供接收更新事件的通知机制。因此,通知处理程序必须在调用_Run_之前注册,并且缓存需要在调用_Run _之后的初始同步宽限期。更新通知需要以下一致性保证:缓存中的视图必须至少与通知到达时一样实时,但可能更加实时(例如,如果_Delete_取消_Add_事件)。处理程序按附加顺序在单个工作队列上执行。处理程序接收通知事件和关联的对象。请注意,在启动缓存控制器之前,必须注册所有处理程序。

结构如下:

type ConfigStoreController interface {
	ConfigStore

	// RegisterEventHandler 添加一个处理程序以接收配置类型的配置更新事件
	RegisterEventHandler(kind config.GroupVersionKind, handler EventHandler)

	// 运行直到收到信号
	Run(stop <-chan struct{})

	// 如果存储已启动,则应调用SetWatchErrorHandler
	SetWatchErrorHandler(func(r *cache.Reflector, err error)) error

	//store开始后返回true
	HasStarted() bool

	// 初始缓存同步完成后,HasSynced返回true
	HasSynced() bool
}

作为一个配置管理中心,首先是先定义好一个配置的存储结构和方式,ConfigStore就是这么一个结构,那配置定义有哪些,以及配置对应肯定会有增删改查的一些基本操作,因此ConfigStore的结构以接口的方式定义了一些公共的基础操作。

ConfigStore描述了一组与平台无关的API,底层平台必须支持这些API才能存储和检索Istio配置。配置键被定义为配置对象的类型、名称和命名空间的组合。配置密钥保证在存储中是唯一的。这里提供的存储接口假设底层存储层支持_Get_(list)、_Update_(Update)、_Create_(Create)和_Delete_语义,但不保证任何事务语义_Update_、_Create_和_Delete_是赋值操作。这些操作是异步的,可能不会立即看到效果(例如,_Get_可能不会在更改存储后立即按键返回对象。)即使操作成功,也可能会出现间歇性错误,因此即使更改操作返回错误,也应始终检查对象存储是否已被修改。应使用_Create_操作和_Update_创建对象。

资源版本记录每个对象上的最后一次变异操作。如果将突变应用于对象的不同修订,而不是底层存储所期望的(由纯相等定义),则该操作将被阻止。此接口的客户端不应对修订标识符的结构或顺序做出假设。从该接口提供和返回的对象引用应被视为只读。修改它们会违反线程安全性。

type ConfigStore interface {
	// Schemas 公开配置存储区已知的配置类型架构。类型模式定义了配置类型和protobuf编码模式之间的双向映射。
	Schemas() collection.Schemas

	// Get通过类型和键检索配置元素
	Get(typ config.GroupVersionKind, name, namespace string) *config.Config

	// 列表按类型和命名空间返回对象。对命名空间使用“”以跨命名空间列出。
	List(typ config.GroupVersionKind, namespace string) ([]config.Config, error)

	// Create将一个新的配置对象添加到存储中。如果该类型已存在具有相同名称和命名空间的对象,则操作将失败,不会产生任何副作用。
	Create(config config.Config) (revision string, err error)

	// 更新修改存储中的现有配置对象。更新要求已创建对象。资源版本防止覆盖在先前的_Get_和_Put_操作之间已更改的值以实现乐观并发。如果操作成功,此方法将返回新的修订。
	Update(config config.Config) (newRevision string, err error)
	UpdateStatus(config config.Config) (newRevision string, err error)

	// Patch仅应用在PatchFunc中所做的修改,而不是进行完全替换。当同一资源有多个并发写入程序时,有助于避免读-修改-写冲突。
	Patch(orig config.Config, patchFn config.PatchFunc) (string, error)

	// Delete通过键从存储中删除对象对于k8s,在执行删除之前必须满足resourceVersion。如果不可能,将返回409冲突状态。
	Delete(typ config.GroupVersionKind, name, namespace string, resourceVersion *string) error
}

配置管理的另一个重要特性跟注册中心一样是兼容不同的配置中心。因为Istio主要依赖于K8s容器,因此,k8s的配置必定会支持,由于Istio的配置下发采用的是XDS协议,因此,如果支持非k8s的配置管理,该配置中心需要实现XDS协议。k8s配置可以应用k8s的etcd来存储配置,如果是非k8s配置中心,则采用内存缓存的方式在内部缓存一份配置,持久化的存储则依赖外部配置中心。

2.1 内存存储

type Controller struct {
	monitor     Monitor
	configStore model.ConfigStore
	hasSynced   func() bool

	started atomic.Bool
	namespacesFilter func(obj interface{}) bool
}

2.2 K8s存储

Client是Istio CRD的客户端,实现配置存储缓存。它用于Istio配置上的CRUD操作员,以及处理配置更改时的事件。

type Client struct {
	//模式定义了此客户端使用的模式集。注意:这必须是代码生成中定义的模式的子集
	schemas collection.Schemas

	// domain配置元数据的后缀
	domainSuffix string

	// 此控制平面实例的修订。将只阅读与此修订版匹配的配置。
	revision string

	// kinds跟踪已知类型的所有缓存处理程序
	kinds   map[config.GroupVersionKind]*cacheHandler
	kindsMu sync.RWMutex
	queue   queue.Instance

	// 处理程序定义每种类型的事件处理程序列表
	handlers map[config.GroupVersionKind][]model.EventHandler

	// 我们将用于访问对象的istioclient-go客户端
	istioClient istioclient.Interface

	// 我们将用于访问对象的网关api客户端
	gatewayAPIClient gatewayapiclient.Interface

	// 调用SyncAll时,beginSync设置为true,表示控制器已开始同步资源。
	beginSync *atomic.Bool
	// initialSync在执行所有对象的初始处理之后被设置为true。
	initialSync         *atomic.Bool
	schemasByCRDName    map[string]collection.Schema
	client              kube.Client
	crdMetadataInformer cache.SharedIndexInformer
	logger              *log.Scope

	// namespaceFilter仅用于启动已筛选的informer。
	namespacesFilter func(obj interface{}) bool
}

配置模式(schemas)指定了配置文件的结构、字段类型、验证规则等信息。通过使用“NewForSchemas”函数,可以根据特定的配置模式创建一个解析器,然后使用该解析器来解析配置文件或数据,这对于在Istio中解析和处理与特定模式相对应的配置文件非常有用,以确保配置的正确性和合规性。

schemas定义了Istio的Pilot资源集合:

  1. IstioExtensionsV1Alpha1Wasmplugins
  2. IstioNetworkingV1Alpha3Destinationrules
  3. IstioNetworkingV1Alpha3Envoyfilters
  4. IstioNetworkingV1Alpha3Gateways
  5. IstioNetworkingV1Alpha3Serviceentries
  6. IstioNetworkingV1Alpha3Sidecars
  7. IstioNetworkingV1Alpha3Virtualservices
  8. IstioNetworkingV1Alpha3Workloadentries
  9. IstioNetworkingV1Alpha3Workloadgroups
  10. IstioNetworkingV1Beta1Proxyconfigs
  11. IstioSecurityV1Beta1Authorizationpolicies
  12. IstioSecurityV1Beta1Peerauthentications
  13. IstioSecurityV1Beta1Requestauthentications
  14. IstioTelemetryV1Alpha1Telemetries

如果Kubernetes gateway-api设置为enabled的话,还会包括PilotGatewayAPI的资源。

 

三、配置存储控制器运行原理

3.1 Istio配置

在2.2章节讲到基于k8s的配置存储,存储的是Istio的资源配置,那有了这些配置存储之后,必定就会考虑到怎么使用,因为是基于K8S的,所以很容易想到Informer机制去实现Istio的配置监听,并同步到Istio配置存储中。

在为Istio创建Infromer监听器之前,需要做一些过滤操作,先是查询集群已有的CRD资源,对比schemas集合中的资源,命中则创建,然后匹配Kubernetes控制的CRD资源,如果命中,也创建该监听器。

func handleCRDAdd(cl *Client, name string, stop <-chan struct{}) {
	cl.logger.Debugf("adding CRD %q", name)
	s, f := cl.schemasByCRDName[name]
	if !f {
		cl.logger.Debugf("added resource that we are not watching: %v", name)
		return
	}
	resourceGVK := s.Resource().GroupVersionKind()
	gvr := s.Resource().GroupVersionResource()

	cl.kindsMu.Lock()
	defer cl.kindsMu.Unlock()
	if _, f := cl.kinds[resourceGVK]; f {
		cl.logger.Debugf("added resource that already exists: %v", resourceGVK)
		return
	}
	var i informers.GenericInformer
	var ifactory starter
	var err error
	switch s.Resource().Group() {
	case gvk.KubernetesGateway.Group:
		ifactory = cl.client.GatewayAPIInformer()
		i, err = cl.client.GatewayAPIInformer().ForResource(gvr)
	case gvk.Pod.Group, gvk.Deployment.Group, gvk.MutatingWebhookConfiguration.Group:
		ifactory = cl.client.KubeInformer()
		i, err = cl.client.KubeInformer().ForResource(gvr)
	case gvk.CustomResourceDefinition.Group:
		ifactory = cl.client.ExtInformer()
		i, err = cl.client.ExtInformer().ForResource(gvr)
	default:
		ifactory = cl.client.IstioInformer()
		i, err = cl.client.IstioInformer().ForResource(gvr)
	}

	if err != nil {
		// Shouldn't happen
		cl.logger.Errorf("failed to create informer for %v: %v", resourceGVK, err)
		return
	}
	_ = i.Informer().SetTransform(kube.StripUnusedFields)

	cl.kinds[resourceGVK] = createCacheHandler(cl, s, i)
	if w, f := crdWatches[resourceGVK]; f {
		cl.logger.Infof("notifying watchers %v was created", resourceGVK)
		w.once.Do(func() {
			close(w.stop)
		})
	}
	if stop != nil {
		// 仅当定义了停止时,才启动informer工厂。在启动的情况下,我们不会从这里开始,因为一旦我们准备好初始化,我们就会启动所有工厂。对于动态添加的CRD,我们需要立即启动
		ifactory.Start(stop)
	}
}

handleCrdAdd函数是用于处理自定义资源定义(Custom Resource Definitions,CRD)的添加事件的函数之一。

CRD是Kubernetes的扩展机制,允许用户定义自己的资源类型。Istio使用CRD来定义其特定的配置资源,如VirtualService、DestinationRule等。当Kubernetes集群中创建了一个新的Istio CRD实例时,将会触发相应的添加事件。

handleCrdAdd函数的作用包括以下几个方面:

解析CRD实例:函数负责解析新添加的CRD实例,并将其转换为内部数据结构,以便后续处理。它可以从CRD实例中提取关键信息,如配置规则、路由规则等。

验证CRD实例:函数对新添加的CRD实例进行验证,确保其符合预期的格式和规范。这可能涉及验证字段的完整性、类型的正确性以及与其他资源的一致性等。

执行配置更新:根据CRD实例的内容,函数会触发相应的配置更新操作。这可能包括更新路由规则、策略配置、服务发现等,以确保控制平面使用最新的配置。

通知其他组件:函数可能与其他组件进行通信,以便通知它们有关新的CRD实例的添加。这样,其他组件可以相应地采取行动,例如更新缓存、重新加载配置等。

总的来说,handleCrdAdd函数是Istio控制平面中的一部分,负责处理新添加的CRD实例。它负责解析、验证和处理CRD实例,并相应地更新和通知其他组件,以确保Istio控制平面中的相关配置保持最新和一致。

重点关注IstioInformer,里面定义了Istio所有资源informer:

func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource) (GenericInformer, error) {
	switch resource {
	// Group=extensions.istio.io, Version=v1alpha1
	case v1alpha1.SchemeGroupVersion.WithResource("wasmplugins"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Extensions().V1alpha1().WasmPlugins().Informer()}, nil

		// Group=networking.istio.io, Version=v1alpha3
	case v1alpha3.SchemeGroupVersion.WithResource("destinationrules"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1alpha3().DestinationRules().Informer()}, nil
	case v1alpha3.SchemeGroupVersion.WithResource("envoyfilters"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1alpha3().EnvoyFilters().Informer()}, nil
	case v1alpha3.SchemeGroupVersion.WithResource("gateways"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1alpha3().Gateways().Informer()}, nil
	case v1alpha3.SchemeGroupVersion.WithResource("serviceentries"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1alpha3().ServiceEntries().Informer()}, nil
	case v1alpha3.SchemeGroupVersion.WithResource("sidecars"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1alpha3().Sidecars().Informer()}, nil
	case v1alpha3.SchemeGroupVersion.WithResource("virtualservices"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1alpha3().VirtualServices().Informer()}, nil
	case v1alpha3.SchemeGroupVersion.WithResource("workloadentries"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1alpha3().WorkloadEntries().Informer()}, nil
	case v1alpha3.SchemeGroupVersion.WithResource("workloadgroups"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1alpha3().WorkloadGroups().Informer()}, nil

		// Group=networking.istio.io, Version=v1beta1
	case v1beta1.SchemeGroupVersion.WithResource("destinationrules"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1beta1().DestinationRules().Informer()}, nil
	case v1beta1.SchemeGroupVersion.WithResource("gateways"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1beta1().Gateways().Informer()}, nil
	case v1beta1.SchemeGroupVersion.WithResource("proxyconfigs"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1beta1().ProxyConfigs().Informer()}, nil
	case v1beta1.SchemeGroupVersion.WithResource("serviceentries"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1beta1().ServiceEntries().Informer()}, nil
	case v1beta1.SchemeGroupVersion.WithResource("sidecars"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1beta1().Sidecars().Informer()}, nil
	case v1beta1.SchemeGroupVersion.WithResource("virtualservices"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1beta1().VirtualServices().Informer()}, nil
	case v1beta1.SchemeGroupVersion.WithResource("workloadentries"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1beta1().WorkloadEntries().Informer()}, nil
	case v1beta1.SchemeGroupVersion.WithResource("workloadgroups"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1beta1().WorkloadGroups().Informer()}, nil

		// Group=security.istio.io, Version=v1beta1
	case securityv1beta1.SchemeGroupVersion.WithResource("authorizationpolicies"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Security().V1beta1().AuthorizationPolicies().Informer()}, nil
	case securityv1beta1.SchemeGroupVersion.WithResource("peerauthentications"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Security().V1beta1().PeerAuthentications().Informer()}, nil
	case securityv1beta1.SchemeGroupVersion.WithResource("requestauthentications"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Security().V1beta1().RequestAuthentications().Informer()}, nil

		// Group=telemetry.istio.io, Version=v1alpha1
	case telemetryv1alpha1.SchemeGroupVersion.WithResource("telemetries"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Telemetry().V1alpha1().Telemetries().Informer()}, nil

	}

	return nil, fmt.Errorf("no informer found for %v", resource)
}

3.2 MCP

如果网格配置的meshConfig.ConfigSources自己定了config配置来源的话,就走MCP的协议加载配置。

func (s *Server) initConfigSources(args *PilotArgs) (err error) {
	for _, configSource := range s.environment.Mesh().ConfigSources {
		srcAddress, err := url.Parse(configSource.Address)
		if err != nil {
			return fmt.Errorf("invalid config URL %s %v", configSource.Address, err)
		}
		scheme := ConfigSourceAddressScheme(srcAddress.Scheme)
		switch scheme {
		case File:
			if srcAddress.Path == "" {
				return fmt.Errorf("invalid fs config URL %s, contains no file path", configSource.Address)
			}
			store := memory.Make(collections.Pilot)
			configController := memory.NewController(store)

			err := s.makeFileMonitor(srcAddress.Path, args.RegistryOptions.KubeOptions.DomainSuffix, configController)
			if err != nil {
				return err
			}
			s.ConfigStores = append(s.ConfigStores, configController)
		case XDS:
			xdsMCP, err := adsc.New(srcAddress.Host, &adsc.Config{
				Namespace: args.Namespace,
				Workload:  args.PodName,
				Revision:  args.Revision,
				Meta: model.NodeMetadata{
					Generator: "api",
					// To reduce transported data if upstream server supports. Especially for custom servers.
					IstioRevision: args.Revision,
				}.ToStruct(),
				InitialDiscoveryRequests: adsc.ConfigInitialRequests(),
			})
			if err != nil {
				return fmt.Errorf("failed to dial XDS %s %v", configSource.Address, err)
			}
			store := memory.Make(collections.Pilot)
			// TODO: enable namespace filter for memory controller
			configController := memory.NewController(store)
			configController.RegisterHasSyncedHandler(xdsMCP.HasSynced)
			xdsMCP.Store = configController
			err = xdsMCP.Run()
			if err != nil {
				return fmt.Errorf("MCP: failed running %v", err)
			}
			s.ConfigStores = append(s.ConfigStores, configController)
			log.Warn("Started XDS config ", s.ConfigStores)
		case Kubernetes:
			if srcAddress.Path == "" || srcAddress.Path == "/" {
				err2 := s.initK8SConfigStore(args)
				if err2 != nil {
					log.Warn("Error loading k8s ", err2)
					return err2
				}
				log.Warn("Started K8S config")
			} else {
				log.Warnf("Not implemented, ignore: %v", configSource.Address)
				// TODO: handle k8s:// scheme for remote cluster. Use same mechanism as service registry,
				// using the cluster name as key to match a secret.
			}
		default:
			log.Warnf("Ignoring unsupported config source: %v", configSource.Address)
		}
	}
	return nil
}

根据Address的schema,分成File、XDS、K8S三种配置存储,其中,File和XDS都是采用内存存储方式实现,本文重点关注XDS的运行过程。

首先是需要构建一个ADSC结构,这个结构是实现了ADS的基本客户端,主要用于压力测试和需要连接到Istio pilot或其他ADS服务器的工具或库。主要功能:

  • 连接到ProxyConfig中指定的XDS服务器
  • 如果需要证书的话,使用Secret提供程序获取证书
  • 发送watch资源的初始请求
  • 等待来自XDS服务器的响应
  • 成功后,用exp.backoff启动一个后台线程来维护连接。

运行的核心代码在于接收配置服务的响应:

func (a *ADSC) handleRecv() {
	for {
		var err error
		msg, err := a.stream.Recv()
		if err != nil {
			a.RecvWg.Done()
			adscLog.Infof("Connection closed for node %v with err: %v", a.nodeID, err)
			select {
			case a.errChan <- err:
			default:
			}
			// if 'reconnect' enabled - schedule a new Run
			if a.cfg.BackoffPolicy != nil {
				time.AfterFunc(a.cfg.BackoffPolicy.NextBackOff(), a.reconnect)
			} else {
				a.Close()
				a.WaitClear()
				a.Updates <- ""
				a.XDSUpdates <- nil
				close(a.errChan)
			}
			return
		}

		// 在Pilot Schemas集合中匹配gvk,并转换成Istio的Gvk
		gvk, isMCP := convertTypeURLToMCPGVK(msg.TypeUrl)

		adscLog.Info("Received ", a.url, " type ", msg.TypeUrl,
			" cnt=", len(msg.Resources), " nonce=", msg.Nonce)
		if a.cfg.ResponseHandler != nil {
			a.cfg.ResponseHandler.HandleResponse(a, msg)
		}

		if msg.TypeUrl == collections.IstioMeshV1Alpha1MeshConfig.Resource().GroupVersionKind().String() &&
			len(msg.Resources) > 0 {
			rsc := msg.Resources[0]
			m := &v1alpha1.MeshConfig{}
			err = proto.Unmarshal(rsc.Value, m)
			if err != nil {
				adscLog.Warn("Failed to unmarshal mesh config", err)
			}
                        //解析转换成Istio的meshConfig
			a.Mesh = m
			if a.LocalCacheDir != "" {
				strResponse, err := protomarshal.ToJSONWithIndent(m, "  ")
				if err != nil {
					continue
				}
				err = os.WriteFile(a.LocalCacheDir+"_mesh.json", []byte(strResponse), 0o644)
				if err != nil {
					continue
				}
			}
			continue
		}

		// Process the resources.
		a.VersionInfo[msg.TypeUrl] = msg.VersionInfo
		switch msg.TypeUrl {
		case v3.ListenerType:
			listeners := make([]*listener.Listener, 0, len(msg.Resources))
			for _, rsc := range msg.Resources {
				valBytes := rsc.Value
				ll := &listener.Listener{}
				_ = proto.Unmarshal(valBytes, ll)
				listeners = append(listeners, ll)
			}
			a.handleLDS(listeners)
		case v3.ClusterType:
			clusters := make([]*cluster.Cluster, 0, len(msg.Resources))
			for _, rsc := range msg.Resources {
				valBytes := rsc.Value
				cl := &cluster.Cluster{}
				_ = proto.Unmarshal(valBytes, cl)
				clusters = append(clusters, cl)
			}
			a.handleCDS(clusters)
		case v3.EndpointType:
			eds := make([]*endpoint.ClusterLoadAssignment, 0, len(msg.Resources))
			for _, rsc := range msg.Resources {
				valBytes := rsc.Value
				el := &endpoint.ClusterLoadAssignment{}
				_ = proto.Unmarshal(valBytes, el)
				eds = append(eds, el)
			}
			a.handleEDS(eds)
		case v3.RouteType:
			routes := make([]*route.RouteConfiguration, 0, len(msg.Resources))
			for _, rsc := range msg.Resources {
				valBytes := rsc.Value
				rl := &route.RouteConfiguration{}
				_ = proto.Unmarshal(valBytes, rl)
				routes = append(routes, rl)
			}
			a.handleRDS(routes)
		default:
			if isMCP {
				a.handleMCP(gvk, msg.Resources)
			}
		}

		// If we got no resource - still save to the store with empty name/namespace, to notify sync
		// This scheme also allows us to chunk large responses !

		// TODO: add hook to inject nacks

		a.mutex.Lock()
		if isMCP {
			if _, exist := a.sync[gvk.String()]; !exist {
				a.sync[gvk.String()] = time.Now()
			}
		}
		a.Received[msg.TypeUrl] = msg
		a.ack(msg)
		a.mutex.Unlock()

		select {
		case a.XDSUpdates <- msg:
		default:
		}
	}
}

上述方法核心的流程在于根据msg.TypeUrl的类型,分别对XDS的ListenerType、ClusterType、EndpointType、RouteType做处理。

  1. 在handleLDS中主要是在Listener列表中找到终端Filter,根据Filter名称区分TCP还是Http,设置httpListeners和tcpListeners,发送获取对应端口的RouteType的请求
  2. handleRoute中主要是设置routes
  3. handleCluster主要是根据集群名发送EndpointType请求,设置edsClusters和clusters
  4. handleEDS主要是设置endpoints

 

0条评论
0 / 1000
网个大鱼
12文章数
1粉丝数
网个大鱼
12 文章 | 1 粉丝
原创

Istio配置管理机制解析

2023-07-06 06:12:53
37
0

一、背景

在 Istio 中,ConfigStoreController(配置存储控制器)是控制平面组件之一,其作用是管理和同步 Istio 配置存储的状态和变化。

具体而言,ConfigStoreController 的作用包括以下几个方面:

  • 配置存储的初始化:ConfigStoreController 负责初始化 Istio 的配置存储,包括加载和解析配置文件、读取和验证配置信息,并将其转化为内部数据结构。这些配置信息可以包括路由规则、策略配置、监控设置等。
  • 配置存储的更新和同步:ConfigStoreController 监听配置存储的变化,并负责将变化的配置信息更新到内部数据结构中,以确保控制平面使用的配置始终与存储的配置保持同步。当配置存储发生变化时,ConfigStoreController 将触发相应的更新操作,例如重新加载路由规则、更新策略配置等。
  • 配置的持久化和缓存:ConfigStoreController 将配置信息持久化到适当的存储介质中,以确保配置的可靠存储和持久性。它还维护一个本地缓存,用于快速访问配置信息,减少对底层存储的频繁访问。
  • 配置的验证和合规性检查:ConfigStoreController 对从配置存储加载的配置进行验证和合规性检查,确保配置的正确性和符合规范。它可以检查配置中的错误、冲突或不一致之类的问题,并提供相应的日志和警告信息。

ConfigStoreController 在 Istio 控制平面中起着重要的作用,负责管理和同步 Istio 的配置存储。它确保配置存储的状态始终与控制平面保持同步,并提供配置的加载、更新、持久化和合规性检查等功能。这有助于确保 Istio 控制平面使用正确的配置,并支持灵活的配置管理和更新能力。

二、配置存储控制器结构

在软件开发中,一个东西的结构和运行原理通常是先有结构,然后才有运行原理。

首先,设计和定义一个东西的结构是为了明确其组成部分、关系和功能。结构定义了该东西的静态属性和组织方式,可以是一种数据结构、类的定义、配置文件格式等。它描述了东西的整体框架和组织方式,以及各个组件之间的关系和交互。

一旦有了结构的定义,接下来就是理解和定义该东西的运行原理。运行原理涉及该东西的行为、工作方式和执行过程。它描述了如何使用该结构,以及在特定条件下该东西是如何运行的。运行原理可以包括算法、逻辑流程、工作流程等,它解释了东西是如何根据输入执行操作并产生输出的。

因此,通常是先定义一个东西的结构,然后再定义它的运行原理。结构提供了框架和组织方式,而运行原理定义了具体的行为和工作方式。这种顺序有助于在开发过程中明确需求、设计架构,并将其转化为可运行的实现。

ConfigStoreController是配置存储的本地完全复制缓存,具有额外的处理程序。控制器主动地将其本地状态与远程存储同步,并提供接收更新事件的通知机制。因此,通知处理程序必须在调用_Run_之前注册,并且缓存需要在调用_Run _之后的初始同步宽限期。更新通知需要以下一致性保证:缓存中的视图必须至少与通知到达时一样实时,但可能更加实时(例如,如果_Delete_取消_Add_事件)。处理程序按附加顺序在单个工作队列上执行。处理程序接收通知事件和关联的对象。请注意,在启动缓存控制器之前,必须注册所有处理程序。

结构如下:

type ConfigStoreController interface {
	ConfigStore

	// RegisterEventHandler 添加一个处理程序以接收配置类型的配置更新事件
	RegisterEventHandler(kind config.GroupVersionKind, handler EventHandler)

	// 运行直到收到信号
	Run(stop <-chan struct{})

	// 如果存储已启动,则应调用SetWatchErrorHandler
	SetWatchErrorHandler(func(r *cache.Reflector, err error)) error

	//store开始后返回true
	HasStarted() bool

	// 初始缓存同步完成后,HasSynced返回true
	HasSynced() bool
}

作为一个配置管理中心,首先是先定义好一个配置的存储结构和方式,ConfigStore就是这么一个结构,那配置定义有哪些,以及配置对应肯定会有增删改查的一些基本操作,因此ConfigStore的结构以接口的方式定义了一些公共的基础操作。

ConfigStore描述了一组与平台无关的API,底层平台必须支持这些API才能存储和检索Istio配置。配置键被定义为配置对象的类型、名称和命名空间的组合。配置密钥保证在存储中是唯一的。这里提供的存储接口假设底层存储层支持_Get_(list)、_Update_(Update)、_Create_(Create)和_Delete_语义,但不保证任何事务语义_Update_、_Create_和_Delete_是赋值操作。这些操作是异步的,可能不会立即看到效果(例如,_Get_可能不会在更改存储后立即按键返回对象。)即使操作成功,也可能会出现间歇性错误,因此即使更改操作返回错误,也应始终检查对象存储是否已被修改。应使用_Create_操作和_Update_创建对象。

资源版本记录每个对象上的最后一次变异操作。如果将突变应用于对象的不同修订,而不是底层存储所期望的(由纯相等定义),则该操作将被阻止。此接口的客户端不应对修订标识符的结构或顺序做出假设。从该接口提供和返回的对象引用应被视为只读。修改它们会违反线程安全性。

type ConfigStore interface {
	// Schemas 公开配置存储区已知的配置类型架构。类型模式定义了配置类型和protobuf编码模式之间的双向映射。
	Schemas() collection.Schemas

	// Get通过类型和键检索配置元素
	Get(typ config.GroupVersionKind, name, namespace string) *config.Config

	// 列表按类型和命名空间返回对象。对命名空间使用“”以跨命名空间列出。
	List(typ config.GroupVersionKind, namespace string) ([]config.Config, error)

	// Create将一个新的配置对象添加到存储中。如果该类型已存在具有相同名称和命名空间的对象,则操作将失败,不会产生任何副作用。
	Create(config config.Config) (revision string, err error)

	// 更新修改存储中的现有配置对象。更新要求已创建对象。资源版本防止覆盖在先前的_Get_和_Put_操作之间已更改的值以实现乐观并发。如果操作成功,此方法将返回新的修订。
	Update(config config.Config) (newRevision string, err error)
	UpdateStatus(config config.Config) (newRevision string, err error)

	// Patch仅应用在PatchFunc中所做的修改,而不是进行完全替换。当同一资源有多个并发写入程序时,有助于避免读-修改-写冲突。
	Patch(orig config.Config, patchFn config.PatchFunc) (string, error)

	// Delete通过键从存储中删除对象对于k8s,在执行删除之前必须满足resourceVersion。如果不可能,将返回409冲突状态。
	Delete(typ config.GroupVersionKind, name, namespace string, resourceVersion *string) error
}

配置管理的另一个重要特性跟注册中心一样是兼容不同的配置中心。因为Istio主要依赖于K8s容器,因此,k8s的配置必定会支持,由于Istio的配置下发采用的是XDS协议,因此,如果支持非k8s的配置管理,该配置中心需要实现XDS协议。k8s配置可以应用k8s的etcd来存储配置,如果是非k8s配置中心,则采用内存缓存的方式在内部缓存一份配置,持久化的存储则依赖外部配置中心。

2.1 内存存储

type Controller struct {
	monitor     Monitor
	configStore model.ConfigStore
	hasSynced   func() bool

	started atomic.Bool
	namespacesFilter func(obj interface{}) bool
}

2.2 K8s存储

Client是Istio CRD的客户端,实现配置存储缓存。它用于Istio配置上的CRUD操作员,以及处理配置更改时的事件。

type Client struct {
	//模式定义了此客户端使用的模式集。注意:这必须是代码生成中定义的模式的子集
	schemas collection.Schemas

	// domain配置元数据的后缀
	domainSuffix string

	// 此控制平面实例的修订。将只阅读与此修订版匹配的配置。
	revision string

	// kinds跟踪已知类型的所有缓存处理程序
	kinds   map[config.GroupVersionKind]*cacheHandler
	kindsMu sync.RWMutex
	queue   queue.Instance

	// 处理程序定义每种类型的事件处理程序列表
	handlers map[config.GroupVersionKind][]model.EventHandler

	// 我们将用于访问对象的istioclient-go客户端
	istioClient istioclient.Interface

	// 我们将用于访问对象的网关api客户端
	gatewayAPIClient gatewayapiclient.Interface

	// 调用SyncAll时,beginSync设置为true,表示控制器已开始同步资源。
	beginSync *atomic.Bool
	// initialSync在执行所有对象的初始处理之后被设置为true。
	initialSync         *atomic.Bool
	schemasByCRDName    map[string]collection.Schema
	client              kube.Client
	crdMetadataInformer cache.SharedIndexInformer
	logger              *log.Scope

	// namespaceFilter仅用于启动已筛选的informer。
	namespacesFilter func(obj interface{}) bool
}

配置模式(schemas)指定了配置文件的结构、字段类型、验证规则等信息。通过使用“NewForSchemas”函数,可以根据特定的配置模式创建一个解析器,然后使用该解析器来解析配置文件或数据,这对于在Istio中解析和处理与特定模式相对应的配置文件非常有用,以确保配置的正确性和合规性。

schemas定义了Istio的Pilot资源集合:

  1. IstioExtensionsV1Alpha1Wasmplugins
  2. IstioNetworkingV1Alpha3Destinationrules
  3. IstioNetworkingV1Alpha3Envoyfilters
  4. IstioNetworkingV1Alpha3Gateways
  5. IstioNetworkingV1Alpha3Serviceentries
  6. IstioNetworkingV1Alpha3Sidecars
  7. IstioNetworkingV1Alpha3Virtualservices
  8. IstioNetworkingV1Alpha3Workloadentries
  9. IstioNetworkingV1Alpha3Workloadgroups
  10. IstioNetworkingV1Beta1Proxyconfigs
  11. IstioSecurityV1Beta1Authorizationpolicies
  12. IstioSecurityV1Beta1Peerauthentications
  13. IstioSecurityV1Beta1Requestauthentications
  14. IstioTelemetryV1Alpha1Telemetries

如果Kubernetes gateway-api设置为enabled的话,还会包括PilotGatewayAPI的资源。

 

三、配置存储控制器运行原理

3.1 Istio配置

在2.2章节讲到基于k8s的配置存储,存储的是Istio的资源配置,那有了这些配置存储之后,必定就会考虑到怎么使用,因为是基于K8S的,所以很容易想到Informer机制去实现Istio的配置监听,并同步到Istio配置存储中。

在为Istio创建Infromer监听器之前,需要做一些过滤操作,先是查询集群已有的CRD资源,对比schemas集合中的资源,命中则创建,然后匹配Kubernetes控制的CRD资源,如果命中,也创建该监听器。

func handleCRDAdd(cl *Client, name string, stop <-chan struct{}) {
	cl.logger.Debugf("adding CRD %q", name)
	s, f := cl.schemasByCRDName[name]
	if !f {
		cl.logger.Debugf("added resource that we are not watching: %v", name)
		return
	}
	resourceGVK := s.Resource().GroupVersionKind()
	gvr := s.Resource().GroupVersionResource()

	cl.kindsMu.Lock()
	defer cl.kindsMu.Unlock()
	if _, f := cl.kinds[resourceGVK]; f {
		cl.logger.Debugf("added resource that already exists: %v", resourceGVK)
		return
	}
	var i informers.GenericInformer
	var ifactory starter
	var err error
	switch s.Resource().Group() {
	case gvk.KubernetesGateway.Group:
		ifactory = cl.client.GatewayAPIInformer()
		i, err = cl.client.GatewayAPIInformer().ForResource(gvr)
	case gvk.Pod.Group, gvk.Deployment.Group, gvk.MutatingWebhookConfiguration.Group:
		ifactory = cl.client.KubeInformer()
		i, err = cl.client.KubeInformer().ForResource(gvr)
	case gvk.CustomResourceDefinition.Group:
		ifactory = cl.client.ExtInformer()
		i, err = cl.client.ExtInformer().ForResource(gvr)
	default:
		ifactory = cl.client.IstioInformer()
		i, err = cl.client.IstioInformer().ForResource(gvr)
	}

	if err != nil {
		// Shouldn't happen
		cl.logger.Errorf("failed to create informer for %v: %v", resourceGVK, err)
		return
	}
	_ = i.Informer().SetTransform(kube.StripUnusedFields)

	cl.kinds[resourceGVK] = createCacheHandler(cl, s, i)
	if w, f := crdWatches[resourceGVK]; f {
		cl.logger.Infof("notifying watchers %v was created", resourceGVK)
		w.once.Do(func() {
			close(w.stop)
		})
	}
	if stop != nil {
		// 仅当定义了停止时,才启动informer工厂。在启动的情况下,我们不会从这里开始,因为一旦我们准备好初始化,我们就会启动所有工厂。对于动态添加的CRD,我们需要立即启动
		ifactory.Start(stop)
	}
}

handleCrdAdd函数是用于处理自定义资源定义(Custom Resource Definitions,CRD)的添加事件的函数之一。

CRD是Kubernetes的扩展机制,允许用户定义自己的资源类型。Istio使用CRD来定义其特定的配置资源,如VirtualService、DestinationRule等。当Kubernetes集群中创建了一个新的Istio CRD实例时,将会触发相应的添加事件。

handleCrdAdd函数的作用包括以下几个方面:

解析CRD实例:函数负责解析新添加的CRD实例,并将其转换为内部数据结构,以便后续处理。它可以从CRD实例中提取关键信息,如配置规则、路由规则等。

验证CRD实例:函数对新添加的CRD实例进行验证,确保其符合预期的格式和规范。这可能涉及验证字段的完整性、类型的正确性以及与其他资源的一致性等。

执行配置更新:根据CRD实例的内容,函数会触发相应的配置更新操作。这可能包括更新路由规则、策略配置、服务发现等,以确保控制平面使用最新的配置。

通知其他组件:函数可能与其他组件进行通信,以便通知它们有关新的CRD实例的添加。这样,其他组件可以相应地采取行动,例如更新缓存、重新加载配置等。

总的来说,handleCrdAdd函数是Istio控制平面中的一部分,负责处理新添加的CRD实例。它负责解析、验证和处理CRD实例,并相应地更新和通知其他组件,以确保Istio控制平面中的相关配置保持最新和一致。

重点关注IstioInformer,里面定义了Istio所有资源informer:

func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource) (GenericInformer, error) {
	switch resource {
	// Group=extensions.istio.io, Version=v1alpha1
	case v1alpha1.SchemeGroupVersion.WithResource("wasmplugins"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Extensions().V1alpha1().WasmPlugins().Informer()}, nil

		// Group=networking.istio.io, Version=v1alpha3
	case v1alpha3.SchemeGroupVersion.WithResource("destinationrules"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1alpha3().DestinationRules().Informer()}, nil
	case v1alpha3.SchemeGroupVersion.WithResource("envoyfilters"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1alpha3().EnvoyFilters().Informer()}, nil
	case v1alpha3.SchemeGroupVersion.WithResource("gateways"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1alpha3().Gateways().Informer()}, nil
	case v1alpha3.SchemeGroupVersion.WithResource("serviceentries"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1alpha3().ServiceEntries().Informer()}, nil
	case v1alpha3.SchemeGroupVersion.WithResource("sidecars"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1alpha3().Sidecars().Informer()}, nil
	case v1alpha3.SchemeGroupVersion.WithResource("virtualservices"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1alpha3().VirtualServices().Informer()}, nil
	case v1alpha3.SchemeGroupVersion.WithResource("workloadentries"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1alpha3().WorkloadEntries().Informer()}, nil
	case v1alpha3.SchemeGroupVersion.WithResource("workloadgroups"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1alpha3().WorkloadGroups().Informer()}, nil

		// Group=networking.istio.io, Version=v1beta1
	case v1beta1.SchemeGroupVersion.WithResource("destinationrules"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1beta1().DestinationRules().Informer()}, nil
	case v1beta1.SchemeGroupVersion.WithResource("gateways"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1beta1().Gateways().Informer()}, nil
	case v1beta1.SchemeGroupVersion.WithResource("proxyconfigs"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1beta1().ProxyConfigs().Informer()}, nil
	case v1beta1.SchemeGroupVersion.WithResource("serviceentries"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1beta1().ServiceEntries().Informer()}, nil
	case v1beta1.SchemeGroupVersion.WithResource("sidecars"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1beta1().Sidecars().Informer()}, nil
	case v1beta1.SchemeGroupVersion.WithResource("virtualservices"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1beta1().VirtualServices().Informer()}, nil
	case v1beta1.SchemeGroupVersion.WithResource("workloadentries"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1beta1().WorkloadEntries().Informer()}, nil
	case v1beta1.SchemeGroupVersion.WithResource("workloadgroups"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1beta1().WorkloadGroups().Informer()}, nil

		// Group=security.istio.io, Version=v1beta1
	case securityv1beta1.SchemeGroupVersion.WithResource("authorizationpolicies"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Security().V1beta1().AuthorizationPolicies().Informer()}, nil
	case securityv1beta1.SchemeGroupVersion.WithResource("peerauthentications"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Security().V1beta1().PeerAuthentications().Informer()}, nil
	case securityv1beta1.SchemeGroupVersion.WithResource("requestauthentications"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Security().V1beta1().RequestAuthentications().Informer()}, nil

		// Group=telemetry.istio.io, Version=v1alpha1
	case telemetryv1alpha1.SchemeGroupVersion.WithResource("telemetries"):
		return &genericInformer{resource: resource.GroupResource(), informer: f.Telemetry().V1alpha1().Telemetries().Informer()}, nil

	}

	return nil, fmt.Errorf("no informer found for %v", resource)
}

3.2 MCP

如果网格配置的meshConfig.ConfigSources自己定了config配置来源的话,就走MCP的协议加载配置。

func (s *Server) initConfigSources(args *PilotArgs) (err error) {
	for _, configSource := range s.environment.Mesh().ConfigSources {
		srcAddress, err := url.Parse(configSource.Address)
		if err != nil {
			return fmt.Errorf("invalid config URL %s %v", configSource.Address, err)
		}
		scheme := ConfigSourceAddressScheme(srcAddress.Scheme)
		switch scheme {
		case File:
			if srcAddress.Path == "" {
				return fmt.Errorf("invalid fs config URL %s, contains no file path", configSource.Address)
			}
			store := memory.Make(collections.Pilot)
			configController := memory.NewController(store)

			err := s.makeFileMonitor(srcAddress.Path, args.RegistryOptions.KubeOptions.DomainSuffix, configController)
			if err != nil {
				return err
			}
			s.ConfigStores = append(s.ConfigStores, configController)
		case XDS:
			xdsMCP, err := adsc.New(srcAddress.Host, &adsc.Config{
				Namespace: args.Namespace,
				Workload:  args.PodName,
				Revision:  args.Revision,
				Meta: model.NodeMetadata{
					Generator: "api",
					// To reduce transported data if upstream server supports. Especially for custom servers.
					IstioRevision: args.Revision,
				}.ToStruct(),
				InitialDiscoveryRequests: adsc.ConfigInitialRequests(),
			})
			if err != nil {
				return fmt.Errorf("failed to dial XDS %s %v", configSource.Address, err)
			}
			store := memory.Make(collections.Pilot)
			// TODO: enable namespace filter for memory controller
			configController := memory.NewController(store)
			configController.RegisterHasSyncedHandler(xdsMCP.HasSynced)
			xdsMCP.Store = configController
			err = xdsMCP.Run()
			if err != nil {
				return fmt.Errorf("MCP: failed running %v", err)
			}
			s.ConfigStores = append(s.ConfigStores, configController)
			log.Warn("Started XDS config ", s.ConfigStores)
		case Kubernetes:
			if srcAddress.Path == "" || srcAddress.Path == "/" {
				err2 := s.initK8SConfigStore(args)
				if err2 != nil {
					log.Warn("Error loading k8s ", err2)
					return err2
				}
				log.Warn("Started K8S config")
			} else {
				log.Warnf("Not implemented, ignore: %v", configSource.Address)
				// TODO: handle k8s:// scheme for remote cluster. Use same mechanism as service registry,
				// using the cluster name as key to match a secret.
			}
		default:
			log.Warnf("Ignoring unsupported config source: %v", configSource.Address)
		}
	}
	return nil
}

根据Address的schema,分成File、XDS、K8S三种配置存储,其中,File和XDS都是采用内存存储方式实现,本文重点关注XDS的运行过程。

首先是需要构建一个ADSC结构,这个结构是实现了ADS的基本客户端,主要用于压力测试和需要连接到Istio pilot或其他ADS服务器的工具或库。主要功能:

  • 连接到ProxyConfig中指定的XDS服务器
  • 如果需要证书的话,使用Secret提供程序获取证书
  • 发送watch资源的初始请求
  • 等待来自XDS服务器的响应
  • 成功后,用exp.backoff启动一个后台线程来维护连接。

运行的核心代码在于接收配置服务的响应:

func (a *ADSC) handleRecv() {
	for {
		var err error
		msg, err := a.stream.Recv()
		if err != nil {
			a.RecvWg.Done()
			adscLog.Infof("Connection closed for node %v with err: %v", a.nodeID, err)
			select {
			case a.errChan <- err:
			default:
			}
			// if 'reconnect' enabled - schedule a new Run
			if a.cfg.BackoffPolicy != nil {
				time.AfterFunc(a.cfg.BackoffPolicy.NextBackOff(), a.reconnect)
			} else {
				a.Close()
				a.WaitClear()
				a.Updates <- ""
				a.XDSUpdates <- nil
				close(a.errChan)
			}
			return
		}

		// 在Pilot Schemas集合中匹配gvk,并转换成Istio的Gvk
		gvk, isMCP := convertTypeURLToMCPGVK(msg.TypeUrl)

		adscLog.Info("Received ", a.url, " type ", msg.TypeUrl,
			" cnt=", len(msg.Resources), " nonce=", msg.Nonce)
		if a.cfg.ResponseHandler != nil {
			a.cfg.ResponseHandler.HandleResponse(a, msg)
		}

		if msg.TypeUrl == collections.IstioMeshV1Alpha1MeshConfig.Resource().GroupVersionKind().String() &&
			len(msg.Resources) > 0 {
			rsc := msg.Resources[0]
			m := &v1alpha1.MeshConfig{}
			err = proto.Unmarshal(rsc.Value, m)
			if err != nil {
				adscLog.Warn("Failed to unmarshal mesh config", err)
			}
                        //解析转换成Istio的meshConfig
			a.Mesh = m
			if a.LocalCacheDir != "" {
				strResponse, err := protomarshal.ToJSONWithIndent(m, "  ")
				if err != nil {
					continue
				}
				err = os.WriteFile(a.LocalCacheDir+"_mesh.json", []byte(strResponse), 0o644)
				if err != nil {
					continue
				}
			}
			continue
		}

		// Process the resources.
		a.VersionInfo[msg.TypeUrl] = msg.VersionInfo
		switch msg.TypeUrl {
		case v3.ListenerType:
			listeners := make([]*listener.Listener, 0, len(msg.Resources))
			for _, rsc := range msg.Resources {
				valBytes := rsc.Value
				ll := &listener.Listener{}
				_ = proto.Unmarshal(valBytes, ll)
				listeners = append(listeners, ll)
			}
			a.handleLDS(listeners)
		case v3.ClusterType:
			clusters := make([]*cluster.Cluster, 0, len(msg.Resources))
			for _, rsc := range msg.Resources {
				valBytes := rsc.Value
				cl := &cluster.Cluster{}
				_ = proto.Unmarshal(valBytes, cl)
				clusters = append(clusters, cl)
			}
			a.handleCDS(clusters)
		case v3.EndpointType:
			eds := make([]*endpoint.ClusterLoadAssignment, 0, len(msg.Resources))
			for _, rsc := range msg.Resources {
				valBytes := rsc.Value
				el := &endpoint.ClusterLoadAssignment{}
				_ = proto.Unmarshal(valBytes, el)
				eds = append(eds, el)
			}
			a.handleEDS(eds)
		case v3.RouteType:
			routes := make([]*route.RouteConfiguration, 0, len(msg.Resources))
			for _, rsc := range msg.Resources {
				valBytes := rsc.Value
				rl := &route.RouteConfiguration{}
				_ = proto.Unmarshal(valBytes, rl)
				routes = append(routes, rl)
			}
			a.handleRDS(routes)
		default:
			if isMCP {
				a.handleMCP(gvk, msg.Resources)
			}
		}

		// If we got no resource - still save to the store with empty name/namespace, to notify sync
		// This scheme also allows us to chunk large responses !

		// TODO: add hook to inject nacks

		a.mutex.Lock()
		if isMCP {
			if _, exist := a.sync[gvk.String()]; !exist {
				a.sync[gvk.String()] = time.Now()
			}
		}
		a.Received[msg.TypeUrl] = msg
		a.ack(msg)
		a.mutex.Unlock()

		select {
		case a.XDSUpdates <- msg:
		default:
		}
	}
}

上述方法核心的流程在于根据msg.TypeUrl的类型,分别对XDS的ListenerType、ClusterType、EndpointType、RouteType做处理。

  1. 在handleLDS中主要是在Listener列表中找到终端Filter,根据Filter名称区分TCP还是Http,设置httpListeners和tcpListeners,发送获取对应端口的RouteType的请求
  2. handleRoute中主要是设置routes
  3. handleCluster主要是根据集群名发送EndpointType请求,设置edsClusters和clusters
  4. handleEDS主要是设置endpoints

 

文章来自个人专栏
微服务架构-服务网格
12 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
2
2