我已经将程序(如下)编写为:
- 读取一个巨大的文本文件
pandas dataframe
- 然后
groupby
使用特定的列值拆分数据并存储为数据帧列表。 - 然后将数据通过管道传输到
multiprocess Pool.map()
以并行处理每个数据帧。
一切都很好,该程序在我的小型测试数据集上运行良好。但是,当我输入大数据(大约 14 GB)时,内存消耗呈指数增长,然后冻结计算机或被杀死(在 HPC 集群中)。
一旦数据/变量没有用,我已经添加了代码来清除内存。我也会在完成后立即关闭游泳池。仍然有 14 GB 输入,我只期望 2*14 GB 内存负担,但似乎很多事情都在发生。我还尝试使用 chunkSize and maxTaskPerChild, etc
进行调整,但我没有看到测试与大文件的优化有任何差异。
我认为当我开始 multiprocessing
时,此代码位置需要对此代码进行改进。
p = Pool(3) # number of pool to run at once; default at 1
result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))
但是,我发布了整个代码。
测试示例: 我创建了一个高达 250 MB 的测试文件(“genome_matrix_final-chr1234-1mb.txt”)并运行了程序。当我检查系统监视器时,我可以看到内存消耗增加了大约 6 GB。我不太清楚为什么 250 mb 文件加上一些输出占用了这么多内存空间。如果它有助于查看真正的问题,我已经通过投递箱共享了该文件。 https://www.dropbox.com/sh/coihujii38t5prd/AABDXv8ACGIYczeMtzKBo0eea?dl=0
有人可以建议,我怎样才能摆脱这个问题?
我的蟒蛇脚本:
#!/home/bin/python3
import pandas as pd
import collections
from multiprocessing import Pool
import io
import time
import resource
print()
print('Checking required modules')
print()
''' change this input file name and/or path as need be '''
genome_matrix_file = "genome_matrix_final-chr1n2-2mb.txt" # test file 01
genome_matrix_file = "genome_matrix_final-chr1234-1mb.txt" # test file 02
#genome_matrix_file = "genome_matrix_final.txt" # large file
def main():
with open("genome_matrix_header.txt") as header:
header = header.read().rstrip('\n').split('\t')
print()
time01 = time.time()
print('starting time: ', time01)
'''load the genome matrix file onto pandas as dataframe.
This makes is more easy for multiprocessing'''
gen_matrix_df = pd.read_csv(genome_matrix_file, sep='\t', names=header)
# now, group the dataframe by chromosome/contig - so it can be multiprocessed
gen_matrix_df = gen_matrix_df.groupby('CHROM')
# store the splitted dataframes as list of key, values(pandas dataframe) pairs
# this list of dataframe will be used while multiprocessing
gen_matrix_df_list = collections.OrderedDict()
for chr_, data in gen_matrix_df:
gen_matrix_df_list[chr_] = data
# clear memory
del gen_matrix_df
'''Now, pipe each dataframe from the list using map.Pool() '''
p = Pool(3) # number of pool to run at once; default at 1
result = p.map(matrix_to_vcf, list(gen_matrix_df_list.values()))
del gen_matrix_df_list # clear memory
p.close()
p.join()
# concat the results from pool.map() and write it to a file
result_merged = pd.concat(result)
del result # clear memory
pd.DataFrame.to_csv(result_merged, "matrix_to_haplotype-chr1n2.txt", sep='\t', header=True, index=False)
print()
print('completed all process in "%s" sec. ' % (time.time() - time01))
print('Global maximum memory usage: %.2f (mb)' % current_mem_usage())
print()
'''function to convert the dataframe from genome matrix to desired output '''
def matrix_to_vcf(matrix_df):
print()
time02 = time.time()
# index position of the samples in genome matrix file
sample_idx = [{'10a': 33, '10b': 18}, {'13a': 3, '13b': 19},
{'14a': 20, '14b': 4}, {'16a': 5, '16b': 21},
{'17a': 6, '17b': 22}, {'23a': 7, '23b': 23},
{'24a': 8, '24b': 24}, {'25a': 25, '25b': 9},
{'26a': 10, '26b': 26}, {'34a': 11, '34b': 27},
{'35a': 12, '35b': 28}, {'37a': 13, '37b': 29},
{'38a': 14, '38b': 30}, {'3a': 31, '3b': 15},
{'8a': 32, '8b': 17}]
# sample index stored as ordered dictionary
sample_idx_ord_list = []
for ids in sample_idx:
ids = collections.OrderedDict(sorted(ids.items()))
sample_idx_ord_list.append(ids)
# for haplotype file
header = ['contig', 'pos', 'ref', 'alt']
# adding some suffixes "PI" to available sample names
for item in sample_idx_ord_list:
ks_update = ''
for ks in item.keys():
ks_update += ks
header.append(ks_update+'_PI')
header.append(ks_update+'_PG_al')
#final variable store the haplotype data
# write the header lines first
haplotype_output = '\t'.join(header) + '\n'
# to store the value of parsed the line and update the "PI", "PG" value for each sample
updated_line = ''
# read the piped in data back to text like file
matrix_df = pd.DataFrame.to_csv(matrix_df, sep='\t', index=False)
matrix_df = matrix_df.rstrip('\n').split('\n')
for line in matrix_df:
if line.startswith('CHROM'):
continue
line_split = line.split('\t')
chr_ = line_split[0]
ref = line_split[2]
alt = list(set(line_split[3:]))
# remove the alleles "N" missing and "ref" from the alt-alleles
alt_up = list(filter(lambda x: x!='N' and x!=ref, alt))
# if no alt alleles are found, just continue
# - i.e : don't write that line in output file
if len(alt_up) == 0:
continue
#print('\nMining data for chromosome/contig "%s" ' %(chr_ ))
#so, we have data for CHR, POS, REF, ALT so far
# now, we mine phased genotype for each sample pair (as "PG_al", and also add "PI" tag)
sample_data_for_vcf = []
for ids in sample_idx_ord_list:
sample_data = []
for key, val in ids.items():
sample_value = line_split[val]
sample_data.append(sample_value)
# now, update the phased state for each sample
# also replacing the missing allele i.e "N" and "-" with ref-allele
sample_data = ('|'.join(sample_data)).replace('N', ref).replace('-', ref)
sample_data_for_vcf.append(str(chr_))
sample_data_for_vcf.append(sample_data)
# add data for all the samples in that line, append it with former columns (chrom, pos ..) ..
# and .. write it to final haplotype file
sample_data_for_vcf = '\t'.join(sample_data_for_vcf)
updated_line = '\t'.join(line_split[0:3]) + '\t' + ','.join(alt_up) + \
'\t' + sample_data_for_vcf + '\n'
haplotype_output += updated_line
del matrix_df # clear memory
print('completed haplotype preparation for chromosome/contig "%s" '
'in "%s" sec. ' %(chr_, time.time()-time02))
print('\tWorker maximum memory usage: %.2f (mb)' %(current_mem_usage()))
# return the data back to the pool
return pd.read_csv(io.StringIO(haplotype_output), sep='\t')
''' to monitor memory '''
def current_mem_usage():
return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024.
if __name__ == '__main__':
main()
赏金猎人更新:
我已经使用 Pool.map()
实现了多处理,但是代码造成了很大的内存负担(输入测试文件 ~ 300 MB,但内存负担约为 6 GB)。我只期望最大 3*300 MB 的内存负担。
- 有人可以解释一下,是什么导致如此小的文件和如此小的长度计算需要如此巨大的内存。
- 此外,我正在尝试获取答案并使用它来改进我的大型程序中的多进程。因此,添加任何不会过多改变计算部分(CPU 绑定进程)结构的方法、模块应该没问题。
- 为了测试目的,我已经包含了两个测试文件来使用代码。
- 附加代码是完整代码,因此它应该像复制粘贴时一样按预期工作。任何更改都应该仅用于改进多处理步骤中的优化。
原文由 everestial007 发布,翻译遵循 CC BY-SA 4.0 许可协议
先决条件
getsizeof
我们可以准确地看到对象的大小(以字节为单位):multiprocessing.get_start_method()
)创建子进程时,不会复制父进程的物理内存,而是使用 写时复制 技术。数据框
不要让我们单独看看你的
DataFrame
。memory_profiler
会帮助我们。justpd.py
现在让我们使用探查器:
我们可以看到剧情:
和逐行跟踪:
我们可以看到数据帧在构建时占用了 ~2 GiB,峰值为 ~3 GiB。更有趣的是
info
的输出。但是
info(memory_usage='deep')
(“深度”意味着通过询问object
dtype
深入地反省数据)给出:见下文啊?!从流程外部看,我们可以确保
memory_profiler
的数字是正确的。sys.getsizeof
也为帧显示相同的值(很可能是因为自定义__sizeof__
),其他使用它来估计分配的工具也是如此gc.get_objects()
-f9a5pympler
。给出:
那么这些 7.93 GiB 从何而来?让我们试着解释一下。我们有 4M 行和 34 列,这给了我们 134M 个值。它们是
int64
或object
(这是一个 64 位指针;有关详细说明,请参阅 using pandas with large data )。因此我们有134 * 10 ** 6 * 8 / 2 ** 20
~1022 MiB 仅用于数据框中的值。剩下的 ~ 6.93 GiB 呢?字符串实习
要理解这种行为,有必要知道 Python 会进行字符串驻留。有两篇关于 Python 2 中的字符串实习的好文章( 一篇, 两篇)。除了 Python 3 中的 Unicode 更改和 Python 3.3 中的 PEP 393 之外,C 结构也发生了变化,但思想是相同的。基本上,每个看起来像标识符的短字符串都将被 Python 缓存在内部字典中,并且引用将指向相同的 Python 对象。换句话说,我们可以说它的行为像一个单例。我上面提到的文章解释了它提供的显着内存配置文件和性能改进。我们可以使用
interned
字段PyASCIIObject
来检查字符串是否被驻留:然后:
对于两个字符串,我们还可以进行身份比较(在 CPython 的情况下在内存比较中解决)。
因此,关于
object
dtype
,数据框最多分配 20 个字符串(每个氨基酸一个)。不过,值得注意的是 Pandas 推荐枚举的 分类类型。熊猫记忆
因此,我们可以像这样解释 7.93 GiB 的天真估计:
请注意,
str_size
是 58 个字节,而不是我们在上面看到的 1 个字符文字的 50 个字节。这是因为 PEP 393 定义了紧凑型和非紧凑型字符串。您可以使用sys.getsizeof(gen_matrix_df.REF[0])
检查它。正如
gen_matrix_df.info()
所报告的那样,实际内存消耗应该是 ~1 GiB,它是原来的两倍。我们可以假设它与 Pandas 或 NumPy 完成的内存(预)分配有关。下面的实验说明不无道理(多次运行显示保存图片):我想引用 Pandas 原作者 关于设计问题和未来 Pandas2 的最新文章中 的一句话来结束本节。
进程树
最后让我们来看看池,看看是否可以使用写时复制。我们将使用
smemstat
(可从 Ubuntu 存储库获得)估计进程组内存共享和glances
记下系统范围的可用内存。两者都可以写JSON。我们将使用
Pool(2)
运行原始脚本。我们需要 3 个终端窗口。smemstat -l -m -p "python3.6 script.py" -o smemstat.json 1
glances -t 1 --export-json glances.json
mprof run -M script.py
然后
mprof plot
产生:总和图 (
mprof run --nopython --include-children ./script.py
) 如下所示:请注意,上面的两个图表显示的是 RSS。假设是因为写时复制,它并不反映实际的内存使用情况。现在我们有两个来自
smemstat
和glances
的 JSON 文件。我将使用以下脚本将 JSON 文件转换为 CSV。首先让我们看看
free
内存。第一个和最小值之间的差异约为 4.15 GiB。这是 PSS 数据的样子:
总和:
因此我们可以看到,由于写时复制,实际内存消耗约为 4.15 GiB。但是我们仍在序列化数据以通过
Pool.map
将其发送到工作进程。我们也可以在这里利用写时复制吗?共享数据
要使用写时复制,我们需要让
list(gen_matrix_df_list.values())
可以全局访问,以便 fork 之后的工作人员仍然可以读取它。del gen_matrix_df
中修改main
之后的代码,如下所示:del gen_matrix_df_list
稍后。matrix_to_vcf
的第一行,如:现在让我们重新运行它。空闲内存:
进程树:
它的总和:
因此,我们的实际内存使用量最多为 ~2.9 GiB(峰值主进程在构建数据帧时有),写时复制有帮助!
作为旁注,有所谓的读时复制,即 Python 的引用循环垃圾收集器的行为, 在 Instagram Engineering 中有所描述(导致
gc.freeze
在 issue31558 中)。但是gc.disable()
在这种特殊情况下没有影响。更新
写时复制无副本数据共享的替代方法是从一开始就使用
numpy.memmap
将其委托给内核。这是 Python 谈话中高性能数据处理 的示例实现。 棘手的部分 是让 Pandas 使用 mmaped Numpy 数组。