如何将芹菜与异步相结合?

新手上路,请多包涵

我如何创建一个包装器,使芹菜任务看起来像 asyncio.Task ?或者是否有更好的方法将 Celery 与 asyncio 集成?

@asksol,Celery 的创造者,是 这样说 的:

在异步 I/O 框架之上使用 Celery 作为分布式层是很常见的(最重要的提示:将 CPU 绑定任务路由到 prefork worker 意味着它们不会阻塞你的事件循环)。

但是我找不到任何专门针对 asyncio 框架的代码示例。

原文由 max 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 1.4k
2 个回答

编辑:2021 年 1 月 12 日以前的答案(在底部找到它)没有很好地老化因此我添加了可能的解决方案组合,这些解决方案可能会满足那些仍在寻找如何共同使用 asyncio 和 Celery 的人

让我们首先快速分解用例(更深入的分析在这里: asyncio and coroutines vs task queues ):

  • 如果任务受 I/O 限制,那么使用协程和异步往往会更好。
  • 如果任务受 CPU 限制,那么使用 Celery 或其他类似的任务管理系统往往会更好。

因此,在 Python 的“做一件事并做好”的背景下,不要尝试将 asyncio 和 celery 混合在一起是有意义的。

但是如果我们希望能够异步运行一个方法并作为异步任务运行,会发生什么情况?那么我们可以考虑一些选择:

  • 我能找到的最好的例子如下: https ://johnfraney.ca/posts/2018/12/20/writing-unit-tests-celery-tasks-async-functions/(我刚刚发现这是 @Franey 的回应):

    1. 定义您的异步方法。

    2. 使用 asgirefsync.async_to_sync 模块来包装异步方法并在芹菜任务中同步运行它:

      # tasks.py
     import asyncio
     from asgiref.sync import async_to_sync
     from celery import Celery
    
    
     app = Celery('async_test', broker='a_broker_url_goes_here')
    
    
     async def return_hello():
         await asyncio.sleep(1)
         return 'hello'
    
    
     @app.task(name="sync_task")
     def sync_task():
         async_to_sync(return_hello)()
    
    
    
  • 我在 FastAPI 应用程序中遇到的一个用例与前面的示例相反:

    1. 密集的 CPU 绑定进程正在占用异步端点。

    2. 解决方案是将异步 CPU 绑定进程重构为 celery 任务,并传递一个任务实例以从 Celery 队列中执行。

    3. 该案例可视化的最小示例:

      import asyncio
     import uvicorn
    
    
     from celery import Celery
     from fastapi import FastAPI
    
    
     app = FastAPI(title='Example')
     worker = Celery('worker', broker='a_broker_url_goes_here')
    
    
     @worker.task(name='cpu_boun')
     def cpu_bound_task():
         # Does stuff but let's simplify it
         print([n for n in range(1000)])
    
    
     @app.get('/calculate')
     async def calculate():
         cpu_bound_task.delay()
    
    
     if __name__ == "__main__":
         uvicorn.run('main:app', host='0.0.0.0', port=8000)
    
    
    
  • 另一个解决方案似乎是 @juanra@danius 在他们的答案中提出的,但我们必须记住,当我们混合同步和异步执行时,性能往往会受到影响,因此在我们决定使用之前需要监控这些答案他们在生产环境中。

最后,还有一些现成的解决方案,我不能推荐(因为我自己没有使用过)但我会在这里列出它们:

  • Celery Pool AsyncIO 似乎完全解决了 Celery 5.0 没有解决的问题,但请记住,它似乎有点实验性(今天 01/12/2021 版本 0.2.0)
  • aiotasks 声称是“一个类似 Celery 的任务管理器,可以分发 Asyncio 协程”,但看起来有点陈旧(大约 2 年前的最新提交)

好吧,那不是那么好吗? Celery 5.0 版没有实现 asyncio 兼容性,因此我们不知道何时以及是否会实现这一点……出于响应遗留原因(因为它是当时的答案)和继续评论,将其留在此处。

正如官方网站上所述,从 Celery 5.0 版开始,这将成为可能:

http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html#preface

  1. Celery 的下一个主要版本将仅支持 Python 3.5,我们计划在其中利用新的 asyncio 库。
  2. 放弃对 Python 2 的支持将使我们能够删除大量的兼容性代码,而使用 Python 3.5 使我们能够利用 typing、async/await、asyncio 和类似的概念,这些概念在旧版本中是无法替代的。

以上是从上一个链接引用的。

所以最好的办法是等待 5.0 版本 发布!

与此同时,快乐的编码:)

原文由 John Moutafis 发布,翻译遵循 CC BY-SA 4.0 许可协议

这种简单的方法对我来说效果很好:

 import asyncio
from celery import Celery

app = Celery('tasks')

async def async_function(param1, param2):
    # more async stuff...
    pass

@app.task(name='tasks.task_name', queue='queue_name')
def task_name(param1, param2):
    asyncio.run(async_function(param1, param2))

原文由 juanra 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题