searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Trino/presto服务发现(discovery server)机制详解

2023-05-26 08:21:15
667
0

1.trino服务发现概述

trino coordinator内置了discovery server服务,用于接收集群中所有节点注册,coordinator和worker定时会将自己的节点信息通过http注册(push)到discovery server内存中,然后coordinator定时将discovery server中注册的节点信息pull到自己的内存,当query执行的时候,coordinator将任务分发到对应worker节点中去执行。

架构如下:

原生的架构中,discovery server是内嵌在coordinator中的,即embedded discovery。但是实际上,discovery server 是一个独立的服务,我们完全可以将discovery server解耦出来,基于此实现trino HA或node label等功能。

trino HA架构如下:

在这个架构中,coordinator和discovery server均为HA,主备coordinator都会定时从discovery server 中拉取节点信息,当coordinator发生切换时,由于备份coordinator中已经包含了所有的worker信息,client端只要和备份coordinator建立连接马上就能恢复查询,一般是毫秒级,而在原生架构中,由于coordinator重启后worker节点还需要重新注册,间隔可能会达到8s以上。HA架构在大规模trino集群及高并发场景下非常有用。

同样,两个discovery server服务也会从对方拉取注册的节点信息,由于discovery server主备节点中均已包含了集群所有节点,所以如果discovery server发生了切换,coordinator和client可以做到无感知。

 

2. trino服务发现源码详解

Trino服务发现交互包含以下四个步骤:

->2.1trino 节点定时将自己的信息通过restful接口push到discovery server中

->2.2discovery server接收节点信息并在自己内存中缓存

->2.3trino coordinator节点定时拉取discovery server中缓存的集群节点信息

->2.4discovery server将节点信息返回给trino coordinator

最后,若discovery server是HA部署,则主备discovery server会有一个信息同步机制(2.5)

2.1trino节点push流程

push流程相对比较简单,在trino server启动main方法中会调用Announcer类进行annouce

Server类main()方法

Announce类

在annouce方法中调用DiscoveryAnnouncementClient接口annouce方法向discovery server push

其中annoucementClient的具体实现是HttpDiscoveryAnnouncementClient,是在DiscoveryModule中注入的:

这里如果请求成功或者失败,都会启动线程池定时继续向discovery server push, 我们看一下定时调度的逻辑,其实就是递归调用annouce方法。

接下来看一下HttpDiscoveryAnnouncementClient类annouce方法的实现:

可以看到使用put方法请求discovery server,请求的body是annoucement,其中nodeInfo 在NodeModule中注入: configBinder(binder).bindConfig(NodeConfig.class);

discoveryServerUri是通过@ForDiscoveryClient注入的:

具体的实现在DiscoveryModule中:

请求的path:

另外我们看一下annoucement中到底是什么?

annoucement对象中包含了本节点要注册的信息,至此trino push逻辑结束。

2.2discovery server接收流程

入口是DynamicAnnouncementResource类

通过dynamicStore.put放到内存中,具体实现是ReplicatedDynamicStore,调用流程:

ReplicatedDynamicStore -> DistributedStore -> InMemoryStore

最终调用了InMemoryStore的put方法

最终放到了缓存中,缓存如何更新呢, 比较版本,那个版本更新,返回那个,然后和旧的值比较,如果不相等,则更新缓存

当然,如果trino节点宕机了,discovery server缓存中的节点信息也应该删除:

DistributedStore类:

最后在InMemoryStore中删除

2.3trino coordinator拉取流程

在trino DiscoveryNodeManager类中,定时从discovery server服务中拉取

调用链:

startPollingNodeStates -> pollWorkers -> refreshNodesInternal

serviceSelector具体实现是MergingServiceSelector

selector具体实现是CachingServiceSelector,

具体的逻辑还是异步定时刷新:

lookupClient实现是HttpDiscoveryLookupClient

最终调用HttpDiscoveryLookupClient 类lookup方法,向discovery server发送get请求获取所有节点信息。

另外在CachingServiceSelector refresh的时候,不管成功还是失败,都会用启用线程进行递归调用

最终在DiscoveryNodeManager类refreshNodesInternal方法中获取了所有节点信息,节点封装如下:

后续调度使用的节点均是从DiscoveryNodeManager中获取。

拉取流程结束。

2.4discovery server端返回流程

返回接口类是ServiceResource

staticStore为空,我们只看dynamicStore

调用逻辑是:

ReplicatedDynamicStore -> DistributedStore -> InMemoryStore

最终调用InMemoryStore类getAll方法,返回discovery server缓存中的所有节点信息

至此,整个服务发现流程就完成了。

接下来我们看一下discovery server HA的同步逻辑:

2.5.discovery server HA同步逻辑

每个discovery server 服务都在Replicator类中启动定时同步线程,将其他discovery server节点中的trino节点信息同步过来,默认是1分钟刷新一次,如果节点是本节点,则不做同步。

Replicator类 synchronize()方法逻辑:

selector的实现是DiscoveryServiceSelector,我们看一下selectAllServices()方法逻辑

其实就是从inventory配置文件中拿到相关信息,其中包含另外一台discovery server的uri,配置文件service-inventory.json如下:

继续看synchronize()方法:

请求成功之后,把节点信息保存在本地缓存。通过这种方式实现多个discovery server之间的数据同步。

3.总结

以上即为trino服务发现机制的详细流程,但是discovery server 的分离需要对trino coordinator做一定程度的改造。

0条评论
0 / 1000
yx_knight
5文章数
0粉丝数
yx_knight
5 文章 | 0 粉丝
原创

Trino/presto服务发现(discovery server)机制详解

2023-05-26 08:21:15
667
0

1.trino服务发现概述

trino coordinator内置了discovery server服务,用于接收集群中所有节点注册,coordinator和worker定时会将自己的节点信息通过http注册(push)到discovery server内存中,然后coordinator定时将discovery server中注册的节点信息pull到自己的内存,当query执行的时候,coordinator将任务分发到对应worker节点中去执行。

架构如下:

原生的架构中,discovery server是内嵌在coordinator中的,即embedded discovery。但是实际上,discovery server 是一个独立的服务,我们完全可以将discovery server解耦出来,基于此实现trino HA或node label等功能。

trino HA架构如下:

在这个架构中,coordinator和discovery server均为HA,主备coordinator都会定时从discovery server 中拉取节点信息,当coordinator发生切换时,由于备份coordinator中已经包含了所有的worker信息,client端只要和备份coordinator建立连接马上就能恢复查询,一般是毫秒级,而在原生架构中,由于coordinator重启后worker节点还需要重新注册,间隔可能会达到8s以上。HA架构在大规模trino集群及高并发场景下非常有用。

同样,两个discovery server服务也会从对方拉取注册的节点信息,由于discovery server主备节点中均已包含了集群所有节点,所以如果discovery server发生了切换,coordinator和client可以做到无感知。

 

2. trino服务发现源码详解

Trino服务发现交互包含以下四个步骤:

->2.1trino 节点定时将自己的信息通过restful接口push到discovery server中

->2.2discovery server接收节点信息并在自己内存中缓存

->2.3trino coordinator节点定时拉取discovery server中缓存的集群节点信息

->2.4discovery server将节点信息返回给trino coordinator

最后,若discovery server是HA部署,则主备discovery server会有一个信息同步机制(2.5)

2.1trino节点push流程

push流程相对比较简单,在trino server启动main方法中会调用Announcer类进行annouce

Server类main()方法

Announce类

在annouce方法中调用DiscoveryAnnouncementClient接口annouce方法向discovery server push

其中annoucementClient的具体实现是HttpDiscoveryAnnouncementClient,是在DiscoveryModule中注入的:

这里如果请求成功或者失败,都会启动线程池定时继续向discovery server push, 我们看一下定时调度的逻辑,其实就是递归调用annouce方法。

接下来看一下HttpDiscoveryAnnouncementClient类annouce方法的实现:

可以看到使用put方法请求discovery server,请求的body是annoucement,其中nodeInfo 在NodeModule中注入: configBinder(binder).bindConfig(NodeConfig.class);

discoveryServerUri是通过@ForDiscoveryClient注入的:

具体的实现在DiscoveryModule中:

请求的path:

另外我们看一下annoucement中到底是什么?

annoucement对象中包含了本节点要注册的信息,至此trino push逻辑结束。

2.2discovery server接收流程

入口是DynamicAnnouncementResource类

通过dynamicStore.put放到内存中,具体实现是ReplicatedDynamicStore,调用流程:

ReplicatedDynamicStore -> DistributedStore -> InMemoryStore

最终调用了InMemoryStore的put方法

最终放到了缓存中,缓存如何更新呢, 比较版本,那个版本更新,返回那个,然后和旧的值比较,如果不相等,则更新缓存

当然,如果trino节点宕机了,discovery server缓存中的节点信息也应该删除:

DistributedStore类:

最后在InMemoryStore中删除

2.3trino coordinator拉取流程

在trino DiscoveryNodeManager类中,定时从discovery server服务中拉取

调用链:

startPollingNodeStates -> pollWorkers -> refreshNodesInternal

serviceSelector具体实现是MergingServiceSelector

selector具体实现是CachingServiceSelector,

具体的逻辑还是异步定时刷新:

lookupClient实现是HttpDiscoveryLookupClient

最终调用HttpDiscoveryLookupClient 类lookup方法,向discovery server发送get请求获取所有节点信息。

另外在CachingServiceSelector refresh的时候,不管成功还是失败,都会用启用线程进行递归调用

最终在DiscoveryNodeManager类refreshNodesInternal方法中获取了所有节点信息,节点封装如下:

后续调度使用的节点均是从DiscoveryNodeManager中获取。

拉取流程结束。

2.4discovery server端返回流程

返回接口类是ServiceResource

staticStore为空,我们只看dynamicStore

调用逻辑是:

ReplicatedDynamicStore -> DistributedStore -> InMemoryStore

最终调用InMemoryStore类getAll方法,返回discovery server缓存中的所有节点信息

至此,整个服务发现流程就完成了。

接下来我们看一下discovery server HA的同步逻辑:

2.5.discovery server HA同步逻辑

每个discovery server 服务都在Replicator类中启动定时同步线程,将其他discovery server节点中的trino节点信息同步过来,默认是1分钟刷新一次,如果节点是本节点,则不做同步。

Replicator类 synchronize()方法逻辑:

selector的实现是DiscoveryServiceSelector,我们看一下selectAllServices()方法逻辑

其实就是从inventory配置文件中拿到相关信息,其中包含另外一台discovery server的uri,配置文件service-inventory.json如下:

继续看synchronize()方法:

请求成功之后,把节点信息保存在本地缓存。通过这种方式实现多个discovery server之间的数据同步。

3.总结

以上即为trino服务发现机制的详细流程,但是discovery server 的分离需要对trino coordinator做一定程度的改造。

文章来自个人专栏
trino/presto
2 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
1
0