如何在 Python 中使用 Flask 执行周期性任务

新手上路,请多包涵

我一直在使用 Flask 为我的 k8055 USB 接口板提供一个简单的 Web API;相当标准的 getters 和 putters,Flask 确实让我的生活轻松多了。

但是我希望能够在乳清发生时将状态变化注册为/接近。

例如,如果我有一个按钮连接到板上,我可以轮询该特定端口的 api。但是如果我想让输出直接反映输出,不管是否有人在与 api 对话,我都会有这样的东西。

 while True:
    board.read()
    board.digital_outputs = board.digital_inputs
    board.read()
    time.sleep(1)

每一秒,输出都会更新以匹配输入。

Flask下有没有办法做这种事情?我以前在 Twisted 中做过类似的事情,但是 Flask 对于这个特定的应用程序来说太方便了,还不能放弃它……

谢谢。

原文由 Bolster 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 559
2 个回答

您可以使用 cron 来完成简单的任务。

为您的任务创建一个烧瓶视图。

 # a separate view for periodic task
@app.route('/task')
def task():
    board.read()
    board.digital_outputs = board.digital_inputs

然后使用 cron,定期从该 url 下载

# cron task to run each minute
0-59 * * * * run_task.sh

run_task.sh 内容在哪里

wget http://localhost/task

Cron 的运行频率不能超过每分钟一次。如果您需要更高的频率,(例如,每 5 秒 = 每分钟 12 次),您必须按以下方式在 tun_task.sh 中执行

# loop 12 times with a delay
for i in 1 2 3 4 5 6 7 8 9 10 11 12
do
    # download url in background for not to affect delay interval much
    wget -b http://localhost/task
    sleep 5s
done

原文由 Pashka 发布,翻译遵循 CC BY-SA 3.0 许可协议

对于我的 Flask 应用程序,我考虑使用 Pashka 在他的 回答 中描述的 cron 方法、 计划 库和 APScheduler

我发现 APScheduler 很简单并且可以满足周期性任务运行的目的,所以继续使用 APScheduler。

示例代码:

 from flask import Flask

from apscheduler.schedulers.background import BackgroundScheduler

app = Flask(__name__)

def test_job():
    print('I am working...')

scheduler = BackgroundScheduler()
job = scheduler.add_job(test_job, 'interval', minutes=1)
scheduler.start()

原文由 Antony 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题
logo
Stack Overflow 翻译
子站问答
访问
宣传栏