Django - 每 x 秒运行一个函数

新手上路,请多包涵

我正在开发 Django 应用程序。我有一个 API 端点,如果请求,它必须执行必须重复几次的功能(直到某个条件为真)。我现在的处理方式是——

 def shut_down(request):
  # Do some stuff
  while True:
    result = some_fn()
    if result:
      break
    time.sleep(2)

  return True

虽然我知道这是一种糟糕的方法并且我不应该阻塞 2 秒,但我不知道如何绕过它。

这有效,等待 4 秒后。但我想要一些让循环在后台运行的东西,并在 some_fn 返回 True 时停止。 (还有,肯定some_fn会返回True)

编辑 -

阅读 Oz123 的回复给了我一个似乎可行的想法。这是我所做的 -

 def shut_down(params):
    # Do some stuff
    # Offload the blocking job to a new thread

    t = threading.Thread(target=some_fn, args=(id, ), kwargs={})
    t.setDaemon(True)
    t.start()

    return True

def some_fn(id):
    while True:
        # Do the job, get result in res
        # If the job is done, return. Or sleep the thread for 2 seconds before trying again.

        if res:
            return
        else:
            time.sleep(2)

这对我有用。这很简单,但我不知道多线程与 Django 结合的效率如何。

如果有人能指出其中的缺陷,不胜感激。

原文由 Zeokav 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 445
2 个回答

对于许多小项目来说, celery 有点矫枉过正。对于那些你可以使用 schedule 的项目,它非常容易使用。

使用此库,您可以使任何函数定期执行任务:

 import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

该示例以阻塞方式运行,但是如果您查看常见问题解答,您会发现您也可以在并行线程中运行任务,这样您就不会阻塞,并且一旦不再需要就删除任务:

 import threading
import time

from schedule import Scheduler

def run_continuously(self, interval=1):
    """Continuously run, while executing pending jobs at each elapsed
    time interval.
    @return cease_continuous_run: threading.Event which can be set to
    cease continuous run.
    Please note that it is *intended behavior that run_continuously()
    does not run missed jobs*. For example, if you've registered a job
    that should run every minute and you set a continuous run interval
    of one hour then your job won't be run 60 times at each interval but
    only once.
    """

    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):

        @classmethod
        def run(cls):
            while not cease_continuous_run.is_set():
                self.run_pending()
                time.sleep(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.setDaemon(True)
    continuous_thread.start()
    return cease_continuous_run

Scheduler.run_continuously = run_continuously

这是在类方法中使用的示例:

     def foo(self):
        ...
        if some_condition():
           return schedule.CancelJob  # a job can dequeue it

    # can be put in __enter__ or __init__
    self._job_stop = self.scheduler.run_continuously()

    logger.debug("doing foo"...)
    self.foo() # call foo
    self.scheduler.every(5).seconds.do(
        self.foo) # schedule foo for running every 5 seconds

    ...
    # later on foo is not needed any more:
    self._job_stop.set()

    ...

    def __exit__(self, exec_type, exc_value, traceback):
        # if the jobs are not stop, you can stop them
        self._job_stop.set()


原文由 oz123 发布,翻译遵循 CC BY-SA 4.0 许可协议

这个答案稍微扩展了 Oz123 的答案

为了让事情正常进行,我创建了一个名为 mainapp/jobs.py 的文件来包含我的计划作业。然后,在我的 apps.py 模块中,我将 from . import jobs 放在 ready 方法中。这是我的整个 apps.py 文件:

 from django.apps import AppConfig
import os

class MainappConfig(AppConfig):
    name = 'mainapp'

    def ready(self):
        from . import jobs

        if os.environ.get('RUN_MAIN', None) != 'true':
            jobs.start_scheduler()

RUN_MAIN 检查是因为 python manage.py runserver 运行了 ready 方法两次—— 在两个进程中各运行一次——但我们只想运行一次)

现在,这是我放入我的 jobs.py 文件中的内容。首先是进口。您需要导入 Schedulerthreadingtime 如下。 FUserHolding 导入只是为了我的工作;你不会导入这些。

 from django.db.models import F
from schedule import Scheduler
import threading
import time

from .models import UserHolding

接下来,编写要调度的函数。以下纯属举例;你的函数看起来不会像这样。

 def give_admin_gold():
    admin_gold_holding = (UserHolding.objects
        .filter(inventory__user__username='admin', commodity__name='gold'))

    admin_gold_holding.update(amount=F('amount') + 1)

接下来,通过将 run_continuously 方法添加到其 Scheduler 类中,猴子修补 schedule 模块。使用以下代码执行此操作,该代码是从 Oz123 的回答中逐字复制的。

 def run_continuously(self, interval=1):
    """Continuously run, while executing pending jobs at each elapsed
    time interval.
    @return cease_continuous_run: threading.Event which can be set to
    cease continuous run.
    Please note that it is *intended behavior that run_continuously()
    does not run missed jobs*. For example, if you've registered a job
    that should run every minute and you set a continuous run interval
    of one hour then your job won't be run 60 times at each interval but
    only once.
    """

    cease_continuous_run = threading.Event()

    class ScheduleThread(threading.Thread):

        @classmethod
        def run(cls):
            while not cease_continuous_run.is_set():
                self.run_pending()
                time.sleep(interval)

    continuous_thread = ScheduleThread()
    continuous_thread.setDaemon(True)
    continuous_thread.start()
    return cease_continuous_run

Scheduler.run_continuously = run_continuously

最后,定义一个函数来创建 Scheduler 对象,连接你的作业,并调用调度程序的 run_continuously 方法。

 def start_scheduler():
    scheduler = Scheduler()
    scheduler.every().second.do(give_admin_gold)
    scheduler.run_continuously()

原文由 Tanner - reinstate LGBT people 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题
logo
Stack Overflow 翻译
子站问答
访问
宣传栏