写了个基于kratos的服务,基本需求是:可以增删管理定时任务(如配置每天、每周发个统计报告),支持分布式结构,持久化任务。加分项包括:注册回调、结果和日志记录、失败重试等。
看kratos已经支持的transport有两个:asynq和machinery,但似乎都不满足需要基本的持久化任务需求,machinery甚至还不能删除已经添加的任务。
// RegisterPeriodicTask register a periodic task which will be triggered periodically
func (server *Server) RegisterPeriodicTask(spec, name string, signature *tasks.Signature) error {
//check spec
schedule, err := cron.ParseStandard(spec)
if err != nil {
return err
}
f := func() {
//get lock
err := server.lock.LockWithRetries(utils.GetLockName(name, spec), schedule.Next(time.Now()).UnixNano()-1)
if err != nil {
return
}
//send task
_, err = server.SendTask(tasks.CopySignature(signature))
if err != nil {
log.ERROR.Printf("periodic task failed. task name is: %s. error is %s", name, err.Error())
}
}
//scheduler基于github.com/robfig/cron/v3
_, err = server.scheduler.AddFunc(spec, f)
return err
}
考虑到服务可能会重启,那么添加过的任务如何恢复呢?如果没有这样的库,考虑基于github.com/go-co-op/gocron
自己封装一个,配置的任务保存在redis中,服务启动的时候全量初始化,之后订阅对应的主题,再通过分布式锁来保证执行的唯一性,这样是否可行?