searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Distro协议解析

2023-05-31 02:15:50
53
0

一、背景简介

天翼云微服务引擎-注册配置中心最近已经开放公测,目前已经提供了Nacos引擎产品功能供广大用户免费使用。Nacos作为一个分布式协同组件,提供了众多功能特性,其中核心功能微服务注册发现,对可用性的要求非常高,因为注册中心作为系统中很重要的的一个服务,需要尽最大可能对外提供可用的服务,所以选择 AP 来保证服务的高可用,另外 Nacos 还采取了心跳机制来自动完成服务数据补偿的机制,进一步提升可用性。而Nacos的AP主要是通过一套Disto协议来实现的。

 

二、Distro协议介绍

Distro 协议是 Nacos 对于临时实例数据开发的⼀致性协议,其数据存储在缓存中,并且会在启动 时进行全量数据同步,并定期进行数据校验。Distro 协议保证了在分布式环境下每个节点上面的服务信息的状态都能够及时地通知其他节点,其保证了在某些 Nacos 节点宕机后,整个临时实例处理系统依旧可以正常工作,可以维持数十万量级服务实例的存储和⼀致性。

 

三、Distro协议的设计机制

平等机制

Nacos 的每个节点是平等的,都可以处理写的请求。 

异步复制机制

Nacos 把变更的数据异步复制到其他节点。 

健康检查机制

每个节点只存了部分数据,定期检查客户端状态保持数据一致性。 

本地读机制

每个节点独立处理读请求,及时从本地发出响应。

新节点同步机制

Nacos 启动时,从其他节点同步数据。

路由转发机制

客户端发送的写请求,如果属于自己则处理,否则路由转发给其他节点。

 

四、Distro协议过程解析

4.1 Disto协议初始化

Nacos集群节点Disto协议模块初始化时,判断若非standalone模式启动,则新建两种任务:数据加载任务和数据校验任务;

4.1.1 LoadTask数据加载任务

节点启动时执行一次该任务,设置状态作为后续任务的参照;该任务向集群其他节点发起全量数据拉取,维护一个本机节点的一个全局状态isInitialized,加载数据成功之后,将这个状态设置为true,表示当前节点已经同步完毕数据,可以参与正常服务;同时设置了数据缓存DistroDataStorage的isFinishInitial属性为true,表示数据已准备好,可以开始定时数据验证任务;

任务执行过程如下:

1,检测集群其他节点,若不存在在线节点,则死循环等待,间隔1秒检测;

2,等待数据缓存DistroDataStorage注册,若未注册,则死循环等待,间隔1秒检测;

3,检测数据缓存DistroDataStorage是否加载过,没加载过则启动数据加载流程:轮询集群中其他在线节点,获取Snapshot数据

,解析后存放本地缓存中;

同步Snapshot数据的数据结构如下所示:

clientSyncData = {ClientSyncData@xxxxx} 
	clientId = "ip1:8848#true"
	attributes = {ClientSyncAttributes@xxxx} 
	namespaces = {ArrayList@xxxxx}  size = 2
		0 = "public"
		1 = "public"
	groupNames = {ArrayList@xxxxx}  size = 2
		0 = "DEFAULT_GROUP"
		1 = "DEFAULT_GROUP"
	serviceNames = {ArrayList@xxxxx}  size = 2
		0 = "SERVICE_01"
		1 = "SERVICE_02"
	instancePublishInfos = {ArrayList@xxxxx}  size = 2
		0 = {InstancePublishInfo@xxxxx} "InstancePublishInfo{ip=ip1, port=8848, healthy=false}"
		1 = {InstancePublishInfo@xxxxx} "InstancePublishInfo{ip=ip1, port=8848, healthy=false}"

通过数据加载任务,可以在节点新加入集群时,从其他节点快速获取最新的全量的临时服务数据,进入服务状态。

4.1.2 VerifyTask数据校验任务

节点数据验证任务,节点启动延迟5秒后,每5s周期性执行一次;

任务执行流程:

1,获取除自身节点之外的其他节点;

2,获取每种数据类型(Nacos v2.x版本主要为Nacos:Naming:v2:ClientData)的DistroData数据;

3,序列化DistroData(其核心是本节点负责的服务实例的clientId),根据数据类型获取对应的Distro客户端agent,使用grpc方式异步地发送verify请求,对方节点收到请求后对比后更新本地缓存中的服务信息;

通过定时更新任务,服务实例负责节点通知其他也拥有此服务实例信息的节点刷新相关服务实例的最新活跃时间。否则校验失败,负责节点在收到失败的Response后会发布ClientVerifyFailedEvent事件,触发数据同步操作。

 

4.2 客户端读写请求

客户端请求根据处理方式的不通,可以分为读操作和写操作。每种操作都会经过过滤器进行处理,DistroFilter负责过滤客户端读写请求,对于写数据等标记为canDisto(代码控制)的请求,需要转发请求到负责对应数据的节点上去处理;如下代码片段所示

        try {
            Method method = controllerMethodsCache.getMethod(req);
            
            String path = new URI(req.getRequestURI()).getPath();
            if (method == null) {
                throw new NoSuchMethodException(req.getMethod() + " " + path);
            }
            
            if (!method.isAnnotationPresent(CanDistro.class)) {
                filterChain.doFilter(req, resp);
                return;
            }
            String distroTag = distroTagGenerator.getResponsibleTag(req);
            
            if (distroMapper.responsible(distroTag)) {
                filterChain.doFilter(req, resp);
                return;
            }
            
            // proxy request to other server if necessary:
            String userAgent = req.getHeader(HttpHeaderConsts.USER_AGENT_HEADER);
            
            if (StringUtils.isNotBlank(userAgent) && userAgent.contains(UtilsAndCommons.NACOS_SERVER_HEADER)) {
                // This request is sent from peer server, should not be redirected again:
                Loggers.SRV_LOG.error("receive invalid redirect request from peer {}", req.getRemoteAddr());
                resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
                        "receive invalid redirect request from peer " + req.getRemoteAddr());
                return;
            }
            
            final String targetServer = distroMapper.mapSrv(distroTag);
            
            List<String> headerList = new ArrayList<>(16);
            Enumeration<String> headers = req.getHeaderNames();
            while (headers.hasMoreElements()) {
                String headerName = headers.nextElement();
                headerList.add(headerName);
                headerList.add(req.getHeader(headerName));
            }
            
            final String body = IoUtils.toString(req.getInputStream(), Charsets.UTF_8.name());
            final Map<String, String> paramsValue = HttpClient.translateParameterMap(req.getParameterMap());
            
            RestResult<String> result = HttpClient
                    .request(HTTP_PREFIX + targetServer + req.getRequestURI(), headerList, paramsValue, body,
                            PROXY_CONNECT_TIMEOUT, PROXY_READ_TIMEOUT, Charsets.UTF_8.name(), req.getMethod());
            String data = result.ok() ? result.getData() : result.getMessage();
            try {
                WebUtils.response(resp, data, result.getCode());
            } catch (Exception ignore) {
                Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + distroMapper.mapSrv(distroTag) + urlString);
            }
        } catch (AccessControlException e) {
            resp.sendError(HttpServletResponse.SC_FORBIDDEN, "access denied: " + ExceptionUtil.getAllExceptionMsg(e));
        } catch (NoSuchMethodException e) {
            resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED,
                    "no such api:" + req.getMethod() + ":" + req.getRequestURI());
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("[DISTRO-FILTER] Server failed: ", e);
            resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                    "Server failed, " + ExceptionUtil.getAllExceptionMsg(e));
        }

 

4.2.1 读操作

由于集群每个节点上都存放了全量数据,因此在每⼀次读操作中,被请求节点会直接从本地拉取数据。 快速响应,这样保证了 Distro 协议可以作为⼀种 AP 协议,对于读操作都进行及时的响应。在网络分区 的情况下,对于所有的读操作也能够正常返回;当网络恢复时,各个 Distro 节点会把各数据分片的 数据进行合并恢复。

 

如前诉DistroFIlter代码所示,当读操作方法不被CanDistro注解时,会直接执行后续逻辑,即读取本地缓存返回客户端;

 

4.2.2 写操作

写操作比如服务注册方法上由于添加了CanDistro注解如下图所示,过滤器层面会进一步计算该请求的责任节点,并将请求转发到对应的责任节点上进行处理。

    @CanDistro
    @PostMapping
    @Secured(action = ActionTypes.WRITE)
    public String register(@RequestParam(defaultValue = Constants.DEFAULT_NAMESPACE_ID) String namespaceId,
            @RequestParam String serviceName, @RequestParam String ip,
            @RequestParam(defaultValue = UtilsAndCommons.DEFAULT_CLUSTER_NAME) String cluster,
            @RequestParam Integer port, @RequestParam(defaultValue = "true") Boolean healthy,
            @RequestParam(defaultValue = "1") Double weight, @RequestParam(defaultValue = "true") Boolean enabled,
            @RequestParam String metadata, @RequestParam Boolean ephemeral, HttpServletRequest request) throws Exception {
        
        NamingUtils.checkServiceNameFormat(serviceName);
        checkWeight(weight);
        final Instance instance = InstanceBuilder.newBuilder().setServiceName(serviceName).setIp(ip)
                .setClusterName(cluster).setPort(port).setHealthy(healthy).setWeight(weight).setEnabled(enabled)
                .setMetadata(UtilsAndCommons.parseMetadata(metadata)).setEphemeral(ephemeral).build();
        if (ephemeral == null) {
            instance.setEphemeral((switchDomain.isDefaultInstanceEphemeral()));
        }
        instanceServiceV2.registerInstance(namespaceId, serviceName, instance);
        NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(), request.getRemoteHost(),
                false, namespaceId, NamingUtils.getGroupName(serviceName), NamingUtils.getServiceName(serviceName),
                instance.getIp(), instance.getPort()));
        return "ok";
    }

那么如何确定该请求的负责节点?通过标签进行路由,具体步骤如下:

步骤1:根据策略获取distroTag标签

Nacos内置两种策略

a) DistroServiceNameTagGenerator 服务名tag策略,根据注册上来的服务名生产distoTag,${groupName}@@${serviceName};
b) DistroIpPortTagGenerator ip端口策略,根据注册上来的服务的ip端口来生产distoTag,${ip]:${port};


当集群所有节点的Nacos版本都是2.0.0以上时,使用ip端口策略,否则使用服务名策略

步骤2:根据distroTag获取责任节点

如果是新增节点,即集群列表中没有改节点ip,则当前节点为责任节点;

如果不是新增节点,通过hash(distoTag)% $clusterSize计算index,获取集群节点列表中的对应index的节点作为责任节点;

如果获取失败,则转发到当前节点;

 

4.3 数据增量同步

前文讲述了节点初始化时会从集群其他节点进行全量同步,有且仅有一次。那么运行时新增的数据就是通过数据增量同步来实现共享的。

增量同步流程:

1,在Distro节点启动时通过事件驱动模式订阅了ClientChangedEvent、ClientDisconnectEvent和ClientVerifyFailedEvent等事件;

2,触发事件后,节点通过向集群其他所有节点或者事件指定节点进行指定数据的同步操作;

 

5、总结

通过以上数据全量同步,定时校验,读写控制,以及基于事件的数据增量同步,Distro协议可以保证高可用性和最终一致性。

 

0条评论
0 / 1000
廖****锋
11文章数
0粉丝数
廖****锋
11 文章 | 0 粉丝
原创

Distro协议解析

2023-05-31 02:15:50
53
0

一、背景简介

天翼云微服务引擎-注册配置中心最近已经开放公测,目前已经提供了Nacos引擎产品功能供广大用户免费使用。Nacos作为一个分布式协同组件,提供了众多功能特性,其中核心功能微服务注册发现,对可用性的要求非常高,因为注册中心作为系统中很重要的的一个服务,需要尽最大可能对外提供可用的服务,所以选择 AP 来保证服务的高可用,另外 Nacos 还采取了心跳机制来自动完成服务数据补偿的机制,进一步提升可用性。而Nacos的AP主要是通过一套Disto协议来实现的。

 

二、Distro协议介绍

Distro 协议是 Nacos 对于临时实例数据开发的⼀致性协议,其数据存储在缓存中,并且会在启动 时进行全量数据同步,并定期进行数据校验。Distro 协议保证了在分布式环境下每个节点上面的服务信息的状态都能够及时地通知其他节点,其保证了在某些 Nacos 节点宕机后,整个临时实例处理系统依旧可以正常工作,可以维持数十万量级服务实例的存储和⼀致性。

 

三、Distro协议的设计机制

平等机制

Nacos 的每个节点是平等的,都可以处理写的请求。 

异步复制机制

Nacos 把变更的数据异步复制到其他节点。 

健康检查机制

每个节点只存了部分数据,定期检查客户端状态保持数据一致性。 

本地读机制

每个节点独立处理读请求,及时从本地发出响应。

新节点同步机制

Nacos 启动时,从其他节点同步数据。

路由转发机制

客户端发送的写请求,如果属于自己则处理,否则路由转发给其他节点。

 

四、Distro协议过程解析

4.1 Disto协议初始化

Nacos集群节点Disto协议模块初始化时,判断若非standalone模式启动,则新建两种任务:数据加载任务和数据校验任务;

4.1.1 LoadTask数据加载任务

节点启动时执行一次该任务,设置状态作为后续任务的参照;该任务向集群其他节点发起全量数据拉取,维护一个本机节点的一个全局状态isInitialized,加载数据成功之后,将这个状态设置为true,表示当前节点已经同步完毕数据,可以参与正常服务;同时设置了数据缓存DistroDataStorage的isFinishInitial属性为true,表示数据已准备好,可以开始定时数据验证任务;

任务执行过程如下:

1,检测集群其他节点,若不存在在线节点,则死循环等待,间隔1秒检测;

2,等待数据缓存DistroDataStorage注册,若未注册,则死循环等待,间隔1秒检测;

3,检测数据缓存DistroDataStorage是否加载过,没加载过则启动数据加载流程:轮询集群中其他在线节点,获取Snapshot数据

,解析后存放本地缓存中;

同步Snapshot数据的数据结构如下所示:

clientSyncData = {ClientSyncData@xxxxx} 
	clientId = "ip1:8848#true"
	attributes = {ClientSyncAttributes@xxxx} 
	namespaces = {ArrayList@xxxxx}  size = 2
		0 = "public"
		1 = "public"
	groupNames = {ArrayList@xxxxx}  size = 2
		0 = "DEFAULT_GROUP"
		1 = "DEFAULT_GROUP"
	serviceNames = {ArrayList@xxxxx}  size = 2
		0 = "SERVICE_01"
		1 = "SERVICE_02"
	instancePublishInfos = {ArrayList@xxxxx}  size = 2
		0 = {InstancePublishInfo@xxxxx} "InstancePublishInfo{ip=ip1, port=8848, healthy=false}"
		1 = {InstancePublishInfo@xxxxx} "InstancePublishInfo{ip=ip1, port=8848, healthy=false}"

通过数据加载任务,可以在节点新加入集群时,从其他节点快速获取最新的全量的临时服务数据,进入服务状态。

4.1.2 VerifyTask数据校验任务

节点数据验证任务,节点启动延迟5秒后,每5s周期性执行一次;

任务执行流程:

1,获取除自身节点之外的其他节点;

2,获取每种数据类型(Nacos v2.x版本主要为Nacos:Naming:v2:ClientData)的DistroData数据;

3,序列化DistroData(其核心是本节点负责的服务实例的clientId),根据数据类型获取对应的Distro客户端agent,使用grpc方式异步地发送verify请求,对方节点收到请求后对比后更新本地缓存中的服务信息;

通过定时更新任务,服务实例负责节点通知其他也拥有此服务实例信息的节点刷新相关服务实例的最新活跃时间。否则校验失败,负责节点在收到失败的Response后会发布ClientVerifyFailedEvent事件,触发数据同步操作。

 

4.2 客户端读写请求

客户端请求根据处理方式的不通,可以分为读操作和写操作。每种操作都会经过过滤器进行处理,DistroFilter负责过滤客户端读写请求,对于写数据等标记为canDisto(代码控制)的请求,需要转发请求到负责对应数据的节点上去处理;如下代码片段所示

        try {
            Method method = controllerMethodsCache.getMethod(req);
            
            String path = new URI(req.getRequestURI()).getPath();
            if (method == null) {
                throw new NoSuchMethodException(req.getMethod() + " " + path);
            }
            
            if (!method.isAnnotationPresent(CanDistro.class)) {
                filterChain.doFilter(req, resp);
                return;
            }
            String distroTag = distroTagGenerator.getResponsibleTag(req);
            
            if (distroMapper.responsible(distroTag)) {
                filterChain.doFilter(req, resp);
                return;
            }
            
            // proxy request to other server if necessary:
            String userAgent = req.getHeader(HttpHeaderConsts.USER_AGENT_HEADER);
            
            if (StringUtils.isNotBlank(userAgent) && userAgent.contains(UtilsAndCommons.NACOS_SERVER_HEADER)) {
                // This request is sent from peer server, should not be redirected again:
                Loggers.SRV_LOG.error("receive invalid redirect request from peer {}", req.getRemoteAddr());
                resp.sendError(HttpServletResponse.SC_BAD_REQUEST,
                        "receive invalid redirect request from peer " + req.getRemoteAddr());
                return;
            }
            
            final String targetServer = distroMapper.mapSrv(distroTag);
            
            List<String> headerList = new ArrayList<>(16);
            Enumeration<String> headers = req.getHeaderNames();
            while (headers.hasMoreElements()) {
                String headerName = headers.nextElement();
                headerList.add(headerName);
                headerList.add(req.getHeader(headerName));
            }
            
            final String body = IoUtils.toString(req.getInputStream(), Charsets.UTF_8.name());
            final Map<String, String> paramsValue = HttpClient.translateParameterMap(req.getParameterMap());
            
            RestResult<String> result = HttpClient
                    .request(HTTP_PREFIX + targetServer + req.getRequestURI(), headerList, paramsValue, body,
                            PROXY_CONNECT_TIMEOUT, PROXY_READ_TIMEOUT, Charsets.UTF_8.name(), req.getMethod());
            String data = result.ok() ? result.getData() : result.getMessage();
            try {
                WebUtils.response(resp, data, result.getCode());
            } catch (Exception ignore) {
                Loggers.SRV_LOG.warn("[DISTRO-FILTER] request failed: " + distroMapper.mapSrv(distroTag) + urlString);
            }
        } catch (AccessControlException e) {
            resp.sendError(HttpServletResponse.SC_FORBIDDEN, "access denied: " + ExceptionUtil.getAllExceptionMsg(e));
        } catch (NoSuchMethodException e) {
            resp.sendError(HttpServletResponse.SC_NOT_IMPLEMENTED,
                    "no such api:" + req.getMethod() + ":" + req.getRequestURI());
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("[DISTRO-FILTER] Server failed: ", e);
            resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
                    "Server failed, " + ExceptionUtil.getAllExceptionMsg(e));
        }

 

4.2.1 读操作

由于集群每个节点上都存放了全量数据,因此在每⼀次读操作中,被请求节点会直接从本地拉取数据。 快速响应,这样保证了 Distro 协议可以作为⼀种 AP 协议,对于读操作都进行及时的响应。在网络分区 的情况下,对于所有的读操作也能够正常返回;当网络恢复时,各个 Distro 节点会把各数据分片的 数据进行合并恢复。

 

如前诉DistroFIlter代码所示,当读操作方法不被CanDistro注解时,会直接执行后续逻辑,即读取本地缓存返回客户端;

 

4.2.2 写操作

写操作比如服务注册方法上由于添加了CanDistro注解如下图所示,过滤器层面会进一步计算该请求的责任节点,并将请求转发到对应的责任节点上进行处理。

    @CanDistro
    @PostMapping
    @Secured(action = ActionTypes.WRITE)
    public String register(@RequestParam(defaultValue = Constants.DEFAULT_NAMESPACE_ID) String namespaceId,
            @RequestParam String serviceName, @RequestParam String ip,
            @RequestParam(defaultValue = UtilsAndCommons.DEFAULT_CLUSTER_NAME) String cluster,
            @RequestParam Integer port, @RequestParam(defaultValue = "true") Boolean healthy,
            @RequestParam(defaultValue = "1") Double weight, @RequestParam(defaultValue = "true") Boolean enabled,
            @RequestParam String metadata, @RequestParam Boolean ephemeral, HttpServletRequest request) throws Exception {
        
        NamingUtils.checkServiceNameFormat(serviceName);
        checkWeight(weight);
        final Instance instance = InstanceBuilder.newBuilder().setServiceName(serviceName).setIp(ip)
                .setClusterName(cluster).setPort(port).setHealthy(healthy).setWeight(weight).setEnabled(enabled)
                .setMetadata(UtilsAndCommons.parseMetadata(metadata)).setEphemeral(ephemeral).build();
        if (ephemeral == null) {
            instance.setEphemeral((switchDomain.isDefaultInstanceEphemeral()));
        }
        instanceServiceV2.registerInstance(namespaceId, serviceName, instance);
        NotifyCenter.publishEvent(new RegisterInstanceTraceEvent(System.currentTimeMillis(), request.getRemoteHost(),
                false, namespaceId, NamingUtils.getGroupName(serviceName), NamingUtils.getServiceName(serviceName),
                instance.getIp(), instance.getPort()));
        return "ok";
    }

那么如何确定该请求的负责节点?通过标签进行路由,具体步骤如下:

步骤1:根据策略获取distroTag标签

Nacos内置两种策略

a) DistroServiceNameTagGenerator 服务名tag策略,根据注册上来的服务名生产distoTag,${groupName}@@${serviceName};
b) DistroIpPortTagGenerator ip端口策略,根据注册上来的服务的ip端口来生产distoTag,${ip]:${port};


当集群所有节点的Nacos版本都是2.0.0以上时,使用ip端口策略,否则使用服务名策略

步骤2:根据distroTag获取责任节点

如果是新增节点,即集群列表中没有改节点ip,则当前节点为责任节点;

如果不是新增节点,通过hash(distoTag)% $clusterSize计算index,获取集群节点列表中的对应index的节点作为责任节点;

如果获取失败,则转发到当前节点;

 

4.3 数据增量同步

前文讲述了节点初始化时会从集群其他节点进行全量同步,有且仅有一次。那么运行时新增的数据就是通过数据增量同步来实现共享的。

增量同步流程:

1,在Distro节点启动时通过事件驱动模式订阅了ClientChangedEvent、ClientDisconnectEvent和ClientVerifyFailedEvent等事件;

2,触发事件后,节点通过向集群其他所有节点或者事件指定节点进行指定数据的同步操作;

 

5、总结

通过以上数据全量同步,定时校验,读写控制,以及基于事件的数据增量同步,Distro协议可以保证高可用性和最终一致性。

 

文章来自个人专栏
微服务&中间件
11 文章 | 2 订阅
0条评论
0 / 1000
请输入你的评论
0
0