如何将具有多个参数的函数传递给 python concurrent.futures.ProcessPoolExecutor.map()?

新手上路,请多包涵

我想 concurrent.futures.ProcessPoolExecutor.map() 调用由 2 个或更多参数组成的函数。在下面的示例中,我使用了 lambda 函数并定义了 ref 作为大小与 numberlist 具有相同值的数组。

第一个问题: 有更好的方法吗?在 numberlist 的大小可以是百万到十亿个元素的情况下,因此 ref 大小必须遵循 numberlist,这种方法不必要地占用宝贵的内存,我想避免这种情况。我这样做是因为我阅读了 map 函数将终止其映射,直到到达最短的数组末端。

 import concurrent.futures as cf

nmax = 10
numberlist = range(nmax)
ref = [5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
workers = 3

def _findmatch(listnumber, ref):
    print('def _findmatch(listnumber, ref):')
    x=''
    listnumber=str(listnumber)
    ref = str(ref)
    print('listnumber = {0} and ref = {1}'.format(listnumber, ref))
    if ref in listnumber:
        x = listnumber
    print('x = {0}'.format(x))
    return x

a = map(lambda x, y: _findmatch(x, y), numberlist, ref)
for n in a:
    print(n)
    if str(ref[0]) in n:
        print('match')

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    #for n in executor.map(_findmatch, numberlist):
    for n in executor.map(lambda x, y: _findmatch(x, ref), numberlist, ref):
        print(type(n))
        print(n)
        if str(ref[0]) in n:
            print('match')

运行上面的代码,我发现 map 函数能够达到我想要的结果。但是,当我将相同的条款转移到 concurrent.futures.ProcessPoolExecutor.map() 时,python3.5 失败并出现此错误:

 Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 241, in _feed
    obj = ForkingPickler.dumps(obj)
  File "/usr/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <function <lambda> at 0x7fd2a14db0d0>: attribute lookup <lambda> on __main__ failed

问题 2 :为什么会发生此错误以及如何让 concurrent.futures.ProcessPoolExecutor.map() 调用具有多个参数的函数?

原文由 Sun Bear 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 1.3k
2 个回答

首先回答你的第二个问题,你会遇到一个异常,因为你正在使用的 lambda 函数不可 picklable。由于 Python 使用 pickle 协议序列化主进程和 ProcessPoolExecutor 的工作进程之间传递的数据,这是一个问题。目前尚不清楚您为什么使用 lambda 。您拥有的 lambda 有两个参数,就像原始函数一样。您可以直接使用 _findmatch 而不是 lambda 它应该可以工作。

 with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    for n in executor.map(_findmatch, numberlist, ref):
        ...

至于第一个关于在不创建巨型列表的情况下传递第二个常量参数的问题,您可以通过多种方式解决这个问题。一种方法可能是使用 itertools.repeat 创建一个可迭代对象,该对象在迭代时永远重复相同的值。

但是更好的方法可能是编写一个额外的函数来为您传递常量参数。 (也许这就是你尝试使用 lambda 函数的原因?)如果你使用的函数可以在模块的顶级命名空间中访问,它应该可以工作:

 def _helper(x):
    return _findmatch(x, 5)

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    for n in executor.map(_helper, numberlist):
        ...

原文由 Blckknght 发布,翻译遵循 CC BY-SA 3.0 许可协议

(1) 无需列清单。您可以使用 itertools.repeat 创建一个只重复 some 值的迭代器。

(2) 你需要传递一个具名函数给 map 因为它会传递给子进程执行。 map 使用 pickle 协议发送东西,lambdas 不能被腌制,因此它们不能成为地图的一部分。但它完全没有必要。您的 lambda 所做的只是调用一个带有 2 个参数的 2 参数函数。完全删除它。

工作代码是

import concurrent.futures as cf
import itertools

nmax = 10
numberlist = range(nmax)
workers = 3

def _findmatch(listnumber, ref):
    print('def _findmatch(listnumber, ref):')
    x=''
    listnumber=str(listnumber)
    ref = str(ref)
    print('listnumber = {0} and ref = {1}'.format(listnumber, ref))
    if ref in listnumber:
        x = listnumber
    print('x = {0}'.format(x))
    return x

with cf.ProcessPoolExecutor(max_workers=workers) as executor:
    #for n in executor.map(_findmatch, numberlist):
    for n in executor.map(_findmatch, numberlist, itertools.repeat(5)):
        print(type(n))
        print(n)
        #if str(ref[0]) in n:
        #    print('match')

原文由 tdelaney 发布,翻译遵循 CC BY-SA 3.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题