searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

golang异步消息队列asynq中的动态定时任务Dynamic Periodic Task

2023-10-09 08:13:15
203
0

官方说明: github.com/hibiken/asynq/wiki/Dynamic-Periodic-Task

初看以为这只是一个基于人工编辑配置文件的定时任务而已. 研究后发现, 这其实是一个非常有效的支持在程序运行中动态扩展定时任务的功能.

 

官方给的这个例子只是从配置文件中读取, 这很具有误导性, 让人以为无法在程序运行中途进行修改. 然而官方其实有一句不起眼的说明: but you can easily modify the example to work with a database or other config sources. 也就是说, 我们完全可以利用数据库来作为定时任务的配置数据源, 从而随时增删定时任务.

示例如下:

启动manager的逻辑, 注意:

1. MysqlBasedConfigProvider是下面自定义的provider

2.SyncInterval是定时读取定时任务配置的间隔, 尽量设置的短一些(但也别给数据库太大压力)

logx.Infof("Starting GlobalPeriodicTask ...")
mysqlProvidor := &MysqlBasedConfigProvider{}
logx.Info(mysqlProvidor)
globalPeriodicTaskManager, err := asynq.NewPeriodicTaskManager(
	asynq.PeriodicTaskManagerOpts{
		RedisConnOpt:               asynq.RedisClientOpt{Addr: c.RedisConf.Host, Password: c.RedisConf.Pass, DB: c.RedisDb},
		PeriodicTaskConfigProvider: mysqlProvidor,
		SyncInterval:               10 * time.Second,
	},
)
if err != nil {
	logx.Errorf("starting globalPeriodicTaskManager err: %s", err.Error())
	return
}
globalPeriodicTaskManager.Start()

 

读取定时任务配置的逻辑, 注意:

1. 这里是从MySQL数据库读取, 因此可以从其他程序里随时修改数据库, 从而变更定时任务.

type MysqlBasedConfigProvider struct {
}

func (p *MysqlBasedConfigProvider) GetConfigs() ([]*asynq.PeriodicTaskConfig, error) {
	ctx := context.Background()

	var configList []*asynq.PeriodicTaskConfig

	// 构造整个config数组
	err := makeRegionGroupConfig(ctx, &configList)
	if err != nil {
		logx.Errorf("[MysqlBasedConfigProvider] GetConfigs err: %s", err.Error())
		return configList, err
	}

	return configList, nil
}

func makeRegionGroupConfig(ctx context.Context, configList *[]*asynq.PeriodicTaskConfig) error {
	groupTable := db.MysqlQuery.ConsoleScalingGroup

	// 省略获取到regionID的操作
	regionID := xxx
	
	// 示例: 查出所有group信息
	groupList, err := groupTable.WithContext(ctx).Where(
		groupTable.RegionID.Eq(regionID),
		groupTable.IsDeleted.Is(false),
	).Find()
	if err != nil {
		logx.Infof("[MysqlBasedConfigProvider:makeRegionGroupConfig] regionID: %s find group error: %s", regionID, err.Error())
		return err
	}

	// 为每个group创建一条定时任务, 用固定TaskID来保证任务不重复执行
	for _, groupInfo := range groupList {
		regionID := groupInfo.RegionID
		var groupIDstr string = strconv.FormatInt(int64(groupInfo.ID), 10)
		*configList = append(*configList, &asynq.PeriodicTaskConfig{
			Cronspec: "@every 60s",
			Task: asynq.NewTask(
				"job_name_" + groupIDstr,
				[]byte(groupIDstr),
				asynq.TaskID("MonitorUpdate-"+groupIDstr),
				asynq.MaxRetry(3),
			),
		})
		if err != nil {
			logx.Errorf("[MysqlBasedConfigProvider:makeRegionGroupConfig] makeMonitorUpdateConfig err: %s", err.Error())
		}
	}

	return nil
}
0条评论
0 / 1000
Night
2文章数
0粉丝数
Night
2 文章 | 0 粉丝
Night
2文章数
0粉丝数
Night
2 文章 | 0 粉丝
原创

golang异步消息队列asynq中的动态定时任务Dynamic Periodic Task

2023-10-09 08:13:15
203
0

官方说明: github.com/hibiken/asynq/wiki/Dynamic-Periodic-Task

初看以为这只是一个基于人工编辑配置文件的定时任务而已. 研究后发现, 这其实是一个非常有效的支持在程序运行中动态扩展定时任务的功能.

 

官方给的这个例子只是从配置文件中读取, 这很具有误导性, 让人以为无法在程序运行中途进行修改. 然而官方其实有一句不起眼的说明: but you can easily modify the example to work with a database or other config sources. 也就是说, 我们完全可以利用数据库来作为定时任务的配置数据源, 从而随时增删定时任务.

示例如下:

启动manager的逻辑, 注意:

1. MysqlBasedConfigProvider是下面自定义的provider

2.SyncInterval是定时读取定时任务配置的间隔, 尽量设置的短一些(但也别给数据库太大压力)

logx.Infof("Starting GlobalPeriodicTask ...")
mysqlProvidor := &MysqlBasedConfigProvider{}
logx.Info(mysqlProvidor)
globalPeriodicTaskManager, err := asynq.NewPeriodicTaskManager(
	asynq.PeriodicTaskManagerOpts{
		RedisConnOpt:               asynq.RedisClientOpt{Addr: c.RedisConf.Host, Password: c.RedisConf.Pass, DB: c.RedisDb},
		PeriodicTaskConfigProvider: mysqlProvidor,
		SyncInterval:               10 * time.Second,
	},
)
if err != nil {
	logx.Errorf("starting globalPeriodicTaskManager err: %s", err.Error())
	return
}
globalPeriodicTaskManager.Start()

 

读取定时任务配置的逻辑, 注意:

1. 这里是从MySQL数据库读取, 因此可以从其他程序里随时修改数据库, 从而变更定时任务.

type MysqlBasedConfigProvider struct {
}

func (p *MysqlBasedConfigProvider) GetConfigs() ([]*asynq.PeriodicTaskConfig, error) {
	ctx := context.Background()

	var configList []*asynq.PeriodicTaskConfig

	// 构造整个config数组
	err := makeRegionGroupConfig(ctx, &configList)
	if err != nil {
		logx.Errorf("[MysqlBasedConfigProvider] GetConfigs err: %s", err.Error())
		return configList, err
	}

	return configList, nil
}

func makeRegionGroupConfig(ctx context.Context, configList *[]*asynq.PeriodicTaskConfig) error {
	groupTable := db.MysqlQuery.ConsoleScalingGroup

	// 省略获取到regionID的操作
	regionID := xxx
	
	// 示例: 查出所有group信息
	groupList, err := groupTable.WithContext(ctx).Where(
		groupTable.RegionID.Eq(regionID),
		groupTable.IsDeleted.Is(false),
	).Find()
	if err != nil {
		logx.Infof("[MysqlBasedConfigProvider:makeRegionGroupConfig] regionID: %s find group error: %s", regionID, err.Error())
		return err
	}

	// 为每个group创建一条定时任务, 用固定TaskID来保证任务不重复执行
	for _, groupInfo := range groupList {
		regionID := groupInfo.RegionID
		var groupIDstr string = strconv.FormatInt(int64(groupInfo.ID), 10)
		*configList = append(*configList, &asynq.PeriodicTaskConfig{
			Cronspec: "@every 60s",
			Task: asynq.NewTask(
				"job_name_" + groupIDstr,
				[]byte(groupIDstr),
				asynq.TaskID("MonitorUpdate-"+groupIDstr),
				asynq.MaxRetry(3),
			),
		})
		if err != nil {
			logx.Errorf("[MysqlBasedConfigProvider:makeRegionGroupConfig] makeMonitorUpdateConfig err: %s", err.Error())
		}
	}

	return nil
}
文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
0
0