如何解决使用 Pool.map() 进行多处理时的内存问题?

新手上路,请多包涵

我已经将程序(如下)编写为:

  • 读取一个巨大的文本文件 pandas dataframe
  • 然后 groupby 使用特定的列值拆分数据并存储为数据帧列表。
  • 然后将数据通过管道传输到 multiprocess Pool.map() 以并行处理每个数据帧。

一切都很好,该程序在我的小型测试数据集上运行良好。但是,当我输入大数据(大约 14 GB)时,内存消耗呈指数增长,然后冻结计算机或被杀死(在 HPC 集群中)。

一旦数据/变量没有用,我已经添加了代码来清除内存。我也会在完成后立即关闭游泳池。仍然有 14 GB 输入,我只期望 2*14 GB 内存负担,但似乎很多事情都在发生。我还尝试使用 chunkSize and maxTaskPerChild, etc 进行调整,但我没有看到测试与大文件的优化有任何差异。

我认为当我开始 multiprocessing 时,此代码位置需要对此代码进行改进。

p = Pool(3) # number of pool to run at once; default at 1 result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values())) 但是,我发布了整个代码。

测试示例: 我创建了一个高达 250 MB 的测试文件(“genome_matrix_final-chr1234-1mb.txt”)并运行了程序。当我检查系统监视器时,我可以看到内存消耗增加了大约 6 GB。我不太清楚为什么 250 mb 文件加上一些输出占用了这么多内存空间。如果它有助于查看真正的问题,我已经通过投递箱共享了该文件。 https://www.dropbox.com/sh/coihujii38t5prd/AABDXv8ACGIYczeMtzKBo0eea?dl=0

有人可以建议,我怎样才能摆脱这个问题?

我的蟒蛇脚本:

 #!/home/bin/python3

import pandas as pd
import collections
from multiprocessing import Pool
import io
import time
import resource

print()
print('Checking required modules')
print()

''' change this input file name and/or path as need be '''
genome_matrix_file = "genome_matrix_final-chr1n2-2mb.txt"   # test file 01
genome_matrix_file = "genome_matrix_final-chr1234-1mb.txt"  # test file 02
#genome_matrix_file = "genome_matrix_final.txt"    # large file

def main():
    with open("genome_matrix_header.txt") as header:
        header = header.read().rstrip('\n').split('\t')
        print()

    time01 = time.time()
    print('starting time: ', time01)

    '''load the genome matrix file onto pandas as dataframe.
    This makes is more easy for multiprocessing'''
    gen_matrix_df = pd.read_csv(genome_matrix_file, sep='\t', names=header)

    # now, group the dataframe by chromosome/contig - so it can be multiprocessed
    gen_matrix_df = gen_matrix_df.groupby('CHROM')

    # store the splitted dataframes as list of key, values(pandas dataframe) pairs
    # this list of dataframe will be used while multiprocessing
    gen_matrix_df_list = collections.OrderedDict()
    for chr_, data in gen_matrix_df:
        gen_matrix_df_list[chr_] = data

    # clear memory
    del gen_matrix_df

    '''Now, pipe each dataframe from the list using map.Pool() '''
    p = Pool(3)  # number of pool to run at once; default at 1
    result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))

    del gen_matrix_df_list  # clear memory

    p.close()
    p.join()

    # concat the results from pool.map() and write it to a file
    result_merged = pd.concat(result)
    del result  # clear memory

    pd.DataFrame.to_csv(result_merged, "matrix_to_haplotype-chr1n2.txt", sep='\t', header=True, index=False)

    print()
    print('completed all process in "%s" sec. ' % (time.time() - time01))
    print('Global maximum memory usage: %.2f (mb)' % current_mem_usage())
    print()

'''function to convert the dataframe from genome matrix to desired output '''
def matrix_to_vcf(matrix_df):

    print()
    time02 = time.time()

    # index position of the samples in genome matrix file
    sample_idx = [{'10a': 33, '10b': 18}, {'13a': 3, '13b': 19},
                    {'14a': 20, '14b': 4}, {'16a': 5, '16b': 21},
                    {'17a': 6, '17b': 22}, {'23a': 7, '23b': 23},
                    {'24a': 8, '24b': 24}, {'25a': 25, '25b': 9},
                    {'26a': 10, '26b': 26}, {'34a': 11, '34b': 27},
                    {'35a': 12, '35b': 28}, {'37a': 13, '37b': 29},
                    {'38a': 14, '38b': 30}, {'3a': 31, '3b': 15},
                    {'8a': 32, '8b': 17}]

    # sample index stored as ordered dictionary
    sample_idx_ord_list = []
    for ids in sample_idx:
        ids = collections.OrderedDict(sorted(ids.items()))
        sample_idx_ord_list.append(ids)

    # for haplotype file
    header = ['contig', 'pos', 'ref', 'alt']

    # adding some suffixes "PI" to available sample names
    for item in sample_idx_ord_list:
        ks_update = ''
        for ks in item.keys():
            ks_update += ks
        header.append(ks_update+'_PI')
        header.append(ks_update+'_PG_al')

    #final variable store the haplotype data
    # write the header lines first
    haplotype_output = '\t'.join(header) + '\n'

    # to store the value of parsed the line and update the "PI", "PG" value for each sample
    updated_line = ''

    # read the piped in data back to text like file
    matrix_df = pd.DataFrame.to_csv(matrix_df, sep='\t', index=False)

    matrix_df = matrix_df.rstrip('\n').split('\n')
    for line in matrix_df:
        if line.startswith('CHROM'):
            continue

        line_split = line.split('\t')
        chr_ = line_split[0]
        ref = line_split[2]
        alt = list(set(line_split[3:]))

        # remove the alleles "N" missing and "ref" from the alt-alleles
        alt_up = list(filter(lambda x: x!='N' and x!=ref, alt))

        # if no alt alleles are found, just continue
        # - i.e : don't write that line in output file
        if len(alt_up) == 0:
            continue

        #print('\nMining data for chromosome/contig "%s" ' %(chr_ ))
        #so, we have data for CHR, POS, REF, ALT so far
        # now, we mine phased genotype for each sample pair (as "PG_al", and also add "PI" tag)
        sample_data_for_vcf = []
        for ids in sample_idx_ord_list:
            sample_data = []
            for key, val in ids.items():
                sample_value = line_split[val]
                sample_data.append(sample_value)

            # now, update the phased state for each sample
            # also replacing the missing allele i.e "N" and "-" with ref-allele
            sample_data = ('|'.join(sample_data)).replace('N', ref).replace('-', ref)
            sample_data_for_vcf.append(str(chr_))
            sample_data_for_vcf.append(sample_data)

        # add data for all the samples in that line, append it with former columns (chrom, pos ..) ..
        # and .. write it to final haplotype file
        sample_data_for_vcf = '\t'.join(sample_data_for_vcf)
        updated_line = '\t'.join(line_split[0:3]) + '\t' + ','.join(alt_up) + \
            '\t' + sample_data_for_vcf + '\n'
        haplotype_output += updated_line

    del matrix_df  # clear memory
    print('completed haplotype preparation for chromosome/contig "%s" '
          'in "%s" sec. ' %(chr_, time.time()-time02))
    print('\tWorker maximum memory usage: %.2f (mb)' %(current_mem_usage()))

    # return the data back to the pool
    return pd.read_csv(io.StringIO(haplotype_output), sep='\t')

''' to monitor memory '''
def current_mem_usage():
    return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.

if __name__ == '__main__':
    main()

赏金猎人更新:

我已经使用 Pool.map() 实现了多处理,但是代码造成了很大的内存负担(输入测试文件 ~ 300 MB,但内存负担约为 6 GB)。我只期望最大 3*300 MB 的内存负担。

  • 有人可以解释一下,是什么导致如此小的文件和如此小的长度计算需要如此巨大的内存。
  • 此外,我正在尝试获取答案并使用它来改进我的大型程序中的多进程。因此,添加任何不会过多改变计算部分(CPU 绑定进程)结构的方法、模块应该没问题。
  • 为了测试目的,我已经包含了两个测试文件来使用代码。
  • 附加代码是完整代码,因此它应该像复制粘贴时一样按预期工作。任何更改都应该仅用于改进多处理步骤中的优化。

原文由 everestial007 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 1.3k
2 个回答

先决条件

  1. 在 Python 中(以下我使用 64 位版本的 Python 3.6.5),一切皆对象。这有它的开销和 getsizeof 我们可以准确地看到对象的大小(以字节为单位):
    >>> import sys
   >>> sys.getsizeof(42)
   28
   >>> sys.getsizeof('T')
   50

  1. 当使用 fork 系统调用(*nix 上的默认设置,请参阅 multiprocessing.get_start_method() )创建子进程时,不会复制父进程的物理内存,而是使用 写时复制 技术。
  2. Fork 子进程仍将报告父进程的完整 RSS(驻留集大小)。由于这个事实, PSS (比例集大小)是更合适的度量来估计分叉应用程序的内存使用情况。这是页面中的示例:
  • 进程 A 有 50 KiB 的非共享内存
  • 进程 B 有 300 KiB 的非共享内存
  • 进程 A 和进程 B 都有 100 KiB 的相同共享内存区域

由于PSS被定义为一个进程的非共享内存和与其他进程共享的内存比例之和,因此这两个进程的PSS如下:

  • 进程 A 的 PSS = 50 KiB + (100 KiB / 2) = 100 KiB
  • 进程 B 的 PSS = 300 KiB + (100 KiB / 2) = 350 KiB

数据框

不要让我们单独看看你的 DataFramememory_profiler 会帮助我们。

justpd.py

 #!/usr/bin/env python3

import pandas as pd
from memory_profiler import profile

@profile
def main():
    with open('genome_matrix_header.txt') as header:
        header = header.read().rstrip('\n').split('\t')

    gen_matrix_df = pd.read_csv(
        'genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)

    gen_matrix_df.info()
    gen_matrix_df.info(memory_usage='deep')

if __name__ == '__main__':
    main()

现在让我们使用探查器:

 mprof run justpd.py
mprof plot

我们可以看到剧情:

内存配置文件

和逐行跟踪:

 Line #    Mem usage    Increment   Line Contents
================================================
     6     54.3 MiB     54.3 MiB   @profile
     7                             def main():
     8     54.3 MiB      0.0 MiB       with open('genome_matrix_header.txt') as header:
     9     54.3 MiB      0.0 MiB           header = header.read().rstrip('\n').split('\t')
    10
    11   2072.0 MiB   2017.7 MiB       gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
    12
    13   2072.0 MiB      0.0 MiB       gen_matrix_df.info()
    14   2072.0 MiB      0.0 MiB       gen_matrix_df.info(memory_usage='deep')

我们可以看到数据帧在构建时占用了 ~2 GiB,峰值为 ~3 GiB。更有趣的是 info 的输出。

 <class 'pandas.core.frame.DataFrame'>
RangeIndex: 4000000 entries, 0 to 3999999
Data columns (total 34 columns):
...
dtypes: int64(2), object(32)
memory usage: 1.0+ GB

但是 info(memory_usage='deep') (“深度”意味着通过询问 object dtype 深入地反省数据)给出:见下文

memory usage: 7.9 GB

啊?!从流程外部看,我们可以确保 memory_profiler 的数字是正确的。 sys.getsizeof 也为帧显示相同的值(很可能是因为自定义 __sizeof__ ),其他使用它来估计分配的工具也是如此 gc.get_objects() -f9a5 pympler

 # added after read_csv
from pympler import tracker
tr = tracker.SummaryTracker()
tr.print_diff()

给出:

                                              types |   # objects |   total size
================================================== | =========== | ============
                 <class 'pandas.core.series.Series |          34 |      7.93 GB
                                      <class 'list |        7839 |    732.38 KB
                                       <class 'str |        7741 |    550.10 KB
                                       <class 'int |        1810 |     49.66 KB
                                      <class 'dict |          38 |      7.43 KB
  <class 'pandas.core.internals.SingleBlockManager |          34 |      3.98 KB
                             <class 'numpy.ndarray |          34 |      3.19 KB

那么这些 7.93 GiB 从何而来?让我们试着解释一下。我们有 4M 行和 34 列,这给了我们 134M 个值。它们是 int64object (这是一个 64 位指针;有关详细说明,请参阅 using pandas with large data )。因此我们有 134 * 10 ** 6 * 8 / 2 ** 20 ~1022 MiB 仅用于数据框中的值。剩下的 ~ 6.93 GiB 呢?

字符串实习

要理解这种行为,有必要知道 Python 会进行字符串驻留。有两篇关于 Python 2 中的字符串实习的好文章( 一篇两篇)。除了 Python 3 中的 Unicode 更改和 Python 3.3 中的 PEP 393 之外,C 结构也发生了变化,但思想是相同的。基本上,每个看起来像标识符的短字符串都将被 Python 缓存在内部字典中,并且引用将指向相同的 Python 对象。换句话说,我们可以说它的行为像一个单例。我上面提到的文章解释了它提供的显着内存配置文件和性能改进。我们可以使用 interned 字段 PyASCIIObject 来检查字符串是否被驻留:

 import ctypes

class PyASCIIObject(ctypes.Structure):
     _fields_ = [
         ('ob_refcnt', ctypes.c_size_t),
         ('ob_type', ctypes.py_object),
         ('length', ctypes.c_ssize_t),
         ('hash', ctypes.c_int64),
         ('state', ctypes.c_int32),
         ('wstr', ctypes.c_wchar_p)
    ]

然后:

 >>> a = 'name'
>>> b = '!@#$'
>>> a_struct = PyASCIIObject.from_address(id(a))
>>> a_struct.state & 0b11
1
>>> b_struct = PyASCIIObject.from_address(id(b))
>>> b_struct.state & 0b11
0

对于两个字符串,我们还可以进行身份比较(在 CPython 的情况下在内存比较中解决)。

 >>> a = 'foo'
>>> b = 'foo'
>>> a is b
True
>> gen_matrix_df.REF[0] is gen_matrix_df.REF[6]
True

因此,关于 object dtype ,数据框最多分配 20 个字符串(每个氨基酸一个)。不过,值得注意的是 Pandas 推荐枚举的 分类类型

熊猫记忆

因此,我们可以像这样解释 7.93 GiB 的天真估计:

 >>> rows = 4 * 10 ** 6
>>> int_cols = 2
>>> str_cols = 32
>>> int_size = 8
>>> str_size = 58
>>> ptr_size = 8
>>> (int_cols * int_size + str_cols * (str_size + ptr_size)) * rows / 2 ** 30
7.927417755126953

请注意, str_size 是 58 个字节,而不是我们在上面看到的 1 个字符文字的 50 个字节。这是因为 PEP 393 定义了紧凑型和非紧凑型字符串。您可以使用 sys.getsizeof(gen_matrix_df.REF[0]) 检查它。

正如 gen_matrix_df.info() 所报告的那样,实际内存消耗应该是 ~1 GiB,它是原来的两倍。我们可以假设它与 Pandas 或 NumPy 完成的内存(预)分配有关。下面的实验说明不无道理(多次运行显示保存图片):

 Line #    Mem usage    Increment   Line Contents
================================================
     8     53.1 MiB     53.1 MiB   @profile
     9                             def main():
    10     53.1 MiB      0.0 MiB       with open("genome_matrix_header.txt") as header:
    11     53.1 MiB      0.0 MiB           header = header.read().rstrip('\n').split('\t')
    12
    13   2070.9 MiB   2017.8 MiB       gen_matrix_df = pd.read_csv('genome_matrix_final-chr1234-1mb.txt', sep='\t', names=header)
    14   2071.2 MiB      0.4 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
    15   2071.2 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[gen_matrix_df.keys()[0]])
    16   2040.7 MiB    -30.5 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    ...
    23   1827.1 MiB    -30.5 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    24   1094.7 MiB   -732.4 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    25   1765.9 MiB    671.3 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    26   1094.7 MiB   -671.3 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    27   1704.8 MiB    610.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    28   1094.7 MiB   -610.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    29   1643.9 MiB    549.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    30   1094.7 MiB   -549.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    31   1582.8 MiB    488.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    32   1094.7 MiB   -488.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    33   1521.9 MiB    427.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    34   1094.7 MiB   -427.2 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    35   1460.8 MiB    366.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    36   1094.7 MiB   -366.1 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    37   1094.7 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])
    ...
    47   1094.7 MiB      0.0 MiB       gen_matrix_df = gen_matrix_df.drop(columns=[random.choice(gen_matrix_df.keys())])

我想引用 Pandas 原作者 关于设计问题和未来 Pandas2 的最新文章中 的一句话来结束本节。

pandas 的经验法则:内存是数据集大小的 5 到 10 倍

进程树

最后让我们来看看池,看看是否可以使用写时复制。我们将使用 smemstat (可从 Ubuntu 存储库获得)估计进程组内存共享和 glances 记下系统范围的可用内存。两者都可以写JSON。

我们将使用 Pool(2) 运行原始脚本。我们需要 3 个终端窗口。

  1. smemstat -l -m -p "python3.6 script.py" -o smemstat.json 1
  2. glances -t 1 --export-json glances.json
  3. mprof run -M script.py

然后 mprof plot 产生:

3道工序

总和图 ( mprof run --nopython --include-children ./script.py ) 如下所示:

在此处输入图像描述

请注意,上面的两个图表显示的是 RSS。假设是因为写时复制,它并不反映实际的内存使用情况。现在我们有两个来自 smemstatglances 的 JSON 文件。我将使用以下脚本将 JSON 文件转换为 CSV。

 #!/usr/bin/env python3

import csv
import sys
import json

def smemstat():
  with open('smemstat.json') as f:
    smem = json.load(f)

  rows = []
  fieldnames = set()
  for s in smem['smemstat']['periodic-samples']:
    row = {}
    for ps in s['smem-per-process']:
      if 'script.py' in ps['command']:
        for k in ('uss', 'pss', 'rss'):
          row['{}-{}'.format(ps['pid'], k)] = ps[k] // 2 ** 20

    # smemstat produces empty samples, backfill from previous
    if rows:
      for k, v in rows[-1].items():
        row.setdefault(k, v)

    rows.append(row)
    fieldnames.update(row.keys())

  with open('smemstat.csv', 'w') as out:
    dw = csv.DictWriter(out, fieldnames=sorted(fieldnames))
    dw.writeheader()
    list(map(dw.writerow, rows))

def glances():
  rows = []
  fieldnames = ['available', 'used', 'cached', 'mem_careful', 'percent',
    'free', 'mem_critical', 'inactive', 'shared', 'history_size',
    'mem_warning', 'total', 'active', 'buffers']
  with open('glances.csv', 'w') as out:
    dw = csv.DictWriter(out, fieldnames=fieldnames)
    dw.writeheader()
    with open('glances.json') as f:
      for l in f:
        d = json.loads(l)
        dw.writerow(d['mem'])

if __name__ == '__main__':
  globals()[sys.argv[1]]()

首先让我们看看 free 内存。

在此处输入图像描述

第一个和最小值之间的差异约为 4.15 GiB。这是 PSS 数据的样子:

在此处输入图像描述

总和:

在此处输入图像描述

因此我们可以看到,由于写时复制,实际内存消耗约为 4.15 GiB。但是我们仍在序列化数据以通过 Pool.map 将其发送到工作进程。我们也可以在这里利用写时复制吗?

共享数据

要使用写时复制,我们需要让 list(gen_matrix_df_list.values()) 可以全局访问,以便 fork 之后的工作人员仍然可以读取它。

  1. 让我们在 del gen_matrix_df 中修改 main 之后的代码,如下所示:
    ...
   global global_gen_matrix_df_values
   global_gen_matrix_df_values = list(gen_matrix_df_list.values())
   del gen_matrix_df_list

   p = Pool(2)
   result = p.map(matrix_to_vcf, range(len(global_gen_matrix_df_values)))
   ...

  1. 删除 del gen_matrix_df_list 稍后。
  2. 并修改 matrix_to_vcf 的第一行,如:
    def matrix_to_vcf(i):
       matrix_df = global_gen_matrix_df_values[i]

现在让我们重新运行它。空闲内存:

自由

进程树:

进程树

它的总和:

和

因此,我们的实际内存使用量最多为 ~2.9 GiB(峰值主进程在构建数据帧时有),写时复制有帮助!

作为旁注,有所谓的读时复制,即 Python 的引用循环垃圾收集器的行为, 在 Instagram Engineering 中有所描述(导致 gc.freezeissue31558 中)。但是 gc.disable() 在这种特殊情况下没有影响。

更新

写时复制无副本数据共享的替代方法是从一开始就使用 numpy.memmap 将其委托给内核。这是 Python 谈话中高性能数据处理 的示例实现棘手的部分 是让 Pandas 使用 mmaped Numpy 数组。

原文由 saaj 发布,翻译遵循 CC BY-SA 4.0 许可协议

当您使用 multiprocessing.Pool 时,将使用 fork() 系统调用创建许多子进程。这些进程中的每一个都以当时父进程内存的精确副本开始。因为您在创建大小为 3 的 Pool 之前加载了 csv,所以池中的这 3 个进程中的每一个都将不必要地拥有数据帧的副本。 ( gen_matrix_df 以及 gen_matrix_df_list 将存在于当前进程以及 3 个子进程中的每一个中,因此这些结构中的每一个都将存在内存中的 4 个副本)

尝试在加载文件之前创建 Pool (实际上是在最开始)这应该会减少内存使用。

如果它仍然太高,您可以:

  1. 将 gen_matrix_df_list 转储到文件中,每行 1 项,例如:
    import os
   import cPickle

   with open('tempfile.txt', 'w') as f:
       for item in gen_matrix_df_list.items():
           cPickle.dump(item, f)
           f.write(os.linesep)

  1. 在迭代器上使用 Pool.imap() 在此文件中转储的行,例如:
    with open('tempfile.txt', 'r') as f:
       p.imap(matrix_to_vcf, (cPickle.loads(line) for line in f))

(注意 `matrix_to_vcf` \-\-\- 在上面的例子中取一个 `(key, value)` 元组,而不仅仅是一个值)

我希望这有帮助。

注意:我没有测试上面的代码。这只是为了证明这个想法。

原文由 tomas 发布,翻译遵循 CC BY-SA 3.0 许可协议

推荐问题