如何将 asyncio 与现有的阻塞库一起使用?

新手上路,请多包涵

我几乎没有阻塞函数 foo , bar 我无法更改它们(一些我不控制的内部库。与一个或多个网络服务对话)。我如何将它用作异步?例如,我不想执行以下操作。

 results = []
for inp in inps:
    val = foo(inp)
    result = bar(val)
    results.append(result)

这将是低效的,因为我可以调用 foo 作为第二个输入,而我正在等待第一个和相同的 bar 。我如何包装它们以便它们可以与 asyncio 一起使用(即新的 async , await 语法)?

让我们假设函数是可重入的。也就是说,当之前的 foo 正在处理时,可以再次调用 foo


更新

使用可重复使用的装饰器扩展答案。例如,单击 此处

 def run_in_executor(f):
    @functools.wraps(f)
    def inner(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return loop.run_in_executor(None, functools.partial(f, *args, **kwargs))

    return inner

原文由 balki 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 658
2 个回答

这里有(有点)两个问题:

  1. 如何在协程中异步运行阻塞代码
  2. 我怎样才能在“同时”运行多个异步任务(顺便说一句:asyncio 是单线程的,所以它是 并发的,但不是 真正 的并行)。

可以使用高级 asyncio.create_task 或低级 asyncio.ensure_future 创建并发任务。从 3.11 开始,它们也可以通过 asyncio 任务组 创建,正如 Trio 库所开创的那样(Trio 的创建者在 这里 有一篇关于这个主题的优秀博客文章)。

要运行同步代码,您需要 在 executor 中运行阻塞代码。例子:

 import concurrent.futures
import asyncio
import time

def blocking(delay):
    time.sleep(delay)
    print('Completed.')

async def non_blocking(executor):
    loop = asyncio.get_running_loop()
    # Run three of the blocking tasks concurrently. asyncio.wait will
    # automatically wrap these in Tasks. If you want explicit access
    # to the tasks themselves, use asyncio.ensure_future, or add a
    # "done, pending = asyncio.wait..." assignment
    await asyncio.wait(
        fs={
            # Returns after delay=12 seconds
            loop.run_in_executor(executor, blocking, 12),

            # Returns after delay=14 seconds
            loop.run_in_executor(executor, blocking, 14),

            # Returns after delay=16 seconds
            loop.run_in_executor(executor, blocking, 16)
        },
        return_when=asyncio.ALL_COMPLETED
    )

executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
asyncio.run(non_blocking(executor))

如果您想使用 for 循环(如您的示例)安排这些任务,您有几种不同的策略,但基本方法是使用 for 循环(或列表理解等) 安排 任务,并使用 asyncio 等待它们。等待, 然后 检索结果。例子:

 done, pending = await asyncio.wait(
    fs=[loop.run_in_executor(executor, blocking_foo, *args) for args in inps],
    return_when=asyncio.ALL_COMPLETED
)

# Note that any errors raise during the above will be raised here; to
# handle errors you will need to call task.exception() and check if it
# is not None before calling task.result()
results = [task.result() for task in done]

原文由 Nick Badger 发布,翻译遵循 CC BY-SA 4.0 许可协议

扩展已接受的答案以实际解决所讨论的问题。

注意:需要 python 3.7+

 import functools

from urllib.request import urlopen
import asyncio

def legacy_blocking_function():  # You cannot change this function
    r = urlopen("https://example.com")
    return r.read().decode()

def run_in_executor(f):
    @functools.wraps(f)
    def inner(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return loop.run_in_executor(None, lambda: f(*args, **kwargs))

    return inner

@run_in_executor
def foo(arg):  # Your wrapper for async use
    resp = legacy_blocking_function()
    return f"{arg}{len(resp)}"

@run_in_executor
def bar(arg):  # Another wrapper
    resp = legacy_blocking_function()
    return f"{len(resp)}{arg}"

async def process_input(inp):  # Modern async function (coroutine)
    res = await foo(inp)
    res = f"XXX{res}XXX"
    return await bar(res)

async def main():
    inputs = ["one", "two", "three"]
    input_tasks = [asyncio.create_task(process_input(inp)) for inp in inputs]
    print([await t for t in asyncio.as_completed(input_tasks)])
    # This doesn't work as expected :(
    # print([await t for t in asyncio.as_completed([process_input(inp) for inp in input_tasks])])

if __name__ == '__main__':
asyncio.run(main())


单击 此处 获取此示例的最新版本并发送拉取请求。

原文由 balki 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题