我几乎没有阻塞函数 foo
, bar
我无法更改它们(一些我不控制的内部库。与一个或多个网络服务对话)。我如何将它用作异步?例如,我不想执行以下操作。
results = []
for inp in inps:
val = foo(inp)
result = bar(val)
results.append(result)
这将是低效的,因为我可以调用 foo
作为第二个输入,而我正在等待第一个和相同的 bar
。我如何包装它们以便它们可以与 asyncio 一起使用(即新的 async
, await
语法)?
让我们假设函数是可重入的。也就是说,当之前的 foo
正在处理时,可以再次调用 foo
。
更新
使用可重复使用的装饰器扩展答案。例如,单击 此处。
def run_in_executor(f):
@functools.wraps(f)
def inner(*args, **kwargs):
loop = asyncio.get_running_loop()
return loop.run_in_executor(None, functools.partial(f, *args, **kwargs))
return inner
原文由 balki 发布,翻译遵循 CC BY-SA 4.0 许可协议
这里有(有点)两个问题:
可以使用高级
asyncio.create_task
或低级asyncio.ensure_future
创建并发任务。从 3.11 开始,它们也可以通过 asyncio 任务组 创建,正如 Trio 库所开创的那样(Trio 的创建者在 这里 有一篇关于这个主题的优秀博客文章)。要运行同步代码,您需要 在 executor 中运行阻塞代码。例子:
如果您想使用 for 循环(如您的示例)安排这些任务,您有几种不同的策略,但基本方法是使用 for 循环(或列表理解等) 安排 任务,并使用 asyncio 等待它们。等待, 然后 检索结果。例子: