go 有没有能持久化的分布式定时任务库?

写了个基于kratos的服务,基本需求是:可以增删管理定时任务(如配置每天、每周发个统计报告),支持分布式结构,持久化任务。加分项包括:注册回调、结果和日志记录、失败重试等。

看kratos已经支持的transport有两个:asynq和machinery,但似乎都不满足需要基本的持久化任务需求,machinery甚至还不能删除已经添加的任务。


// RegisterPeriodicTask register a periodic task which will be triggered periodically
func (server *Server) RegisterPeriodicTask(spec, name string, signature *tasks.Signature) error {
    //check spec
    schedule, err := cron.ParseStandard(spec)
    if err != nil {
        return err
    }

    f := func() {
        //get lock
        err := server.lock.LockWithRetries(utils.GetLockName(name, spec), schedule.Next(time.Now()).UnixNano()-1)
        if err != nil {
            return
        }

        //send task
        _, err = server.SendTask(tasks.CopySignature(signature))
        if err != nil {
            log.ERROR.Printf("periodic task failed. task name is: %s. error is %s", name, err.Error())
        }
    }

    //scheduler基于github.com/robfig/cron/v3
    _, err = server.scheduler.AddFunc(spec, f)
    return err
}

考虑到服务可能会重启,那么添加过的任务如何恢复呢?如果没有这样的库,考虑基于github.com/go-co-op/gocron 自己封装一个,配置的任务保存在redis中,服务启动的时候全量初始化,之后订阅对应的主题,再通过分布式锁来保证执行的唯一性,这样是否可行?

阅读 3.4k
1 个回答
package main

import (
    "fmt"
    "github.com/go-co-op/gocron"
    "github.com/go-redis/redis/v8"
    "golang.org/x/net/context"
    "sync"
)

var (
    redisClient *redis.Client
    scheduler   *gocron.Scheduler
    ctx         = context.Background()
)

func init() {
    redisClient = redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    scheduler = gocron.NewScheduler(time.UTC)
}

func main() {
    var wg sync.WaitGroup
    wg.Add(1)

    loadTasksFromRedis()

    scheduler.StartAsync()

    wg.Wait()
}

func loadTasksFromRedis() {
    // 从 Redis 获取所有任务
    taskKeys, err := redisClient.Keys(ctx, "task:*").Result()
    if err != nil {
        fmt.Println("Error fetching tasks from Redis:", err)
        return
    }

    for _, taskKey := range taskKeys {
        taskData, err := redisClient.HGetAll(ctx, taskKey).Result()
        if err != nil {
            fmt.Println("Error fetching task data from Redis:", err)
            continue
        }

        createTaskFromData(taskData)
    }
}

func createTaskFromData(taskData map[string]string) {
    // 解析 taskData 并创建任务
    // ...

    // 添加任务到调度器
    // ...
}

func executeTask(taskData map[string]string) {
    // 尝试获取分布式锁
    lock, err := redisClient.SetNX(ctx, "lock:"+taskData["id"], 1, time.Duration(taskData["lockDuration"])*time.Second).Result()
    if err != nil || !lock {
        fmt.Println("Failed to acquire lock for task:", taskData["id"])
        return
    }

    // 执行任务
    // ...

    // 释放分布式锁
    redisClient.Del(ctx, "lock:"+taskData["id"])
}
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题