searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

基于Redis队列实现的任务调度引擎的算法分享

2024-06-25 09:47:22
31
0

传统的流水线引擎方案,主要是基于Mysql定时任务表或开源组件如Jenkins流水线引擎来研发。在流量稍高或io消耗稍大时容易出现卡顿、运行慢问题;且由于定时任务和Jenkins都是有状态服务,无法实现节点横向扩容;最后jenkins引擎需要有专业技术团队来部署和维护,也提高了维护行的难度。这里通过基于Redis队列实现的流水线任务调度引擎,能毫秒级实现任务间的状态流转,具备较高的调度性能。

基于Redis队列实现的任务调度引擎,其中调度核心之一由4个算法组成,即:任务入队Push算法、任务出队Poll算法、任务执行算法和任务失败重试算法。

一、任务入队Push算法

任务入队Push算法主要逻辑是基于Redis有序队列来来按执行存储能代表任务信息唯一性的Hash值,目的是保证流水线执行任务能按照编排顺序串行或并行。

实现流程图如下:


1、对任务信息msg进行hash算法得到任务的hash值,再调用Zrank函数判断该hash值是否存在。如果不存在,则按步骤2执行;如果存在,那么执行步骤3。
2、给msg初始化待执行数据,开启事务,先将任务信息msg存储到hash表中,再根据系统时间和任务信息msg里的延迟时间调用得分算法,计算出当前任务对应的分数值,最后调用Zadd函数将当前任务hash值加入到任务队列中。
3、取msg里的延迟时间,调用得分算法,计算出任务最新的分数,然后调用Zadd函数更新该任务hash值的执行顺序。

二、任务出队Poll算法

任务出队Poll算法,由服务内定时器按指定频率调用,默认是50ms间隔时间。主要逻辑是调用ZrangeByScore函数获取一条任务hash值,然后发送执行这个任务的事件。

1、检查当前线程池是否有空余线程可调度,有则继续,无则返回。
2、调用ZrangeByScore函数,获取任务队列中得分最小的任务hash值。
3、调用SetNx函数获取分布式锁,通过任务hash值调用Hget函数获取任务信息msg。
4、调用Zrem函数将任务hash值从任务队列中移除,即表示任务出队。
5、调用Hset函数将修改执行次数的任务信息msg更新到hash表。
6、将任务信息msg包装成任务事件event发送,即让任务执行器监听事件,并调用后面的任务执行算法执行。

三、任务执行算法

任务执行算法,为了保持扩展性,内置了流水线内任务事件接近有22种状态,诸如开始StartPipeline、CancelStage等。此算法核心难点是较多状态机和策略模式设计。

1、执行器监听到任务出队算法的任务事件event后,会利用java反射原理得到对应event的事件处理器handler。
2、再通过线程池异步执行handler.invoke(event)方法,即执行业务的真正逻辑。handler内部除了自身业务逻辑需要用户自己定义外,其他复杂的状态递归转化为内置逻辑,可极大提高devops平台研发人员生产力。研发人员只需关心每种事件event需要做的业务逻辑,而不用考虑复杂的状态转换。

四、任务失败重试算法

任务失败重试算法,为了提高容错性,诸如网络抖动造成的任务失败等异常,此算法解决的是这种可通过自动重试来解决的异常事件,进而提高任务执行成功率。

1、有个前提:失败任务会有失败任务队列来存储任务hash值。首先,调用Zrange函数从失败任务队列中取出任务hash值集合。
2、遍历任务hash值集合,对每个任务hash,调用Hget函数获取任务信息msg。
3、将任务信息msg中执行次数和最大可执行次数对比,如果超过最大可执行次数,则调用Zrem函数从失败任务队列中移除任务hash值;如果不超过,则开启事务,给失败任务调用score算法打分,再调用Zadd函数将该任务hash值加入到正常的任务队列中。

0条评论
作者已关闭评论
路西
5文章数
0粉丝数
路西
5 文章 | 0 粉丝
原创

基于Redis队列实现的任务调度引擎的算法分享

2024-06-25 09:47:22
31
0

传统的流水线引擎方案,主要是基于Mysql定时任务表或开源组件如Jenkins流水线引擎来研发。在流量稍高或io消耗稍大时容易出现卡顿、运行慢问题;且由于定时任务和Jenkins都是有状态服务,无法实现节点横向扩容;最后jenkins引擎需要有专业技术团队来部署和维护,也提高了维护行的难度。这里通过基于Redis队列实现的流水线任务调度引擎,能毫秒级实现任务间的状态流转,具备较高的调度性能。

基于Redis队列实现的任务调度引擎,其中调度核心之一由4个算法组成,即:任务入队Push算法、任务出队Poll算法、任务执行算法和任务失败重试算法。

一、任务入队Push算法

任务入队Push算法主要逻辑是基于Redis有序队列来来按执行存储能代表任务信息唯一性的Hash值,目的是保证流水线执行任务能按照编排顺序串行或并行。

实现流程图如下:


1、对任务信息msg进行hash算法得到任务的hash值,再调用Zrank函数判断该hash值是否存在。如果不存在,则按步骤2执行;如果存在,那么执行步骤3。
2、给msg初始化待执行数据,开启事务,先将任务信息msg存储到hash表中,再根据系统时间和任务信息msg里的延迟时间调用得分算法,计算出当前任务对应的分数值,最后调用Zadd函数将当前任务hash值加入到任务队列中。
3、取msg里的延迟时间,调用得分算法,计算出任务最新的分数,然后调用Zadd函数更新该任务hash值的执行顺序。

二、任务出队Poll算法

任务出队Poll算法,由服务内定时器按指定频率调用,默认是50ms间隔时间。主要逻辑是调用ZrangeByScore函数获取一条任务hash值,然后发送执行这个任务的事件。

1、检查当前线程池是否有空余线程可调度,有则继续,无则返回。
2、调用ZrangeByScore函数,获取任务队列中得分最小的任务hash值。
3、调用SetNx函数获取分布式锁,通过任务hash值调用Hget函数获取任务信息msg。
4、调用Zrem函数将任务hash值从任务队列中移除,即表示任务出队。
5、调用Hset函数将修改执行次数的任务信息msg更新到hash表。
6、将任务信息msg包装成任务事件event发送,即让任务执行器监听事件,并调用后面的任务执行算法执行。

三、任务执行算法

任务执行算法,为了保持扩展性,内置了流水线内任务事件接近有22种状态,诸如开始StartPipeline、CancelStage等。此算法核心难点是较多状态机和策略模式设计。

1、执行器监听到任务出队算法的任务事件event后,会利用java反射原理得到对应event的事件处理器handler。
2、再通过线程池异步执行handler.invoke(event)方法,即执行业务的真正逻辑。handler内部除了自身业务逻辑需要用户自己定义外,其他复杂的状态递归转化为内置逻辑,可极大提高devops平台研发人员生产力。研发人员只需关心每种事件event需要做的业务逻辑,而不用考虑复杂的状态转换。

四、任务失败重试算法

任务失败重试算法,为了提高容错性,诸如网络抖动造成的任务失败等异常,此算法解决的是这种可通过自动重试来解决的异常事件,进而提高任务执行成功率。

1、有个前提:失败任务会有失败任务队列来存储任务hash值。首先,调用Zrange函数从失败任务队列中取出任务hash值集合。
2、遍历任务hash值集合,对每个任务hash,调用Hget函数获取任务信息msg。
3、将任务信息msg中执行次数和最大可执行次数对比,如果超过最大可执行次数,则调用Zrem函数从失败任务队列中移除任务hash值;如果不超过,则开启事务,给失败任务调用score算法打分,再调用Zadd函数将该任务hash值加入到正常的任务队列中。

文章来自个人专栏
DevOps建设
5 文章 | 1 订阅
0条评论
作者已关闭评论
作者已关闭评论
0
0