如何使用 Python asyncio 限制并发?

新手上路,请多包涵

假设我们有一堆链接要下载,每个链接可能需要不同的下载时间。而且我最多只能使用 3 个连接进行下载。现在,我想确保使用 asyncio 高效地执行此操作。

这是我要实现的目标:在任何时间点,尽量确保我至少有 3 次下载正在运行。

 Connection 1: 1---------7---9---
Connection 2: 2---4----6-----
Connection 3: 3-----5---8-----

数字代表下载链接,连字符代表等待下载。

这是我现在正在使用的代码

from random import randint
import asyncio

count = 0

async def download(code, permit_download, no_concurrent, downloading_event):
    global count
    downloading_event.set()
    wait_time = randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))
    count -= 1
    if count < no_concurrent and not permit_download.is_set():
        permit_download.set()

async def main(loop):
    global count
    permit_download = asyncio.Event()
    permit_download.set()
    downloading_event = asyncio.Event()
    no_concurrent = 3
    i = 0
    while i < 9:
        if permit_download.is_set():
            count += 1
            if count >= no_concurrent:
                permit_download.clear()
            loop.create_task(download(i, permit_download, no_concurrent, downloading_event))
            await downloading_event.wait()  # To force context to switch to download function
            downloading_event.clear()
            i += 1
        else:
            await permit_download.wait()
    await asyncio.sleep(9)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()

输出符合预期:

 downloading 0 will take 2 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
downloaded 2
downloading 3 will take 2 second(s)
downloaded 0
downloading 4 will take 3 second(s)
downloaded 1
downloaded 3
downloading 5 will take 2 second(s)
downloading 6 will take 2 second(s)
downloaded 5
downloaded 6
downloaded 4
downloading 7 will take 1 second(s)
downloading 8 will take 1 second(s)
downloaded 7
downloaded 8

但这是我的问题:

  1. 目前,我只是等待 9 秒来保持主要功能运行,直到下载完成。在退出 main 功能之前,是否有等待最后一次下载完成的有效方法? (我知道有 asyncio.wait ,但我需要存储所有任务引用才能正常工作)

  2. 执行此类任务的好图书馆是什么?我知道 javascript 有很多异步库,但是 Python 呢?

编辑:2. 什么是处理常见异步模式的好库? (类似于 异步

原文由 Shridharshan 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 896
2 个回答

在阅读此答案的其余部分之前,请注意使用 asyncio 限制并行任务数量的惯用方法是使用 asyncio.Semaphore ,如 Mikhail 的答案 所示并在 Andrei 的答案 中优雅地抽象出来。这个答案包含工作,但实现相同的方法有点复杂。我留下答案是因为在某些情况下,这种方法比信号量更有优势,特别是当要完成的工作非常大或不受限制,并且您无法提前创建所有协程时。在那种情况下,第二个(基于队列的)解决方案就是这个答案就是你想要的。但在大多数常规情况下,例如通过 aiohttp 并行下载,您应该改用信号量。


您基本上需要一个固定大小的下载任务 _池_。 asyncio 没有预制的任务池,但创建一个很容易:只需保留一组任务并且不要让它超过限制。尽管问题表明您不愿意走那条路,但代码最终要优雅得多:

 import asyncio, random

async def download(code):
    wait_time = random.randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))

async def main(loop):
    no_concurrent = 3
    dltasks = set()
    i = 0
    while i < 9:
        if len(dltasks) >= no_concurrent:
            # Wait for some download to finish before adding a new one
            _done, dltasks = await asyncio.wait(
                dltasks, return_when=asyncio.FIRST_COMPLETED)
        dltasks.add(loop.create_task(download(i)))
        i += 1
    # Wait for the remaining downloads to finish
    await asyncio.wait(dltasks)

另一种方法是创建固定数量的协程来执行下载,就像固定大小的线程池一样,并使用 asyncio.Queue 为它们提供工作。这消除了手动限制下载次数的需要,下载次数将自动受调用 download() 的协程数量限制:

 # download() defined as above

async def download_worker(q):
    while True:
        code = await q.get()
        await download(code)
        q.task_done()

async def main(loop):
    q = asyncio.Queue()
    workers = [loop.create_task(download_worker(q)) for _ in range(3)]
    i = 0
    while i < 9:
        await q.put(i)
        i += 1
    await q.join()  # wait for all tasks to be processed
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

至于您的其他问题,显而易见的选择是 aiohttp

原文由 user4815162342 发布,翻译遵循 CC BY-SA 4.0 许可协议

如果我没记错的话,您正在搜索 asyncio.Semaphore 。使用示例:

 import asyncio
from random import randint

async def download(code):
    wait_time = randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))

sem = asyncio.Semaphore(3)

async def safe_download(i):
    async with sem:  # semaphore limits num of simultaneous downloads
        return await download(i)

async def main():
    tasks = [
        asyncio.ensure_future(safe_download(i))  # creating task starts coroutine
        for i
        in range(9)
    ]
    await asyncio.gather(*tasks)  # await moment all downloads done

if __name__ ==  '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

输出:

 downloading 0 will take 3 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
downloaded 2
downloading 3 will take 3 second(s)
downloaded 1
downloaded 0
downloading 4 will take 2 second(s)
downloading 5 will take 1 second(s)
downloaded 5
downloaded 3
downloading 6 will take 3 second(s)
downloading 7 will take 1 second(s)
downloaded 4
downloading 8 will take 2 second(s)
downloaded 7
downloaded 8
downloaded 6

可以在 此处 找到使用 aiohttp 进行异步下载的示例。请注意, aiohttp 内置了一个等效的信号量,您可以在 此处 查看示例。它的默认限制为 100 个连接。

原文由 Mikhail Gerasimov 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题