multiprocessing.Queue 的管道错误

新手上路,请多包涵

在 python2.7 中,从函数内部初始化时,multiprocessing.Queue 会抛出损坏的错误。我提供了一个重现问题的最小示例。

 #!/usr/bin/python
# -*- coding: utf-8 -*-

import multiprocessing

def main():
    q = multiprocessing.Queue()
    for i in range(10):
        q.put(i)

if __name__ == "__main__":
    main()

抛出以下破管错误

Traceback (most recent call last):
File "/usr/lib64/python2.7/multiprocessing/queues.py", line 268, in _feed
send(obj)
IOError: [Errno 32] Broken pipe

Process finished with exit code 0

我无法破译为什么。如果我们不能从函数内部填充 Queue 对象,那肯定会很奇怪。

原文由 hAcKnRoCk 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 1k
2 个回答

编辑:请使用 @Peter Svac 的 回答,这样更好。使用 join_thread 确保队列以比我建议的 time.sleep(0.1) 更好的方式完成工作。

这里发生的是,当您调用 main() 时,它会创建 Queue ,将 10 个对象放入其中并结束函数,垃圾收集其所有内部变量和对象,包括 Queue 。但是您收到此错误是因为您仍在尝试发送 Queue 中的最后一个数字。

来自文档 文档

“当进程首先将一个项目放入队列时,将启动一个供给线程,它将对象从缓冲区传输到管道中。”

由于 put() 是在另一个线程中创建的,它不会阻止脚本的执行,并允许在完成队列操作之前结束 main() 函数。

尝试这个 :

 #!/usr/bin/python
# -*- coding: utf-8 -*-

import multiprocessing
import time
def main():
    q = multiprocessing.Queue()
    for i in range(10):
        print i
        q.put(i)
    time.sleep(0.1) # Just enough to let the Queue finish

if __name__ == "__main__":
    main()

应该有一种方法 join 队列或块执行,直到对象被放入 Queue ,你应该看看文档。

原文由 CoMartel 发布,翻译遵循 CC BY-SA 4.0 许可协议

当您启动 Queue.put() 时,将启动隐式线程以将数据传送到队列。同时,主应用程序完成,数据没有终点站(队列对象被垃圾收集)。

我会试试这个:

 from multiprocessing import Queue

def main():
    q = Queue()
    for i in range(10):
        print i
        q.put(i)
    q.close()
    q.join_thread()

if __name__ == "__main__":
    main()

join_thread() 确保缓冲区中的所有数据都已被刷新。 close() 必须在 join_thread()

原文由 Peter Svac 发布,翻译遵循 CC BY-SA 3.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题