func (this *ShardService) Dispatch() {
// 执行这个对应的获取target函数
targets := this.TargetGetFunc()
if len(targets) == 0 {
level.Warn(this.logger).Log("msg", "Dispatch.empty.targets")
return
}
// 先初始化一个map ,key是 节点,value是分配给这个节点的targets
nodeMap := make(map[string][]target.ScrapeTarget)
// 遍历target,
for _, t := range targets {
t := t
if len(t.Targets) != 1 {
continue
}
// 对target的地址 在哈希环中寻找节点
// 要求每个target的地址都是1个
// 然后根据node塞入map中
node := this.GetNode(t.Targets[0])
preTs, loaded := nodeMap[node]
if !loaded {
preTs = make([]target.ScrapeTarget, 0)
}
preTs = append(preTs, t)
nodeMap[node] = preTs
}
index := 1
allNum := len(nodeMap)
for node, ts := range nodeMap {
// 拼接一个json文件的名字
// 服务名_节点ip_索引_分片总数_target总数.json
jsonFileName := fmt.Sprintf("%s_%s_%d_%d_%d.json",
this.SrvName,
node,
index,
allNum,
len(ts),
)
// 写json文件
writeJsonFile(jsonFileName, ts)
extraVars := make(map[string]interface{})
extraVars["src_sd_file_name"] = jsonFileName
extraVars["dest_sd_file_name"] = this.DestSdFileName
extraVars["service_port"] = this.Port
level.Info(this.logger).Log(
"msg", "goansiblerun.run",
"this.SrvName", this.SrvName,
"jsonFileName", jsonFileName,
"node", node,
"index", index,
"all", allNum,
"targetNum", len(ts),
)
go goansiblerun.AnsiRunPlay(this.logger, this.SrvName, node, extraVars, this.YamlPath)
index++
}
}
// 遍历target,
for _, t := range targets {
t := t
if len(t.Targets) != 1 {
continue
}
node := this.GetNode(t.Targets[0])
preTs, loaded := nodeMap[node]
if !loaded {
preTs = make([]target.ScrapeTarget, 0)
}
preTs = append(preTs, t)
nodeMap[node] = preTs
}
func (this *ShardService) RunDispatch() error {
level.Info(this.logger).Log("msg", "RunDispatch.start", "name", this.SrvName)
ticker := time.NewTicker(1 * time.Minute)
this.Dispatch()
defer ticker.Stop()
for {
select {
case <-this.ctx.Done():
level.Info(this.logger).Log("msg", "receive_quit_signal_and_quit")
return nil
case <-ticker.C:
//level.Info(logger).Log("msg", "doIndexSync_run")
this.Dispatch()
}
}
}