背景
istio作为服务网格当前最流行的框架之一,对k8s以外的配置中心,如何进行对接,一直存在很强的需求。业界实现的方案也是五花八门,不变的是,尽可能的兼容已有的系统,可以大大降低使用的门槛。
本文主要以对接nacos为例,对比istio若干种实现方式,如何实现对接。
主要参考文章 如何将第三方服务中心注册集成到 Istio
由于istio发展比较快,这篇文章里面很多东西现状都不一样了,但是整体框架改动不大,下文会结合这篇文章和istio(基于1.16)的源码一起分析。其中差异点会着重通过代码分析指出来。
理论框架
istio控制面主要框架
- Config Controller:用于管理各种配置数据,包括用户创建的流量管理规则和策略。配置数据中有两个 API 对象和服务模型相关,ServiceEntry 和 WorkloadEntry。Istio 目前支持三种类型的 Config Controller:
- Kubernetes:使用 Kubernetes 来作为配置数据的存储,该方式的直接依附于 Kubernetes 强大的 CRD 机制来存储配置数据,简单方便,是 Istio 缺省使用的配置存储方案。
- Memory:一个在内存中的 Config Controller 实现,可以监控一个文件目录,加载该目录中的 yaml 文件中定义的 Istio API 配置对象,该方式主要用于测试。
- MCP:通过 MCP(Mesh Configuration Protocol) 协议,可以接入一个到多个 MCP Server。Pilot 从 MCP server 中获取网格的配置数据,包括 ServiceEntry 和 WorkloadEntry 定义的服务数据,以及 VirtualService,DestinationRule 等路由规则等其他配置。Istio 中有一个 Galley 组件,该组件实现为一个 MCP Server,从 Kubernetes API Server 中获取配置数据,然后通过 MCP 协议提供给 Pilot。
- Service Controller:负责接入各种 Service Registry,从 Service Registry 同步需要在网格中进行管理的服务,目前Istio支持的Service Registry包括:
- Kubernetes:对接 Kubernetes Registry,可以将 Kubernetes 的 Service 和 Endpoint 采集到 Istio 中。
- Consul: 对接Consul Catalog,将注册到 Consul 中的服务数据采集到 Istio 中。
- External Service Discovery:该 Service Registry 比较特殊,后端并未对接到一个服务注册表,而是会监听 Config Controller 的配置变化消息,从 Config Controller 中获取 ServiceEntry 和 WorkloadEntry 资源,然后以 Service Registry 的形式提供给 Service Controller。
- Discovery Service:将服务模型和控制面配置转换为数据面标准数据格式,通过 xDS 接口下发给数据面的代理。主要包含下述逻辑:
- 启动GRPC Server并接收来自Envoy端的连接请求。
- 接收Envoy端的xDS请求,从Config Controller和Service Controller中获取配置和服务信息,生成响应消息发送给Envoy。
- 监听来自Config Controller的配置变化消息和来自Service Controller的服务变化消息,并将配置和服务变化内容通过xDS接口推送到Envoy。
从该文中可以发现,主体框架基本一致,差异点主要是上文的Service Controller组件。
核心代码如下:
// initServiceControllers creates and initializes the service controllers
func (s *Server) initServiceControllers(args *PilotArgs) error {
serviceControllers := s.ServiceController()
s.serviceEntryController = serviceentry.NewController(
s.configController, s.XDSServer,
serviceentry.WithClusterID(s.clusterID),
)
serviceControllers.AddRegistry(s.serviceEntryController)
registered := make(map[provider.ID]bool)
for _, r := range args.RegistryOptions.Registries {
serviceRegistry := provider.ID(r)
if _, exists := registered[serviceRegistry]; exists {
log.Warnf("%s registry specified multiple times.", r)
continue
}
registered[serviceRegistry] = true
log.Infof("Adding %s registry adapter", serviceRegistry)
switch serviceRegistry {
case provider.Kubernetes:
if err := s.initKubeRegistry(args); err != nil {
return err
}
default:
return fmt.Errorf("service registry %s is not supported", r)
}
}
// Defer running of the service controllers.
s.addStartFunc(func(stop <-chan struct{}) error {
go serviceControllers.Run(stop)
return nil
})
return nil
}
从这一部分代码可以看出,当前istio仅支持k8s的Registry。因此上述的Consul和External Service Discovery并不存在了,所以本文也对其进行了置灰。
那么是否ServiceController就不再关心ServiceEntry和WorkLoadEntry的变化了呢?答案也是否定的,从上述代码片中可以看到ServiceController在初始化的时候还Add了一个ServiceEntryController,跟进代码可以看到,里面实际上还是接管了ServiceEntry和WorkLoadEntry。
// NewController creates a new ServiceEntry discovery service.
func NewController(configController model.ConfigStoreController, xdsUpdater model.XDSUpdater,
options ...Option,
) *Controller {
s := newController(configController, xdsUpdater, options...)
if configController != nil {
configController.RegisterEventHandler(gvk.ServiceEntry, s.serviceEntryHandler)
configController.RegisterEventHandler(gvk.WorkloadEntry, s.workloadEntryHandler)
_ = configController.SetWatchErrorHandler(informermetric.ErrorHandlerForCluster(s.clusterID))
}
return s
}
因此我们可以得出结论,Consul Registry已经废弃,External Service Discovery合并到ServiceController之中,直接接管ServiceEntry和WorkLoadEntry的资源变化了。
对比源码后,当前的框架应该如下:
其他服务注册表的集成
依然是参考该文章,里面给出了三种不同的集成方式。
上图中分别用红、绿、三种颜色标识了这三种不同的集成方式。
该文章的处理建议是走红色路线,实现一个自定义的Adapter。不选择其他路线的原因:
- 绿色路线ApiServer的性能会是一个瓶颈。
- 蓝色路线因为galley已经废弃了,不建议启用。
我们在调研的过程中,测试过直接通过k8s的ApiServer操作,无论是对POD还是对ServiceEntry,性能都是一个问题。红色路线需要改动istio的源码,也没有采用。而蓝色路线,看了最新的代码,实际上nacos社区已经支持MCP Over XDS协议,反而是成为我们采用的方案。
实现方案分析
MCP方案代码分析
具体实现方式,在istio启动时,把外部nacos的地址作为configSources传入,schema协议头为xds://。
关键配置信息如下:
apiVersion: v1
data:
mesh: |-
accessLogFile: /dev/stdout
configSources:
- address: xds://192.168.x.xx:18848
- address: k8s://localhost/
configController初始化
关键代码分析如下
// initConfigController creates the config controller in the pilotConfig.
func (s *Server) initConfigController(args *PilotArgs) error {
s.initStatusController(args, features.EnableStatus)
meshConfig := s.environment.Mesh()
if len(meshConfig.ConfigSources) > 0 {
// Using MCP for config.
if err := s.initConfigSources(args); err != nil {
return err
}
}
//下文省略
如果有配置configSources,则以configSources为准。注意:为了istio能正常服务,k8s依然需要作为配置传入。
xds://处理
//上文省略
case XDS:
xdsMCP, err := adsc.New(srcAddress.Host, &adsc.Config{
Namespace: args.Namespace,
Workload: args.PodName,
Revision: args.Revision,
Meta: model.NodeMetadata{
Generator: "api",
// To reduce transported data if upstream server supports. Especially for custom servers.
IstioRevision: args.Revision,
}.ToStruct(),
InitialDiscoveryRequests: adsc.ConfigInitialRequests(),
})
if err != nil {
return fmt.Errorf("failed to dial XDS %s %v", configSource.Address, err)
}
store := memory.Make(collections.Pilot)
// TODO: enable namespace filter for memory controller
configController := memory.NewController(store)
configController.RegisterHasSyncedHandler(xdsMCP.HasSynced)
xdsMCP.Store = configController
err = xdsMCP.Run()
if err != nil {
return fmt.Errorf("MCP: failed running %v", err)
}
s.ConfigStores = append(s.ConfigStores, configController)
log.Warn("Started XDS config ", s.ConfigStores)
//下文省略
xdsMCP流程
总结一下,xdsMCP里面会建立一个grpc链接,拉取配置信息,外部服务作为ServiceEntry下发。
- 初始化
- 根据协议创建grpc连接后,client端发送一个init请求(InitialDiscoveryRequests),表示自己要监听哪些资源, 外部服务作为ServiceEntry
- 随后等待对端返回,并分发处理,即go a.handleRecv()
核心代码如下
func (a *ADSC) Run() error {
var err error
a.client = discovery.NewAggregatedDiscoveryServiceClient(a.conn)
a.stream, err = a.client.StreamAggregatedResources(context.Background())
if err != nil {
return err
}
a.sendNodeMeta = true
a.InitialLoad = 0
// Send the initial requests
for _, r := range a.cfg.InitialDiscoveryRequests {
if r.TypeUrl == v3.ClusterType {
a.watchTime = time.Now()
}
_ = a.Send(r)
}
// by default, we assume 1 goroutine decrements the waitgroup (go a.handleRecv()).
// for synchronizing when the goroutine finishes reading from the gRPC stream.
a.RecvWg.Add(1)
go a.handleRecv()
return nil
}
configController处理
- initConfigSources除了xdsMCP.Run()之外,还创建了一个configController,实际是一个内存副本。
//上文省略 store := memory.Make(collections.Pilot) // TODO: enable namespace filter for memory controller configController := memory.NewController(store) configController.RegisterHasSyncedHandler(xdsMCP.HasSynced) xdsMCP.Store = configController err = xdsMCP.Run() if err != nil { return fmt.Errorf("MCP: failed running %v", err) } s.ConfigStores = append(s.ConfigStores, configController) log.Warn("Started XDS config ", s.ConfigStores) //下文省略
- 这就意味着handleMCP都会触发更新内存中的configStore。RUN的时候除了更新store列表,还起了一个monitor在监听事件。
func (c *Controller) Run(stop <-chan struct{}) {
c.started.Store(true)
c.monitor.Run(stop)
}
//事件包括以下几种:
const (
// EventAdd is sent when an object is added
EventAdd Event = iota
// EventUpdate is sent when an object is modified
// Captures the modified object
EventUpdate
// EventDelete is sent when an object is deleted
// Captures the object at the last known state
EventDelete
)
结合前面所说的handleMCP对ConfigStore进行增删改之后,会触发monitor相关的事件,从而分发到各handler处理。
总结
MCP方案同步的部分总体流程如下。
后续这些配置变更,会经过istio下发到各个数据面,本文不再展开。验证方法可以查看istio控制面日志确认。