我如何创建一个包装器,使芹菜任务看起来像 asyncio.Task
?或者是否有更好的方法将 Celery 与 asyncio
集成?
@asksol,Celery 的创造者,是 这样说 的:
在异步 I/O 框架之上使用 Celery 作为分布式层是很常见的(最重要的提示:将 CPU 绑定任务路由到 prefork worker 意味着它们不会阻塞你的事件循环)。
但是我找不到任何专门针对 asyncio
框架的代码示例。
原文由 max 发布,翻译遵循 CC BY-SA 4.0 许可协议
编辑:2021 年 1 月 12 日以前的答案(在底部找到它)没有很好地老化因此我添加了可能的解决方案组合,这些解决方案可能会满足那些仍在寻找如何共同使用 asyncio 和 Celery 的人
让我们首先快速分解用例(更深入的分析在这里: asyncio and coroutines vs task queues ):
因此,在 Python 的“做一件事并做好”的背景下,不要尝试将 asyncio 和 celery 混合在一起是有意义的。
但是如果我们希望能够异步运行一个方法并作为异步任务运行,会发生什么情况?那么我们可以考虑一些选择:
我能找到的最好的例子如下: https ://johnfraney.ca/posts/2018/12/20/writing-unit-tests-celery-tasks-async-functions/(我刚刚发现这是 @Franey 的回应):
定义您的异步方法。
使用
asgiref
的sync.async_to_sync
模块来包装异步方法并在芹菜任务中同步运行它:我在 FastAPI 应用程序中遇到的一个用例与前面的示例相反:
密集的 CPU 绑定进程正在占用异步端点。
解决方案是将异步 CPU 绑定进程重构为 celery 任务,并传递一个任务实例以从 Celery 队列中执行。
该案例可视化的最小示例:
另一个解决方案似乎是 @juanra 和 @danius 在他们的答案中提出的,但我们必须记住,当我们混合同步和异步执行时,性能往往会受到影响,因此在我们决定使用之前需要监控这些答案他们在生产环境中。
最后,还有一些现成的解决方案,我不能推荐(因为我自己没有使用过)但我会在这里列出它们:
好吧,那不是那么好吗? Celery 5.0 版没有实现 asyncio 兼容性,因此我们不知道何时以及是否会实现这一点……出于响应遗留原因(因为它是当时的答案)和继续评论,将其留在此处。
正如官方网站上所述,从 Celery 5.0 版开始,这将成为可能:
http://docs.celeryproject.org/en/4.0/whatsnew-4.0.html#preface
以上是从上一个链接引用的。
所以最好的办法是等待 5.0 版本 发布!
与此同时,快乐的编码:)