How do I use a multiprocessing Queue in Python?

New here, please bear with me.

I'm having a lot of trouble understanding how multiprocessing queues work in Python and how to implement them. Let's say I have two Python modules that access data from a shared file; call these two modules a writer and a reader. My plan is to have both the reader and the writer put requests into two separate multiprocessing queues, and then have a third process pop these requests in a loop and execute them accordingly.

My main problem is that I really don't know how to implement multiprocessing.Queue correctly. You can't really instantiate the object separately in each process, since then they would be separate queues. How do you make sure that all processes work with a shared queue (or, in this case, queues)?

Originally posted by jab; translation licensed under CC BY-SA 4.0.

2 Answers

My main problem is that I really don't know how to implement multiprocessing.Queue correctly. You can't really instantiate the object separately in each process, since then they would be separate queues. How do you make sure that all processes work with a shared queue (or, in this case, queues)?

Here is a simple example of a reader and a writer sharing a single queue… The writer sends a stream of integers to the reader; when the writer runs out of numbers, it sends "DONE", which lets the reader know to break out of its read loop.

You can spawn as many reader processes as you like…

from multiprocessing import Process, Queue
import time
import sys

def reader_proc(queue):
    """Read from the queue; this spawns as a separate Process"""
    while True:
        msg = queue.get()  # Read from the queue and do nothing
        if msg == "DONE":
            break

def writer(count, num_of_reader_procs, queue):
    """Write integers into the queue.  A reader_proc() will read them from the queue"""
    for ii in range(0, count):
        queue.put(ii)  # Put 'count' numbers into queue

    ### Tell all readers to stop...
    for ii in range(0, num_of_reader_procs):
        queue.put("DONE")

def start_reader_procs(qq, num_of_reader_procs):
    """Start the reader processes and return all in a list to the caller"""
    all_reader_procs = list()
    for ii in range(0, num_of_reader_procs):
        ### reader_proc() reads from qq as a separate process...
        ###    you can spawn as many reader_proc() as you like
        ###    however, there is usually a point of diminishing returns
        reader_p = Process(target=reader_proc, args=(qq,))
        reader_p.daemon = True
        reader_p.start()  # Launch reader_p() as another proc

        all_reader_procs.append(reader_p)

    return all_reader_procs

if __name__ == "__main__":
    num_of_reader_procs = 2
    qq = Queue()  # writer() writes to qq from _this_ process
    for count in [10**4, 10**5, 10**6]:
        assert 0 < num_of_reader_procs < 4
        all_reader_procs = start_reader_procs(qq, num_of_reader_procs)

        writer(count, len(all_reader_procs), qq)  # Queue stuff to all reader_p()
        print("All reader processes are pulling numbers from the queue...")

        _start = time.time()
        for idx, a_reader_proc in enumerate(all_reader_procs):
            print("    Waiting for reader_p.join() index %s" % idx)
            a_reader_proc.join()  # Wait for a_reader_proc() to finish

            print("        reader_p() idx:%s is done" % idx)

        print(
            "Sending {0} integers through Queue() took {1} seconds".format(
                count, (time.time() - _start)
            )
        )
        print("")
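
To map this back to the layout described in the question (reader and writer modules submitting requests, with a third process popping and executing them), here is a minimal sketch. The names (`writer_client`, `reader_client`, `dispatcher`) are hypothetical, and a single request queue with tagged tuples stands in for the two separate queues; the idea of passing the same Queue object to every Process is exactly the same as above:

```python
import multiprocessing as mp

def writer_client(req_q):
    """Stand-in for the 'writer' module: submit write requests."""
    for ii in range(3):
        req_q.put(("write", "line %s" % ii))

def reader_client(req_q):
    """Stand-in for the 'reader' module: submit read requests."""
    for ii in range(3):
        req_q.put(("read", ii))

def dispatcher(req_q, out_q):
    """The third process: pop requests off the shared queue and act on them."""
    while True:
        kind, payload = req_q.get()
        if kind == "STOP":
            break
        # This is where the shared file would actually be read or written;
        # for the sketch we just record what would have been done.
        out_q.put("%s:%s" % (kind, payload))

def run_demo():
    req_q = mp.Queue()  # every process receives the *same* queue as an argument
    out_q = mp.Queue()
    disp = mp.Process(target=dispatcher, args=(req_q, out_q))
    disp.start()

    clients = [
        mp.Process(target=writer_client, args=(req_q,)),
        mp.Process(target=reader_client, args=(req_q,)),
    ]
    for proc in clients:
        proc.start()
    for proc in clients:
        proc.join()

    req_q.put(("STOP", None))  # sentinel, same idea as "DONE" above
    disp.join()

    results = []
    while not out_q.empty():
        results.append(out_q.get())
    return results

if __name__ == "__main__":
    print(sorted(run_demo()))
```

Note the requests from the two clients may interleave in any order, but the dispatcher executes them strictly one at a time, which is what makes this pattern safe for a shared file.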

Originally posted by Mike Pennington; translation licensed under CC BY-SA 4.0.

Here is a dead-simple usage of multiprocessing.Queue and multiprocessing.Process that allows callers to send an "event" plus arguments to a separate process, which dispatches the event to a "do_" method on the process. (Python 3.4+)

import multiprocessing as mp
import collections

Msg = collections.namedtuple('Msg', ['event', 'args'])

class BaseProcess(mp.Process):
    """A process backed by an internal queue for simple one-way message passing.
    """
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queue = mp.Queue()

    def send(self, event, *args):
        """Puts the event and args as a `Msg` on the queue
        """
        msg = Msg(event, args)
        self.queue.put(msg)

    def dispatch(self, msg):
        event, args = msg

        handler = getattr(self, "do_%s" % event, None)
        if not handler:
            raise NotImplementedError("Process has no handler for [%s]" % event)

        handler(*args)

    def run(self):
        while True:
            msg = self.queue.get()
            self.dispatch(msg)

Usage:

class MyProcess(BaseProcess):
    def do_helloworld(self, arg1, arg2):
        print(arg1, arg2)

if __name__ == "__main__":
    process = MyProcess()
    process.start()
    process.send('helloworld', 'hello', 'world')

send happens in the parent process, while do_* happens in the child process.

I've left out any exception handling, which would obviously interrupt the run loop and exit the child process. You can also customize it by overriding run to control blocking, or whatever else you need.
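
As one illustration of overriding run, here is a sketch that adds a "stop" sentinel so the child exits cleanly. The `StoppableProcess` and `EchoProcess` names are hypothetical, and the base class is repeated so the sketch runs standalone:

```python
import multiprocessing as mp
import collections

Msg = collections.namedtuple("Msg", ["event", "args"])

class BaseProcess(mp.Process):
    """Same queue-backed process as above, repeated so this sketch is self-contained."""
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.queue = mp.Queue()

    def send(self, event, *args):
        self.queue.put(Msg(event, args))

    def dispatch(self, msg):
        handler = getattr(self, "do_%s" % msg.event, None)
        if not handler:
            raise NotImplementedError("Process has no handler for [%s]" % msg.event)
        handler(*msg.args)

    def run(self):
        while True:
            self.dispatch(self.queue.get())

class StoppableProcess(BaseProcess):
    """Overrides run() to watch for a 'stop' sentinel instead of looping forever."""
    def run(self):
        while True:
            msg = self.queue.get()
            if msg.event == "stop":
                break  # exit the child's run loop cleanly
            self.dispatch(msg)

    def stop(self):
        """Called from the parent: ask the child to exit, then wait for it."""
        self.send("stop")
        self.join()

class EchoProcess(StoppableProcess):
    """Example worker: echoes its messages back through a result queue."""
    def __init__(self, out_q):
        super().__init__()
        self.out_q = out_q

    def do_echo(self, text):
        self.out_q.put(text)

if __name__ == "__main__":
    out_q = mp.Queue()
    proc = EchoProcess(out_q)
    proc.start()
    proc.send("echo", "hello")
    proc.stop()
    print(out_q.get())
```

Because "stop" is handled inside run itself rather than as a do_ method, it can break the loop directly; a do_stop handler could not return control in a way that exits the loop without an extra flag.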

This is really only useful when you have a single worker process, but I think it's a relevant answer to this question, demonstrating a common scenario with a bit more object orientation.

Originally posted by Joe Holloway; translation licensed under CC BY-SA 4.0.
