1.trino服务发现概述
trino coordinator内置了discovery server服务,用于接收集群中所有节点注册,coordinator和worker定时会将自己的节点信息通过http注册(push)到discovery server内存中,然后coordinator定时将discovery server中注册的节点信息pull到自己的内存,当query执行的时候,coordinator将任务分发到对应worker节点中去执行。
架构如下:
原生的架构中,discovery server是内嵌在coordinator中的,即embedded discovery。但是实际上,discovery server 是一个独立的服务,我们完全可以将discovery server解耦出来,基于此实现trino HA或node label等功能。
trino HA架构如下:
在这个架构中,coordinator和discovery server均为HA,主备coordinator都会定时从discovery server 中拉取节点信息,当coordinator发生切换时,由于备份coordinator中已经包含了所有的worker信息,client端只要和备份coordinator建立连接马上就能恢复查询,一般是毫秒级,而在原生架构中,由于coordinator重启后worker节点还需要重新注册,间隔可能会达到8s以上。HA架构在大规模trino集群及高并发场景下非常有用。
同样,两个discovery server服务也会从对方拉取注册的节点信息,由于discovery server主备节点中均已包含了集群所有节点,所以如果discovery server发生了切换,coordinator和client可以做到无感知。
2. trino服务发现源码详解
Trino服务发现交互包含以下四个步骤:
->2.1trino 节点定时将自己的信息通过restful接口push到discovery server中
->2.2discovery server接收节点信息并在自己内存中缓存
->2.3trino coordinator节点定时拉取discovery server中缓存的集群节点信息
->2.4discovery server将节点信息返回给trino coordinator
最后,若discovery server是HA部署,则主备discovery server会有一个信息同步机制(2.5)
2.1trino节点push流程
push流程相对比较简单,在trino server启动main方法中会调用Announcer类进行annouce
Server类main()方法
Announce类
在annouce方法中调用DiscoveryAnnouncementClient接口annouce方法向discovery server push
其中annoucementClient的具体实现是HttpDiscoveryAnnouncementClient,是在DiscoveryModule中注入的:
这里如果请求成功或者失败,都会启动线程池定时继续向discovery server push, 我们看一下定时调度的逻辑,其实就是递归调用annouce方法。
接下来看一下HttpDiscoveryAnnouncementClient类annouce方法的实现:
可以看到使用put方法请求discovery server,请求的body是annoucement,其中nodeInfo 在NodeModule中注入: configBinder(binder).bindConfig(NodeConfig.class);
discoveryServerUri是通过@ForDiscoveryClient注入的:
具体的实现在DiscoveryModule中:
请求的path:
另外我们看一下annoucement中到底是什么?
annoucement对象中包含了本节点要注册的信息,至此trino push逻辑结束。
2.2discovery server接收流程
入口是DynamicAnnouncementResource类
通过dynamicStore.put放到内存中,具体实现是ReplicatedDynamicStore,调用流程:
ReplicatedDynamicStore -> DistributedStore -> InMemoryStore
最终调用了InMemoryStore的put方法
最终放到了缓存中,缓存如何更新呢, 比较版本,那个版本更新,返回那个,然后和旧的值比较,如果不相等,则更新缓存
当然,如果trino节点宕机了,discovery server缓存中的节点信息也应该删除:
DistributedStore类:
最后在InMemoryStore中删除
2.3trino coordinator拉取流程
在trino DiscoveryNodeManager类中,定时从discovery server服务中拉取
调用链:
startPollingNodeStates -> pollWorkers -> refreshNodesInternal
serviceSelector具体实现是MergingServiceSelector
selector具体实现是CachingServiceSelector,
具体的逻辑还是异步定时刷新:
lookupClient实现是HttpDiscoveryLookupClient
最终调用HttpDiscoveryLookupClient 类lookup方法,向discovery server发送get请求获取所有节点信息。
另外在CachingServiceSelector refresh的时候,不管成功还是失败,都会用启用线程进行递归调用
最终在DiscoveryNodeManager类refreshNodesInternal方法中获取了所有节点信息,节点封装如下:
后续调度使用的节点均是从DiscoveryNodeManager中获取。
拉取流程结束。
2.4discovery server端返回流程
返回接口类是ServiceResource
staticStore为空,我们只看dynamicStore
调用逻辑是:
ReplicatedDynamicStore -> DistributedStore -> InMemoryStore
最终调用InMemoryStore类getAll方法,返回discovery server缓存中的所有节点信息
至此,整个服务发现流程就完成了。
接下来我们看一下discovery server HA的同步逻辑:
2.5.discovery server HA同步逻辑
每个discovery server 服务都在Replicator类中启动定时同步线程,将其他discovery server节点中的trino节点信息同步过来,默认是1分钟刷新一次,如果节点是本节点,则不做同步。
Replicator类 synchronize()方法逻辑:
selector的实现是DiscoveryServiceSelector,我们看一下selectAllServices()方法逻辑
其实就是从inventory配置文件中拿到相关信息,其中包含另外一台discovery server的uri,配置文件service-inventory.json如下:
继续看synchronize()方法:
请求成功之后,把节点信息保存在本地缓存。通过这种方式实现多个discovery server之间的数据同步。
3.总结
以上即为trino服务发现机制的详细流程,但是discovery server 的分离需要对trino coordinator做一定程度的改造。