searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

istio源码分析:外部服务注册发现

2023-07-26 08:12:42
89
0

背景

istio作为服务网格当前最流行的框架之一,对k8s以外的配置中心,如何进行对接,一直存在很强的需求。业界实现的方案也是五花八门,不变的是,尽可能的兼容已有的系统,可以大大降低使用的门槛。

本文主要以对接nacos为例,对比istio若干种实现方式,如何实现对接。

主要参考文章 如何将第三方服务中心注册集成到 Istio

由于istio发展比较快,这篇文章里面很多东西现状都不一样了,但是整体框架改动不大,下文会结合这篇文章和istio(基于1.16)的源码一起分析。其中差异点会着重通过代码分析指出来。

 

理论框架

istio控制面主要框架

 

  • Config Controller:用于管理各种配置数据,包括用户创建的流量管理规则和策略。配置数据中有两个 API 对象和服务模型相关,ServiceEntry 和 WorkloadEntry。Istio 目前支持三种类型的 Config Controller:
    • Kubernetes:使用 Kubernetes 来作为配置数据的存储,该方式的直接依附于 Kubernetes 强大的 CRD 机制来存储配置数据,简单方便,是 Istio 缺省使用的配置存储方案。
    • Memory:一个在内存中的 Config Controller 实现,可以监控一个文件目录,加载该目录中的 yaml 文件中定义的 Istio API 配置对象,该方式主要用于测试。
    • MCP:通过 MCP(Mesh Configuration Protocol) 协议,可以接入一个到多个 MCP Server。Pilot 从 MCP server 中获取网格的配置数据,包括 ServiceEntry 和 WorkloadEntry 定义的服务数据,以及 VirtualService,DestinationRule 等路由规则等其他配置。Istio 中有一个 Galley 组件,该组件实现为一个 MCP Server,从 Kubernetes API Server 中获取配置数据,然后通过 MCP 协议提供给 Pilot。
  • Service Controller:负责接入各种 Service Registry,从 Service Registry 同步需要在网格中进行管理的服务,目前Istio支持的Service Registry包括:
    • Kubernetes:对接 Kubernetes Registry,可以将 Kubernetes 的 Service 和 Endpoint 采集到 Istio 中。
    • Consul: 对接Consul Catalog,将注册到 Consul 中的服务数据采集到 Istio 中。
    • External Service Discovery:该 Service Registry 比较特殊,后端并未对接到一个服务注册表,而是会监听 Config Controller 的配置变化消息,从 Config Controller 中获取 ServiceEntry 和 WorkloadEntry 资源,然后以 Service Registry 的形式提供给 Service Controller。
  • Discovery Service:将服务模型和控制面配置转换为数据面标准数据格式,通过 xDS 接口下发给数据面的代理。主要包含下述逻辑:
    • 启动GRPC Server并接收来自Envoy端的连接请求。
    • 接收Envoy端的xDS请求,从Config Controller和Service Controller中获取配置和服务信息,生成响应消息发送给Envoy。
    • 监听来自Config Controller的配置变化消息和来自Service Controller的服务变化消息,并将配置和服务变化内容通过xDS接口推送到Envoy。

从该文中可以发现,主体框架基本一致,差异点主要是上文的Service Controller组件。

核心代码如下:

// initServiceControllers creates and initializes the service controllers
func (s *Server) initServiceControllers(args *PilotArgs) error {
	serviceControllers := s.ServiceController()

	s.serviceEntryController = serviceentry.NewController(
		s.configController, s.XDSServer,
		serviceentry.WithClusterID(s.clusterID),
	)
	serviceControllers.AddRegistry(s.serviceEntryController)

	registered := make(map[provider.ID]bool)
	for _, r := range args.RegistryOptions.Registries {
		serviceRegistry := provider.ID(r)
		if _, exists := registered[serviceRegistry]; exists {
			log.Warnf("%s registry specified multiple times.", r)
			continue
		}
		registered[serviceRegistry] = true
		log.Infof("Adding %s registry adapter", serviceRegistry)
		switch serviceRegistry {
		case provider.Kubernetes:
			if err := s.initKubeRegistry(args); err != nil {
				return err
			}
		default:
			return fmt.Errorf("service registry %s is not supported", r)
		}
	}

	// Defer running of the service controllers.
	s.addStartFunc(func(stop <-chan struct{}) error {
		go serviceControllers.Run(stop)
		return nil
	})

	return nil
}



从这一部分代码可以看出,当前istio仅支持k8s的Registry。因此上述的Consul和External Service Discovery并不存在了,所以本文也对其进行了置灰。

 

那么是否ServiceController就不再关心ServiceEntry和WorkLoadEntry的变化了呢?答案也是否定的,从上述代码片中可以看到ServiceController在初始化的时候还Add了一个ServiceEntryController,跟进代码可以看到,里面实际上还是接管了ServiceEntry和WorkLoadEntry。

// NewController creates a new ServiceEntry discovery service.
func NewController(configController model.ConfigStoreController, xdsUpdater model.XDSUpdater,
	options ...Option,
) *Controller {
	s := newController(configController, xdsUpdater, options...)
	if configController != nil {
		configController.RegisterEventHandler(gvk.ServiceEntry, s.serviceEntryHandler)
		configController.RegisterEventHandler(gvk.WorkloadEntry, s.workloadEntryHandler)
		_ = configController.SetWatchErrorHandler(informermetric.ErrorHandlerForCluster(s.clusterID))
	}
	return s
}

 

因此我们可以得出结论,Consul Registry已经废弃,External Service Discovery合并到ServiceController之中,直接接管ServiceEntry和WorkLoadEntry的资源变化了。

对比源码后,当前的框架应该如下:

 

 

其他服务注册表的集成

依然是参考该文章,里面给出了三种不同的集成方式。

 

上图中分别用红、绿、三种颜色标识了这三种不同的集成方式。

该文章的处理建议是走红色路线,实现一个自定义的Adapter。不选择其他路线的原因:

  1. 绿色路线ApiServer的性能会是一个瓶颈。
  2. 蓝色路线因为galley已经废弃了,不建议启用。

我们在调研的过程中,测试过直接通过k8s的ApiServer操作,无论是对POD还是对ServiceEntry,性能都是一个问题。红色路线需要改动istio的源码,也没有采用。而蓝色路线,看了最新的代码,实际上nacos社区已经支持MCP Over XDS协议,反而是成为我们采用的方案。

实现方案分析

MCP方案代码分析

具体实现方式,在istio启动时,把外部nacos的地址作为configSources传入,schema协议头为xds://。

关键配置信息如下:

apiVersion: v1

data:
  mesh: |-
    accessLogFile: /dev/stdout
    configSources:
        - address: xds://192.168.x.xx:18848
        - address: k8s://localhost/

 

configController初始化

关键代码分析如下

// initConfigController creates the config controller in the pilotConfig.
func (s *Server) initConfigController(args *PilotArgs) error {
	s.initStatusController(args, features.EnableStatus)
	meshConfig := s.environment.Mesh()
	if len(meshConfig.ConfigSources) > 0 {
		// Using MCP for config.
		if err := s.initConfigSources(args); err != nil {
			return err
		}
	}

//下文省略

 

如果有配置configSources,则以configSources为准。注意:为了istio能正常服务,k8s依然需要作为配置传入。

 

xds://处理

//上文省略
case XDS:
			xdsMCP, err := adsc.New(srcAddress.Host, &adsc.Config{
				Namespace: args.Namespace,
				Workload:  args.PodName,
				Revision:  args.Revision,
				Meta: model.NodeMetadata{
					Generator: "api",
					// To reduce transported data if upstream server supports. Especially for custom servers.
					IstioRevision: args.Revision,
				}.ToStruct(),
				InitialDiscoveryRequests: adsc.ConfigInitialRequests(),
			})
			if err != nil {
				return fmt.Errorf("failed to dial XDS %s %v", configSource.Address, err)
			}
			store := memory.Make(collections.Pilot)
			// TODO: enable namespace filter for memory controller
			configController := memory.NewController(store)
			configController.RegisterHasSyncedHandler(xdsMCP.HasSynced)
			xdsMCP.Store = configController
			err = xdsMCP.Run()
			if err != nil {
				return fmt.Errorf("MCP: failed running %v", err)
			}
			s.ConfigStores = append(s.ConfigStores, configController)
			log.Warn("Started XDS config ", s.ConfigStores)

//下文省略

xdsMCP流程

总结一下,xdsMCP里面会建立一个grpc链接,拉取配置信息,外部服务作为ServiceEntry下发。

  1. 初始化
  2. 根据协议创建grpc连接后,client端发送一个init请求(InitialDiscoveryRequests),表示自己要监听哪些资源, 外部服务作为ServiceEntry
  3. 随后等待对端返回,并分发处理,即go a.handleRecv()

核心代码如下

func (a *ADSC) Run() error {
    var err error
    a.client = discovery.NewAggregatedDiscoveryServiceClient(a.conn)
    a.stream, err = a.client.StreamAggregatedResources(context.Background())
    if err != nil {
       return err
    }
    a.sendNodeMeta = true
    a.InitialLoad = 0
    // Send the initial requests
    for _, r := range a.cfg.InitialDiscoveryRequests {
       if r.TypeUrl == v3.ClusterType {
          a.watchTime = time.Now()
       }
       _ = a.Send(r)
    }
    // by default, we assume 1 goroutine decrements the waitgroup (go a.handleRecv()).
    // for synchronizing when the goroutine finishes reading from the gRPC stream.

    a.RecvWg.Add(1)

    go a.handleRecv()
    return nil
}

configController处理

  1. initConfigSources除了xdsMCP.Run()之外,还创建了一个configController,实际是一个内存副本。
    //上文省略
    store := memory.Make(collections.Pilot)
    			// TODO: enable namespace filter for memory controller
    			configController := memory.NewController(store)
    			configController.RegisterHasSyncedHandler(xdsMCP.HasSynced)
    			xdsMCP.Store = configController
    			err = xdsMCP.Run()
    			if err != nil {
    				return fmt.Errorf("MCP: failed running %v", err)
    			}
    			s.ConfigStores = append(s.ConfigStores, configController)
    			log.Warn("Started XDS config ", s.ConfigStores)
    //下文省略
    
  2. 这就意味着handleMCP都会触发更新内存中的configStore。RUN的时候除了更新store列表,还起了一个monitor在监听事件。
func (c *Controller) Run(stop <-chan struct{}) {
    c.started.Store(true)
    c.monitor.Run(stop)
}
//事件包括以下几种:
const (
    // EventAdd is sent when an object is added
    EventAdd Event = iota

    // EventUpdate is sent when an object is modified
    // Captures the modified object
    EventUpdate

    // EventDelete is sent when an object is deleted
    // Captures the object at the last known state
    EventDelete
)

结合前面所说的handleMCP对ConfigStore进行增删改之后,会触发monitor相关的事件,从而分发到各handler处理。

总结

MCP方案同步的部分总体流程如下。

 

后续这些配置变更,会经过istio下发到各个数据面,本文不再展开。验证方法可以查看istio控制面日志确认。

 

0条评论
作者已关闭评论
g****m
3文章数
0粉丝数
g****m
3 文章 | 0 粉丝
g****m
3文章数
0粉丝数
g****m
3 文章 | 0 粉丝
原创

istio源码分析:外部服务注册发现

2023-07-26 08:12:42
89
0

背景

istio作为服务网格当前最流行的框架之一,对k8s以外的配置中心,如何进行对接,一直存在很强的需求。业界实现的方案也是五花八门,不变的是,尽可能的兼容已有的系统,可以大大降低使用的门槛。

本文主要以对接nacos为例,对比istio若干种实现方式,如何实现对接。

主要参考文章 如何将第三方服务中心注册集成到 Istio

由于istio发展比较快,这篇文章里面很多东西现状都不一样了,但是整体框架改动不大,下文会结合这篇文章和istio(基于1.16)的源码一起分析。其中差异点会着重通过代码分析指出来。

 

理论框架

istio控制面主要框架

 

  • Config Controller:用于管理各种配置数据,包括用户创建的流量管理规则和策略。配置数据中有两个 API 对象和服务模型相关,ServiceEntry 和 WorkloadEntry。Istio 目前支持三种类型的 Config Controller:
    • Kubernetes:使用 Kubernetes 来作为配置数据的存储,该方式的直接依附于 Kubernetes 强大的 CRD 机制来存储配置数据,简单方便,是 Istio 缺省使用的配置存储方案。
    • Memory:一个在内存中的 Config Controller 实现,可以监控一个文件目录,加载该目录中的 yaml 文件中定义的 Istio API 配置对象,该方式主要用于测试。
    • MCP:通过 MCP(Mesh Configuration Protocol) 协议,可以接入一个到多个 MCP Server。Pilot 从 MCP server 中获取网格的配置数据,包括 ServiceEntry 和 WorkloadEntry 定义的服务数据,以及 VirtualService,DestinationRule 等路由规则等其他配置。Istio 中有一个 Galley 组件,该组件实现为一个 MCP Server,从 Kubernetes API Server 中获取配置数据,然后通过 MCP 协议提供给 Pilot。
  • Service Controller:负责接入各种 Service Registry,从 Service Registry 同步需要在网格中进行管理的服务,目前Istio支持的Service Registry包括:
    • Kubernetes:对接 Kubernetes Registry,可以将 Kubernetes 的 Service 和 Endpoint 采集到 Istio 中。
    • Consul: 对接Consul Catalog,将注册到 Consul 中的服务数据采集到 Istio 中。
    • External Service Discovery:该 Service Registry 比较特殊,后端并未对接到一个服务注册表,而是会监听 Config Controller 的配置变化消息,从 Config Controller 中获取 ServiceEntry 和 WorkloadEntry 资源,然后以 Service Registry 的形式提供给 Service Controller。
  • Discovery Service:将服务模型和控制面配置转换为数据面标准数据格式,通过 xDS 接口下发给数据面的代理。主要包含下述逻辑:
    • 启动GRPC Server并接收来自Envoy端的连接请求。
    • 接收Envoy端的xDS请求,从Config Controller和Service Controller中获取配置和服务信息,生成响应消息发送给Envoy。
    • 监听来自Config Controller的配置变化消息和来自Service Controller的服务变化消息,并将配置和服务变化内容通过xDS接口推送到Envoy。

从该文中可以发现,主体框架基本一致,差异点主要是上文的Service Controller组件。

核心代码如下:

// initServiceControllers creates and initializes the service controllers
func (s *Server) initServiceControllers(args *PilotArgs) error {
	serviceControllers := s.ServiceController()

	s.serviceEntryController = serviceentry.NewController(
		s.configController, s.XDSServer,
		serviceentry.WithClusterID(s.clusterID),
	)
	serviceControllers.AddRegistry(s.serviceEntryController)

	registered := make(map[provider.ID]bool)
	for _, r := range args.RegistryOptions.Registries {
		serviceRegistry := provider.ID(r)
		if _, exists := registered[serviceRegistry]; exists {
			log.Warnf("%s registry specified multiple times.", r)
			continue
		}
		registered[serviceRegistry] = true
		log.Infof("Adding %s registry adapter", serviceRegistry)
		switch serviceRegistry {
		case provider.Kubernetes:
			if err := s.initKubeRegistry(args); err != nil {
				return err
			}
		default:
			return fmt.Errorf("service registry %s is not supported", r)
		}
	}

	// Defer running of the service controllers.
	s.addStartFunc(func(stop <-chan struct{}) error {
		go serviceControllers.Run(stop)
		return nil
	})

	return nil
}



从这一部分代码可以看出,当前istio仅支持k8s的Registry。因此上述的Consul和External Service Discovery并不存在了,所以本文也对其进行了置灰。

 

那么是否ServiceController就不再关心ServiceEntry和WorkLoadEntry的变化了呢?答案也是否定的,从上述代码片中可以看到ServiceController在初始化的时候还Add了一个ServiceEntryController,跟进代码可以看到,里面实际上还是接管了ServiceEntry和WorkLoadEntry。

// NewController creates a new ServiceEntry discovery service.
func NewController(configController model.ConfigStoreController, xdsUpdater model.XDSUpdater,
	options ...Option,
) *Controller {
	s := newController(configController, xdsUpdater, options...)
	if configController != nil {
		configController.RegisterEventHandler(gvk.ServiceEntry, s.serviceEntryHandler)
		configController.RegisterEventHandler(gvk.WorkloadEntry, s.workloadEntryHandler)
		_ = configController.SetWatchErrorHandler(informermetric.ErrorHandlerForCluster(s.clusterID))
	}
	return s
}

 

因此我们可以得出结论,Consul Registry已经废弃,External Service Discovery合并到ServiceController之中,直接接管ServiceEntry和WorkLoadEntry的资源变化了。

对比源码后,当前的框架应该如下:

 

 

其他服务注册表的集成

依然是参考该文章,里面给出了三种不同的集成方式。

 

上图中分别用红、绿、三种颜色标识了这三种不同的集成方式。

该文章的处理建议是走红色路线,实现一个自定义的Adapter。不选择其他路线的原因:

  1. 绿色路线ApiServer的性能会是一个瓶颈。
  2. 蓝色路线因为galley已经废弃了,不建议启用。

我们在调研的过程中,测试过直接通过k8s的ApiServer操作,无论是对POD还是对ServiceEntry,性能都是一个问题。红色路线需要改动istio的源码,也没有采用。而蓝色路线,看了最新的代码,实际上nacos社区已经支持MCP Over XDS协议,反而是成为我们采用的方案。

实现方案分析

MCP方案代码分析

具体实现方式,在istio启动时,把外部nacos的地址作为configSources传入,schema协议头为xds://。

关键配置信息如下:

apiVersion: v1

data:
  mesh: |-
    accessLogFile: /dev/stdout
    configSources:
        - address: xds://192.168.x.xx:18848
        - address: k8s://localhost/

 

configController初始化

关键代码分析如下

// initConfigController creates the config controller in the pilotConfig.
func (s *Server) initConfigController(args *PilotArgs) error {
	s.initStatusController(args, features.EnableStatus)
	meshConfig := s.environment.Mesh()
	if len(meshConfig.ConfigSources) > 0 {
		// Using MCP for config.
		if err := s.initConfigSources(args); err != nil {
			return err
		}
	}

//下文省略

 

如果有配置configSources,则以configSources为准。注意:为了istio能正常服务,k8s依然需要作为配置传入。

 

xds://处理

//上文省略
case XDS:
			xdsMCP, err := adsc.New(srcAddress.Host, &adsc.Config{
				Namespace: args.Namespace,
				Workload:  args.PodName,
				Revision:  args.Revision,
				Meta: model.NodeMetadata{
					Generator: "api",
					// To reduce transported data if upstream server supports. Especially for custom servers.
					IstioRevision: args.Revision,
				}.ToStruct(),
				InitialDiscoveryRequests: adsc.ConfigInitialRequests(),
			})
			if err != nil {
				return fmt.Errorf("failed to dial XDS %s %v", configSource.Address, err)
			}
			store := memory.Make(collections.Pilot)
			// TODO: enable namespace filter for memory controller
			configController := memory.NewController(store)
			configController.RegisterHasSyncedHandler(xdsMCP.HasSynced)
			xdsMCP.Store = configController
			err = xdsMCP.Run()
			if err != nil {
				return fmt.Errorf("MCP: failed running %v", err)
			}
			s.ConfigStores = append(s.ConfigStores, configController)
			log.Warn("Started XDS config ", s.ConfigStores)

//下文省略

xdsMCP流程

总结一下,xdsMCP里面会建立一个grpc链接,拉取配置信息,外部服务作为ServiceEntry下发。

  1. 初始化
  2. 根据协议创建grpc连接后,client端发送一个init请求(InitialDiscoveryRequests),表示自己要监听哪些资源, 外部服务作为ServiceEntry
  3. 随后等待对端返回,并分发处理,即go a.handleRecv()

核心代码如下

func (a *ADSC) Run() error {
    var err error
    a.client = discovery.NewAggregatedDiscoveryServiceClient(a.conn)
    a.stream, err = a.client.StreamAggregatedResources(context.Background())
    if err != nil {
       return err
    }
    a.sendNodeMeta = true
    a.InitialLoad = 0
    // Send the initial requests
    for _, r := range a.cfg.InitialDiscoveryRequests {
       if r.TypeUrl == v3.ClusterType {
          a.watchTime = time.Now()
       }
       _ = a.Send(r)
    }
    // by default, we assume 1 goroutine decrements the waitgroup (go a.handleRecv()).
    // for synchronizing when the goroutine finishes reading from the gRPC stream.

    a.RecvWg.Add(1)

    go a.handleRecv()
    return nil
}

configController处理

  1. initConfigSources除了xdsMCP.Run()之外,还创建了一个configController,实际是一个内存副本。
    //上文省略
    store := memory.Make(collections.Pilot)
    			// TODO: enable namespace filter for memory controller
    			configController := memory.NewController(store)
    			configController.RegisterHasSyncedHandler(xdsMCP.HasSynced)
    			xdsMCP.Store = configController
    			err = xdsMCP.Run()
    			if err != nil {
    				return fmt.Errorf("MCP: failed running %v", err)
    			}
    			s.ConfigStores = append(s.ConfigStores, configController)
    			log.Warn("Started XDS config ", s.ConfigStores)
    //下文省略
    
  2. 这就意味着handleMCP都会触发更新内存中的configStore。RUN的时候除了更新store列表,还起了一个monitor在监听事件。
func (c *Controller) Run(stop <-chan struct{}) {
    c.started.Store(true)
    c.monitor.Run(stop)
}
//事件包括以下几种:
const (
    // EventAdd is sent when an object is added
    EventAdd Event = iota

    // EventUpdate is sent when an object is modified
    // Captures the modified object
    EventUpdate

    // EventDelete is sent when an object is deleted
    // Captures the object at the last known state
    EventDelete
)

结合前面所说的handleMCP对ConfigStore进行增删改之后,会触发monitor相关的事件,从而分发到各handler处理。

总结

MCP方案同步的部分总体流程如下。

 

后续这些配置变更,会经过istio下发到各个数据面,本文不再展开。验证方法可以查看istio控制面日志确认。

 

文章来自个人专栏
istio源码分析:外部服务注册发现
1 文章 | 1 订阅
0条评论
作者已关闭评论
作者已关闭评论
0
0