用芹菜运行“独特”的任务

新手上路,请多包涵

我使用芹菜来更新我的新闻聚合站点中的 RSS 提要。我为每个提要使用一个@task,一切似乎都运行良好。

不过有一个细节我不确定是否处理得好:所有提要每分钟更新一次 @periodic_task,但是如果在启动新的周期性任务时提要仍在从上一个周期性任务更新怎么办? (例如,如果提要真的很慢,或者离线并且任务处于重试循环中)

目前我存储任务结果并检查它们的状态,如下所示:

 import socket
from datetime import timedelta
from celery.decorators import task, periodic_task
from aggregator.models import Feed

_results = {}

@periodic_task(run_every=timedelta(minutes=1))
def fetch_articles():
    for feed in Feed.objects.all():
        if feed.pk in _results:
            if not _results[feed.pk].ready():
                # The task is not finished yet
                continue
        _results[feed.pk] = update_feed.delay(feed)

@task()
def update_feed(feed):
    try:
        feed.fetch_articles()
    except socket.error, exc:
        update_feed.retry(args=[feed], exc=exc)

也许有一种更复杂/更强大的方法可以使用我错过的一些芹菜机制来实现相同的结果?

原文由 Luper Rouch 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 431
2 个回答

根据 MattH 的回答,您可以使用这样的装饰器:

 def single_instance_task(timeout):
    def task_exc(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            lock_id = "celery-single-instance-" + func.__name__
            acquire_lock = lambda: cache.add(lock_id, "true", timeout)
            release_lock = lambda: cache.delete(lock_id)
            if acquire_lock():
                try:
                    func(*args, **kwargs)
                finally:
                    release_lock()
        return wrapper
    return task_exc

然后,像这样使用它……

 @periodic_task(run_every=timedelta(minutes=1))
@single_instance_task(60*10)
def fetch_articles()
    yada yada...

原文由 SteveJ 发布,翻译遵循 CC BY-SA 3.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题