Python 多处理锁

新手上路,请多包涵

此多处理代码按预期工作。它创建 4 个 Python 进程,并使用它们打印 0 到 39 之间的数字,每次打印后都有延迟。

 import multiprocessing
import time

def job(num):
  print num
  time.sleep(1)

pool = multiprocessing.Pool(4)

lst = range(40)
for i in lst:
  pool.apply_async(job, [i])

pool.close()
pool.join()

但是,当我尝试使用 multiprocessing.Lock 来防止多个进程打印到标准输出时,程序会立即退出而没有任何输出。

 import multiprocessing
import time

def job(lock, num):
  lock.acquire()
  print num
  lock.release()
  time.sleep(1)

pool = multiprocessing.Pool(4)
l = multiprocessing.Lock()

lst = range(40)
for i in lst:
  pool.apply_async(job, [l, i])

pool.close()
pool.join()

为什么引入一个 multiprocessing.Lock 会使这段代码不起作用?

更新:它在全局声明锁时起作用(我做了一些非确定性测试来检查锁是否有效),而不是上面将锁作为参数传递的代码(Python 的多处理文档显示锁被传递为参数)。下面的代码有一个全局声明的锁,而不是在上面的代码中作为参数传递。

 import multiprocessing
import time

l = multiprocessing.Lock()

def job(num):
  l.acquire()
  print num
  l.release()
  time.sleep(1)

pool = multiprocessing.Pool(4)

lst = range(40)
for i in lst:
  pool.apply_async(job, [i])

pool.close()
pool.join()

原文由 dannyadam 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 819
2 个回答

如果将 pool.apply_async 更改为 pool.apply ,则会出现此异常:

 Traceback (most recent call last):
  File "p.py", line 15, in <module>
    pool.apply(job, [l, i])
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 244, in apply
    return self.apply_async(func, args, kwds).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 558, in get
    raise self._value
RuntimeError: Lock objects should only be shared between processes through inheritance

pool.apply_async 只是隐藏它。我不想这么说,但使用全局变量可能是您示例的最简单方法。让我们只希望 迅猛龙 不会抓到你。

原文由 matsjoyce 发布,翻译遵循 CC BY-SA 3.0 许可协议

其他答案已经提供了 apply_async 默默失败的答案,除非提供了适当的 error_callback 参数。我仍然发现 OP 的另一点是有效的——官方文档确实显示 multiprocessing.Lock 作为函数参数传递。事实上, 编程指南 中标题为“显式将资源传递给子进程”的小节建议传递 multiprocessing.Lock 对象作为函数参数而不是全局变量。而且,我一直在编写很多代码,其中我将 multiprocessing.Lock 作为参数传递给子进程,并且一切都按预期工作。

那么,是什么给了?

我首先调查了 multiprocessing.Lock 是否可以腌制。在 Python 3、MacOS+CPython 中,尝试 pickle multiprocessing.Lock 产生其他人遇到的熟悉的 RuntimeError

 >>> pickle.dumps(multiprocessing.Lock())
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-7-66dfe1355652> in <module>
----> 1 pickle.dumps(multiprocessing.Lock())

/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/synchronize.py in __getstate__(self)
     99
    100     def __getstate__(self):
--> 101         context.assert_spawning(self)
    102         sl = self._semlock
    103         if sys.platform == 'win32':

/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py in assert_spawning(obj)
    354         raise RuntimeError(
    355             '%s objects should only be shared between processes'
--> 356             ' through inheritance' % type(obj).__name__
    357             )

RuntimeError: Lock objects should only be shared between processes through inheritance

对我来说,这证实了 multiprocessing.Lock 确实不可腌制。

一边开始

但是, 同一个 锁仍然需要在两个或多个 python 进程之间共享,它们将拥有自己的、可能不同的地址空间(例如当我们使用“spawn”或“forkserver”作为启动方法时)。 multiprocessing 必须做一些特殊的事情来跨进程发送锁。 This other StackOverflow post seems to indicate that in Unix systems, multiprocessing.Lock 可以通过操作系统本身(python 之外)支持的命名信号量来实现。然后,两个或多个 python 进程可以链接到 同一个 锁,该锁实际上驻留在两个 python 进程之外的一个位置。也可能有一个共享内存实现。

一边结束

我们可以传递 multiprocessing.Lock 对象作为参数吗?

经过更多的实验和更多的阅读,似乎区别在于 multiprocessing.Poolmultiprocessing.Process 之间。

multiprocessing.Process 让你通过 multiprocessing.Lock 作为参数,但 multiprocessing.Pool 没有。这是一个有效的示例:

 import multiprocessing
import time
from multiprocessing import Process, Lock

def task(n: int, lock):
    with lock:
        print(f'n={n}')
    time.sleep(0.25)

if __name__ == '__main__':
    multiprocessing.set_start_method('forkserver')
    lock = Lock()
    processes = [Process(target=task, args=(i, lock)) for i in range(20)]
    for process in processes:
        process.start()
    for process in processes:
        process.join()

请注意,使用 __name__ == '__main__' 是必不可少的,如 编程指南 的“安全导入主模块”子部分所述。

multiprocessing.Pool 似乎使用 queue.SimpleQueue 将每个任务放入队列中,这就是酸洗发生的地方。最有可能的是, multiprocessing.Process 没有使用酸洗(或进行特殊版本的酸洗)。

原文由 Ankur 发布,翻译遵循 CC BY-SA 4.0 许可协议

推荐问题