python multiprocess pipe 报错“管道已关闭”?

问题描述

通过python mulitproccess 的 Pipe 方法,建立父子进程通信。发现一开始运行程序就会报
"管道已关闭"错误

问题出现的环境背景及自己尝试过哪些方法

大概知道跟管道通信和进程join方法的调用有关系,尝试过只要 single.py里追加server.stop() 就不会报错

相关代码

service.py

import os
from multiprocessing import Process, Pipe


def start_child_process(child_conn):
    # here is a task will work for a long time
    # start_http_server_run_for_long_time()
    # now tell you which port listened
    child_conn.send({"port": 123456, "ret": 1, "pid": os.getpid()})
    # exit child process only after receive main process signal/message
    signal = child_conn.recv()
    if signal:
        child_conn.close()


class Server:
    def __init__(self):
        self.child_conn = None
        self.child = None
        self.parent_conn, self.child_conn = Pipe()

    def run(self):
        self.child = Process(target=start_child_process, name="my_child_process", args=(self.child_conn,))
        self.child.start()

        data = self.parent_conn.recv()
        result = {
            "endpoints": {
                "http": f"http://127.0.0.1:{data['port']}/cmd",
                "ws": f"ws://127.0.0.1:{data['port']}/api",
            }
        }
        return result

    """ stop方法是个函数接口,外部函数会调用来终止子进程 """
    def stop(self):
        self.parent_conn.send(True)
        self.child.join()
        self.child = None


if __name__ == "__main__":
    server = Server()
    r = server.run()
    print("r:", r)

single.py

from service import Server
import time


def main():
    server = Server()
    result = server.run()
    print("r:", result)
    # time.sleep(5)
    # server.stop() # 只要打开这行代码的注释,问题就会消失


if __name__ == "__main__":
    main()

运行python single.py就会报错
image.png

但是只要将 single.py 里 time.sleep(5) 和 server.stop() 这两行注释打开,就会运行正常。

你期待的结果是什么?实际看到的错误信息又是什么?

希望子进程长期存活(因为子进程内会可能会启动一个web server),除非接收到主进程的退出通知。如显式调用server.stop()则退出子进程

阅读 3.4k
2 个回答

仔细想了下,对比了下官方文档
image.png
image.png
大概理解了报错的原因点:
因为子进程在在执行到 signal = child_conn.recv() 会进入到进程阻塞状态,等待主进程发送信息,然而,直到主进程栈空退出,对端关闭连接,子进程依然没有收到退出通知,自动触发了一次通道内数据的全量读取,然而此刻通道已经关闭,所以报出了最初的 "通道已关闭错误"

那么解决办法也好办了,只要对 signal = child_conn.recv() 加一个error catch, 无视错误就好,因为确实有可能主进程不会通知子进程退出就退出主进程自己的进程,此种情况也算合理。不知道有没有更科学的实现方法。修改好代码如下:

service.py

import os
from multiprocessing import Process, Pipe


def start_child_process(child_conn):
    # here is a task will work for a long time
    # run_server_for_long_time()
    child_conn.send({"port": 123, "ret": 1, "pid": os.getpid()})
    # exit child process only after receive main process signal/message

    # catch IOError when main process automatically exit
    try:
        signal = child_conn.recv()
        if signal:
            child_conn.close()
    except EOFError as err:
        print(err)


class Server:
    def __init__(self):
        self.child_conn = None
        self.child = None
        self.parent_conn, self.child_conn = Pipe()

    def run(self):
        self.child = Process(target=start_child_process, name="my_child_process", args=(self.child_conn,))
        self.child.start()

        data = self.parent_conn.recv()
        result = {
            "endpoints": {
                "http": f"http://127.0.0.1:{data['port']}/cmd",
                "ws": f"ws://127.0.0.1:{data['port']}/api",
            }
        }
        return result

    def stop(self):
        """ call it only when necessary """
        self.parent_conn.send(True)
        self.child.join()
        self.child = None


if __name__ == "__main__":
    server = Server()
    r = server.run()
    print("r:", r)

single.py

from service import Server
import time


def main():
    server = Server()
    result = server.run()
    print("r:", result)
    time.sleep(5)
    # server.stop() # 只要打开这行代码的注释,问题就会消失


if __name__ == "__main__":
    main()

从楼主的问题描述中,我能看到你对于这个问题的思考很棒,楼主首先应该在这部分代码中去掉这个注释。
图片.png
也就是修改这个py为

def main():
    server = Server()
    result = server.run()
    time.sleep(1)  # 增加等待时间
    print("r:", result)

我来解释下为什么需要增加等待时间,这是因为先让程序休眠一段时间是为了确保子进程已经向管道中写入数据后再从管道中读取数据。因为在父子进程通信时,如果父进程尝试从空的管道中读取数据,就会收到一个 EOFError 异常,提示 "管道已关闭"。
当子进程启动后,它会向父进程传递一些信息,例如端口号等。但是这个过程需要一定的时间,并不是立即完成的。因此,如果在子进程启动后立即尝试从管道中读取数据,那么很可能会遇到上述问题。
通过让程序休眠一段时间可以确保子进程已经向管道中写入了数据,并且父进程可以正确地从管道中读取到这些数据。具体的等待时间可以根据实际情况进行调整,在本例中增加了1秒的等待时间。

另外,在 Server 类中的 stop() 方法中也需要增加等待子进程退出的时间,否则会出现子进程没有完全退出就被主进程终止的情况。

class Server:
    def __init__(self):
        self.child_conn = None
        self.child = None
        self.parent_conn, self.child_conn = Pipe()

    def run(self):
        self.child = Process(target=start_child_process, name="my_child_process", args=(self.child_conn,))
        self.child.start()

        data = self.parent_conn.recv()
        result = {
            "endpoints": {
                "http": f"http://127.0.0.1:{data['port']}/cmd",
                "ws": f"ws://127.0.0.1:{data['port']}/api",
            }
        }
        return result

    """ stop方法是个函数接口,外部函数会调用来终止子进程 """
    def stop(self):
        self.parent_conn.send(True)
        self.child.join(timeout=1)  # 增加等待时间
        if self.child.is_alive():
            self.child.terminate()
        self.child = None

也就是说,上述这段代码就是在你原基础上增加了一个timeout,即可以解决这个问题了。
————————————————————————————————————————————————————————————————————————
下划线
回答更新:根据楼主的反馈,依然存在类似问题,怀疑是等待的时间不够长或者其他进程问题,因此建议换成subprocess.Popen() 和 subprocess.terminate() 试试,不知道是否正确(因为目前我在外面,身边没电脑,没办法测试,纯手机打字)

import subprocess
import time

def start_server():
    global server_process
    server_process = subprocess.Popen(['python', 'manage.py', 'runserver'])
    print('Server started...')

def stop_server():
    if server_process and server_process.poll() is None:
        server_process.terminate()
        print('Server stopped.')

if __name__ == '__main__':
    try:
        start_server()
        time.sleep(10)  # 让服务器运行一段时间
    finally:
        stop_server()

不知道能否解决楼主问题

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题