背景
目前,由于yacos项目架构设计中存在大量的异步任务运行(如下图),当我们对yacos服务(commander)进行重启时,可能导致异步任务执行中断,就会导致出现数据不一致等等众多问题。
方案思考
方案一
为了解决这个问题,咱们第一反应一定是,“那在服务重启之后,让中断任务重新执行不就好了”,但是这里有个问题,咱们的异步任务大多都无法做到严密的幂等性,例如:创建任务,当我们调底层进行创建后,我们的任务在进行底层创建结果轮询时被中断了,此时任务重启后,调用底层再次创建,可能会出现各种各样的结果(成功 or 失败,可能视参数而定)。当然我们如果能解决这个问题,底层增加严密的幂等校验,当我们调用底层创建幂等后,我们继续往下轮询状态,则也可正常完结该任务。。然而,这只是其中一个异步任务,目前yacos项目中涉及到的类似任务众多,且有众多产品与产线,故要通过解决幂等校验从而解决我们的服务启动带来的问题似乎遥遥无期。
方案二
如果重新执行不可行,那么是否可以做到继续执行呢?继续执行意味着对于任务执行中间状态的记录,咱们可以对咱们的流程进行细化拆分,记录执行节点,但是这里同样存在一个致命问题,服务重启可能发生在任务执行的任何阶段,当我们某个卡点执行完成,正要记录执行状态时,服务重启了,于是我们也就丢失了当前卡点执行状态,如果该卡点无法重新执行,而我们未感知到其已执行,则会带来和方案一相同的问题,且其涉及的改造成本也很高,故也不现实。
方案三
当我们无法通过解决异步任务中断带来的问题时,我们似乎也可以换个思路思考,如果我们无法解决重启带来的异步任务中断问题,那我们是否可以阻止异步任务被异常中断的发生呢?
重启可以等异步执行完成,再进行服务重启,也即:服务优雅启动,是否就可以解决该问题了呢?!!!
方案分析
服务优雅启动涉及到两个方面:
- 保证正在执行的异步任务执行完成;
- 不要再执行新的任务。
所以以下,我们从这两个方面下手思考我们的代码实现思路。
1、保证正在执行的异步任务执行完成。
“当存在异步任务正在执行中,则等待其执行完成”。所以,这又带来了两个新的问题:如何知道程序中有哪些异步任务正在执行?当我们知道有异步任务正在执行时,我们该如何做?
- 如何知道程序中有哪些异步任务正在执行?
对于分布式异步任务,我们可能有多个commander对同一个队列任务进行消费,那么我们能很快想到,将正在运行的任务保存到redis缓存中,执行完成则移除,我们则可以随时从缓存中得知正在运行的异步任务情况。
- 当我们知道有异步任务正在执行时,我们该如何做?
当我们获取到异步任务时,我们需要确保该任务顺利执行完成,其任务执行优先级高于重启优先级,故,这里,我们可以通过一个启动脚本来进行异步任务轮询+等待+轮询+等待+...+重启。
2、不要再执行新的任务。
这里不想再执行新的任务时,则:我们当前运行的commander需要获得停止命令,这里我们同样可以通过缓存来实现,在容器重启之后,先发起停止命令(停止指令存入缓存),后续commander读取到该指令,则不再读取新的任务。
架构设计
从上面的分析,我们可以得知,当前的服务优雅启动实现可以分为三块:
1、任务执行缓存记录;
2、停止命令下发;
3、重启脚本编写;
新的执行链路如下:
1、配置容器标识
由于重启均以docker为单位重启,故,可以给每个docker设置唯一标识,任务缓存以该flag进行存储。
而我们每个docker都唯一对应一份base_conf, 故可以在其添加标识。
2、启动tools服务
服务:
获取当前正在运行得任务: curl 'http://localhost:port/api/tools/?action=get_running_task_detail&container_flag=test'
获取当前运行任务得数量:curl 'http://localhost:port/api/tools/?action=get_running_task_size&container_flag=test'
停止运行新任务: curl 'http://localhost:port/api/tools/?action=stop_task&container_flag=test'
3、脚本配置
import os
import sys
import time
if len(sys.argv) > 1:
container_flag = sys.argv[2]
docker_name = sys.argv[1]
server_ip = "10.150.66.74"
port_id = "12081"
get_running_task_detail = f"curl 'http://{server_ip}:{port_id}/api/tools/?action=get_running_task_detail&container_flag={container_flag}'"
get_running_task_size = f"curl 'http://{server_ip}:{port_id}/api/tools/?action=get_running_task_size&container_flag={container_flag}'"
stop_task = f"curl 'http://{server_ip}:{port_id}/api/tools/?action=stop_task&container_flag={container_flag}'"
# 发送停止命令
print("**** start end stop order!")
os.system(stop_task)
print("")
print("**** start check running tasks!")
while True:
# 1、获取当前运行任务得数量
running_size = int(os.popen(get_running_task_size).read())
if running_size == 0:
print("**** no tasks running")
break
# 2、获取当前运行任务明细
os.system(get_running_task_detail)
print("**** some tasks are runnning, please wait!!")
print("**** waiting 3s")
time.sleep(3)
# 3、 重启容器
print(f"**** start restart docker : {docker_name}")
os.system(f"docker restart {docker_name}")
else:
print("params can not be empty, first is docker_name, second is container_flag.")
4、启动脚本
python3 脚本.py 重启名称 容器标识