使用 Python 多处理的高内存使用率

新手上路,请多包涵

我看过一些关于使用 Python 多处理模块的内存使用的帖子。然而,这些问题似乎并没有回答我在这里遇到的问题。我正在发布我的分析,希望有人能帮助我。

问题

我正在使用 multiprocessing 并行执行任务,我注意到工作进程的内存消耗无限增长。我有一个小的独立示例,应该复制我注意到的内容。

 import multiprocessing as mp
import time

def calculate(num):
    l = [num*num for num in range(num)]
    s = sum(l)
    del l       # delete lists as an  option
    return s

if __name__ == "__main__":
    pool = mp.Pool(processes=2)
    time.sleep(5)
    print "launching calculation"
    num_tasks = 1000
    tasks =  [pool.apply_async(calculate,(i,)) for i in range(num_tasks)]
    for f in tasks:
        print f.get(5)
    print "calculation finished"
    time.sleep(10)
    print "closing  pool"
    pool.close()
    print "closed pool"
    print "joining pool"
    pool.join()
    print "joined pool"
    time.sleep(5)

系统

我正在运行 Windows,我使用任务管理器来监视内存使用情况。我正在运行 Python 2.7.6。

观察

我总结了下面 2 个工作进程的内存消耗。

 +---------------+----------------------+----------------------+
|  num_tasks    |  memory with del     | memory without del   |
|               | proc_1   | proc_2    | proc_1   | proc_2    |
+---------------+----------------------+----------------------+
| 1000          | 4884     | 4694      | 4892     | 4952      |
| 5000          | 5588     | 5596      | 6140     | 6268      |
| 10000         | 6528     | 6580      | 6640     | 6644      |
+---------------+----------------------+----------------------+

在上表中,我尝试更改任务数并观察在所有计算结束时和 join 之前消耗的内存 pool 。 ‘del’ 和 ‘without del’ 选项分别是我是取消注释还是注释 calculate(num) 函数中的 del l 行。计算前,内存消耗在4400左右。

  1. 看起来手动清除列表会导致工作进程的内存使用率降低。我认为垃圾收集器会处理这个问题。有没有办法强制垃圾收集?
  2. 令人费解的是,随着任务数量的增加,内存使用量在这两种情况下都在不断增长。有没有办法限制内存使用?

我有一个基于此示例的流程,旨在长期运行。我观察到这个工作进程在一夜之间运行后占用了大量内存(~4GB)。做一个 join 释放内存不是一个选项,我试图找出一个没有 join 的方法。

这似乎有点神秘。有没有人遇到过类似的事情?我该如何解决这个问题?

原文由 Goutham 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 533
2 个回答

我做了很多研究,但找不到解决问题本身的解决方案。但是有一个不错的解决方法可以以很小的成本防止内存井喷,特别是在服务器端长时间运行的代码上。

解决方案本质上是在完成固定数量的任务后重新启动各个工作进程。 python 中的 Pool 类将 maxtasksperchild 作为参数。您可以指定 maxtasksperchild=1000 从而限制在每个子进程上运行 1000 个任务。达到 maxtasksperchild 编号后,池刷新其子进程。使用一个谨慎的最大任务数,可以平衡消耗的最大内存,以及与重新启动后端进程相关的启动成本。 Pool 构造完成如下:

 pool = mp.Pool(processes=2,maxtasksperchild=1000)

我将我的完整解决方案放在这里,以便其他人可以使用它!

 import multiprocessing as mp
import time

def calculate(num):
    l = [num*num for num in range(num)]
    s = sum(l)
    del l       # delete lists as an  option
    return s

if __name__ == "__main__":

    # fix is in the following line #
    pool = mp.Pool(processes=2,maxtasksperchild=1000)

    time.sleep(5)
    print "launching calculation"
    num_tasks = 1000
    tasks =  [pool.apply_async(calculate,(i,)) for i in range(num_tasks)]
    for f in tasks:
        print f.get(5)
    print "calculation finished"
    time.sleep(10)
    print "closing  pool"
    pool.close()
    print "closed pool"
    print "joining pool"
    pool.join()
    print "joined pool"
    time.sleep(5)

原文由 Goutham 发布,翻译遵循 CC BY-SA 3.0 许可协议

这里的一个潜在问题是结果可能以任何顺序返回,但由于您是按顺序读取它们,因此它必须将所有从进程返回的结果存储在内存中。 _numtasks 越高,它可能必须存储在内存中的结果越多,等待您的 for f in tasks 循环来处理结果。

在最坏的情况下,结果会以完全相反的顺序计算。在这种情况下,在您的 for f in tasks 循环开始处理任何内容之前,所有结果都必须由 multiprocessing 模块保存在内存中。

在这种情况下,他们使用的内存量似乎确实比我预期的要高(不仅仅是为了存储 calculate() 函数返回的 1000-10000 个数字),但也许只是存储的每个工作人员结果的高恒定开销。

您是否尝试过将 回调 参数指定为 _applyasync ,以便您可以在结果完成后立即处理,或者使用 _imapunordered ,以便它可以在结果准备好后立即返回给您?

原文由 Mike 发布,翻译遵循 CC BY-SA 4.0 许可协议

推荐问题