Concurrent.futures 与 Python 3 中的多处理

新手上路,请多包涵

Python 3.2 引入了 Concurrent Futures ,它似乎是旧线程和 多处理 模块的一些高级组合。

与旧的多处理模块相比,将其用于 CPU 绑定任务的优缺点是什么?

这篇文章 表明它们更容易使用 - 是这样吗?

原文由 GIS-Jonathan 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 639
2 个回答

我不会称 concurrent.futures 更“高级”——它是一个 更简单 的接口,无论您使用多线程还是多进程作为底层并行化噱头,它的工作原理都非常相同。

因此,就像几乎所有“更简单的界面”的实例一样,涉及到很多相同的权衡:它的学习曲线较浅,很大程度上只是因为 可供 学习的东西少得多;但是,因为它提供的选项较少,它最终可能会以更丰富的界面不会出现的方式让您感到沮丧。

就受 CPU 限制的任务而言,它的规格太低,说不出有多大意义。对于 CPython 下的 CPU 绑定任务,您需要多个进程而不是多个线程才能获得加速。但是你获得多少加速(如果有的话)取决于你的硬件、你的操作系统的细节,尤其是取决于你的特定任务需要多少进程间通信。在幕后,所有进程间并行化的噱头都依赖于相同的操作系统原语——你用来获得这些的高级 API 并不是底线速度的主要因素。

编辑:例子

这是您引用的文章中显示的最终代码,但我添加了使其工作所需的导入语句:

 from concurrent.futures import ProcessPoolExecutor
def pool_factorizer_map(nums, nprocs):
    # Let the executor divide the work among processes by using 'map'.
    with ProcessPoolExecutor(max_workers=nprocs) as executor:
        return {num:factors for num, factors in
                                zip(nums,
                                    executor.map(factorize_naive, nums))}

这里使用 multiprocessing 完全一样:

 import multiprocessing as mp
def mp_factorizer_map(nums, nprocs):
    with mp.Pool(nprocs) as pool:
        return {num:factors for num, factors in
                                zip(nums,
                                    pool.map(factorize_naive, nums))}

请注意,Python 3.3 中添加了使用 multiprocessing.Pool 对象作为上下文管理器的能力。

至于哪一个更容易使用,它们本质上是相同的。

一个不同之处在于 Pool 支持如此多不同的做事方式,以至于您可能不会意识到它是 多么 容易,直到您在学习曲线上攀登了相当长的一段路。

同样,所有这些不同的方式既是优势也是劣势。它们是一种优势,因为在某些情况下可能需要灵活性。它们是一个弱点,因为“最好只有一种明显的方法来做到这一点”。一个专门(如果可能)坚持 concurrent.futures 的项目从长远来看可能更容易维护,因为在如何使用其最小 API 方面缺乏无偿的新颖性。

原文由 Tim Peters 发布,翻译遵循 CC BY-SA 4.0 许可协议

可能在大多数情况下,当您需要并行处理时,您会发现来自 concurrent.futures 模块的 Pool ProcessPoolExecutor 或来自 multiprocessing 的 --- 类 --- 模块将提供同等的设施,归结为个人喜好问题。但是每一个都提供了一些使某些处理更方便的设施。我想我只想指出几个:

当提交一批任务时,您有时希望在任务结果(即返回值)可用时立即获得它们。这两个工具都提供通知,表明已提交任务的结果可通过回调机制获得:

使用 multiprocessing.Pool

 import multiprocessing as mp

def worker_process(i):
    return i * i # square the argument

def process_result(return_value):
    print(return_value)

def main():
    pool = mp.Pool()
    for i in range(10):
        pool.apply_async(worker_process, args=(i,), callback=process_result)
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

可以使用带有 concurrent.futures 的回调来完成同样的操作,尽管很笨拙:

 import concurrent.futures

def worker_process(i):
    return i * i # square the argument

def process_result(future):
    print(future.result())

def main():
    executor = concurrent.futures.ProcessPoolExecutor()
    futures = [executor.submit(worker_process, i) for i in range(10)]
    for future in futures:
        future.add_done_callback(process_result)
    executor.shutdown()

if __name__ == '__main__':
    main()

这里每个任务都是单独提交的,并为其返回一个 Future 实例。然后必须将回调添加到 Future 。最后,当调用回调时,传递的参数是 Future 已完成任务的实例,并且必须调用方法 result 以获得实际返回值。但是有了 concurrent.futures 模块,实际上根本不需要使用回调。您可以使用 as_completed 方法:

 import concurrent.futures

def worker_process(i):
    return i * i # square the argument

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(worker_process, i) for i in range(10)]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())

if __name__ == '__main__':
    main()

通过使用字典保存 Future 实例,很容易将返回值与原始传递给 worker_process 的参数联系起来:

 import concurrent.futures

def worker_process(i):
    return i * i # square the argument

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = {executor.submit(worker_process, i): i for i in range(10)}
        for future in concurrent.futures.as_completed(futures):
            i = futures[future] # retrieve the value that was squared
            print(i, future.result())

if __name__ == '__main__':
    main()

multiprocessing.Pool 有方法 imapimap_unordered ,后者允许任务结果不一定按 任意 完成顺序返回,但。这些方法被认为是 map懒惰 版本。使用方法 map ,如果传递的 可迭代 参数没有 __len__ 属性,它将首先转换为 list 用于计算长度一个有效的 chunksize 值,如果 None 作为 chunksize 参数提供。因此,您无法通过将生成器或生成器表达式用作 可迭代对象 来实现任何存储优化。但是对于方法 imapimap_unordered可迭代 对象可以是生成器或生成器表达式;它将根据需要进行迭代以产生新的提交任务。但这需要默认的 chunksize 参数为 1,因为通常无法知道 可迭代对象 的长度。但这并不能阻止您使用 multiprocessing.Pool 类使用的相同算法提供合理的值,如果您对 可迭代 的长度有很好的近似值(或下例中的 确切 大小) :

 import multiprocessing as mp

def worker_process(i):
    return i * i # square the argument

def compute_chunksize(pool_size, iterable_size):
    if iterable_size == 0:
        return 0
    chunksize, extra = divmod(iterable_size, pool_size * 4)
    if extra:
        chunksize += 1
    return chunksize

def main():
    cpu_count = mp.cpu_count()
    N = 100
    chunksize = compute_chunksize(cpu_count, N)
    with mp.Pool() as pool:
        for result in pool.imap_unordered(worker_process, range(N), chunksize=chunksize):
            print(result)

if __name__ == '__main__':
    main()

但是对于 imap_unordered 没有办法轻松地将结果与提交的作业联系起来,除非工作进程返回原始调用参数以及返回值。另一方面,使用 imap_unorderedimap 指定 chunksize 的能力,应该使这些方法的结果 有效而不是重复调用 apply_async 方法,这基本上等同于使用 chunksize 1。但是如果你确实需要按完成顺序处理结果,那么要确保你应该使用方法 apply_async 带有回调函数。但是,它确实基于实验显示,如果您将 chunksize 值 1 与 imap_unordered 一起使用,结果将按完成顺序返回。

The map method of the ProcessPoolExecutor class from the concurrent.futures package is similar in one regard to the Pool.imap method from the multiprocessing 包。此方法不会将其传递的作为生成器表达式的 可迭代 参数转换为列表以计算有效的 chunksize 值,这就是为什么 chunksize 参数默认为 1 以及为什么,如果您传递大型 iterables ,您应该考虑指定适当的 chunksize 值.但是,与 Pool.imap 不同,根据我的经验,您无法通过迭代 map 返回的 可迭代 来检索第一个结果,直到所有 可迭代 都传递给 map 已经完全迭代。

multiprocessing.Pool 类有一个方法 apply 将任务提交到池并阻塞,直到结果准备好。返回值只是传递给 apply 函数的辅助函数的返回值。例如:

 import multiprocessing as mp

def worker_process(i):
    return i * i # square the argument

def main():
    with mp.Pool() as pool:
        print(pool.apply(worker_process, args=(6,)))
        print(pool.apply(worker_process, args=(4,)))

if __name__ == '__main__':
    main()

concurrent.futures.ProcessPoolExecutor 类没有这样的等价物。您必须针对返回的 Future 实例发出 submit 然后调用 result 。必须这样做并不难,但是 Pool.apply 方法对于适合阻塞任务提交的用例更方便。这种情况是当您有处理调用线程时,因为在线程中完成的大部分工作都是大量 I/O,除了一个非常受 CPU 限制的函数。创建线程的主程序首先创建一个 multiprocessing.Pool 实例并将其作为参数传递给所有线程。当线程需要调用严重依赖 CPU 的函数时,它现在使用 Pool.apply 方法运行该函数,从而在另一个进程中运行代码并释放当前进程以允许其他线程运行。

concurrent.futures 模块有两个类, ProcessPoolExecutorThreadPoolExecutor 具有相同的接口。这是一个很好的功能。但是 multiprocessing 模块还有一个未记录的 ThreadPool 类,其接口与 Pool 相同:

 >>> from multiprocessing.pool import Pool
>>> from multiprocessing.pool import ThreadPool
>>> dir(Pool)
['Process', '__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_check_running', '_get_sentinels', '_get_tasks', '_get_worker_sentinels', '_guarded_task_generation', '_handle_results', '_handle_tasks', '_handle_workers', '_help_stuff_finish', '_join_exited_workers', '_maintain_pool', '_map_async', '_repopulate_pool', '_repopulate_pool_static', '_setup_queues', '_terminate_pool', '_wait_for_updates', '_wrap_exception', 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'map', 'map_async', 'starmap', 'starmap_async', 'terminate']
>>> dir(ThreadPool)
['Process', '__class__', '__del__', '__delattr__', '__dict__', '__dir__', '__doc__', '__enter__', '__eq__', '__exit__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_check_running', '_get_sentinels', '_get_tasks', '_get_worker_sentinels', '_guarded_task_generation', '_handle_results', '_handle_tasks', '_handle_workers', '_help_stuff_finish', '_join_exited_workers', '_maintain_pool', '_map_async', '_repopulate_pool', '_repopulate_pool_static', '_setup_queues', '_terminate_pool', '_wait_for_updates', '_wrap_exception', 'apply', 'apply_async', 'close', 'imap', 'imap_unordered', 'join', 'map', 'map_async', 'starmap', 'starmap_async', 'terminate']
>>>

You can submit tasks with either ProcessPoolExecutor.submit , which returns a Future instance, or Pool.apply_async , which returns an AsyncResult instance, and specify a检索结果的超时值:

 from concurrent.futures import ProcessPoolExecutor, TimeoutError
from time import sleep

def worker_1():
    while True:
        print('hanging')
        sleep(1)

def main():
    with ProcessPoolExecutor(1) as pool:
        future = pool.submit(worker_1)
        try:
            future.result(3) # kill task after 3 seconds?
        except TimeoutError:
            print('timeout')

if __name__ == '__main__':
    main()
    print("return from main()")

印刷:

 hanging
hanging
hanging
timeout
hanging
hanging
hanging
hanging
hanging
hanging
hanging
etc.

调用 future.result(3) 时的主进程将在 3 秒后获得 TimeoutError 异常,因为提交的任务尚未在该时间段内完成。但任务继续运行,占用进程和 with ProcessPoolExecutor(1) as pool: 块永远不会退出,因此程序不会终止。

 from multiprocessing import Pool, TimeoutError
from time import sleep

def worker_1():
    while True:
        print('hanging')
        sleep(1)

def main():
    with Pool(1) as pool:
        result = pool.apply_async(worker_1, args=())
        try:
            result.get(3) # kill task after 3 seconds?
        except TimeoutError:
            print('timeout')

if __name__ == '__main__':
    main()
    print("return from main()")

印刷:

 hanging
hanging
hanging
timeout
return from main()

然而,这一次,即使超时任务仍在继续运行并占用进程, with 块没有被阻止退出,因此程序正常终止。这样做的原因是 Pool 实例的上下文管理器将在块退出时执行对 terminate 的调用,这会导致池中所有进程立即终止。这与 ProcessPoolExecutor 实例的上下文处理程序形成对比,后者执行对 shutdown(wait=True) 的调用,以在其管理的块退出时等待池中所有进程的终止。如果您正在使用上下文处理程序来处理池终止并且存在超时的可能性,那么优势似乎是 multiprocessing.Pool 。更新:在 Python 3.9 中,新参数 _cancelfutures 已添加到 shutdown 方法中。因此,如果您显式调用 shutdown(wait=False, cancel_futures=True) 而不是依赖于隐式调用 shutdown 产生的默认行为,则可以终止所有正在运行的任务和等待运行的任务 --- 使用上下文处理程序时.

But since the context handler for multiprocessing.Pool only calls terminate and not close followed by join , you must then ensure that all the jobs you在退出 with 块之前已提交已完成,例如通过使用阻塞同步调用提交作业,例如 AsyncResult map 或调用 get --- object returned by a call to apply_async or iterating the results of the call to imap or by calling close followed by join 在池实例上。

尽管在使用 ProcessPoolExecutor 时超时任务完成之前无法退出,但您可以 取消 启动尚未运行的已提交任务。在下面的演示中,我们有一个大小为 1 的池,因此作业只能连续运行。我们一个接一个地提交 3 个作业,其中前两个作业需要 3 秒才能运行,因为调用了 time.sleep(3) 。我们立即尝试取消前两个作业。第一次取消尝试失败,因为第一个作业已经在运行。但是因为池只有一个进程,所以第二个作业必须等待第一个作业完成 3 秒才能开始运行,因此取消成功。最后,作业 3 将在作业 1 完成后几乎立即开始和结束,这将是我们开始作业提交后大约 3 秒:

 from concurrent.futures import ProcessPoolExecutor
import time

def worker1(i):
    time.sleep(3)
    print('Done', i)

def worker2():
    print('Hello')

def main():
    with ProcessPoolExecutor(max_workers=1) as executor:
        t = time.time()
        future1 = executor.submit(worker1, 1)
        future2 = executor.submit(worker1, 2)
        future3 = executor.submit(worker2)
        # this will fail since this task is already running:
        print(future1.cancel())
        # this will succeed since this task hasn't started (it's waiting for future1 to complete):
        print(future2.cancel())
        future3.result() # wait for completion
        print(time.time() - t)

if __name__ == '__main__':
    main()

印刷:

 False
True
Done 1
Hello
3.1249606609344482

原文由 Booboo 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题