我想并行化以下代码:
for row in df.iterrows():
idx = row[0]
k = row[1]['Chromosome']
start,end = row[1]['Bin'].split('-')
sequence = sequence_from_coordinates(k,1,start,end) #slow download form http
df.set_value(idx,'GC%',gc_content(sequence,percent=False,verbose=False))
df.set_value(idx,'G4 repeats', sum([len(list(i)) for i in g4_scanner(sequence)]))
df.set_value(idx,'max flexibility',max([item[1] for item in dna_flex(sequence,verbose=False)]))
我尝试使用 multiprocessing.Pool()
因为每一行都可以独立处理,但我不知道如何共享 DataFrame。我也不确定这是与熊猫进行并行化的最佳方法。有什么帮助吗?
原文由 alec_djinn 发布,翻译遵循 CC BY-SA 4.0 许可协议
正如@Khris 在他的评论中所说,您应该将数据框分成几个大块并并行迭代每个块。您可以任意将数据帧分成大小随机的块,但根据您计划使用的进程数将数据帧分成大小相等的块更有意义。幸运的是,其他人 已经想出如何为我们完成这部分 工作:
这将创建一个列表,其中包含我们的数据框。现在我们需要将它连同一个操作数据的函数一起传递到我们的池中。
在这一点上,
result
将是一个列表,其中包含每个块被操作后的列表。在这种情况下,所有值均已平方。现在的问题是原始数据框没有被修改,所以我们必须用我们池中的结果替换它所有现有的值。现在,我操作数据框的函数是矢量化的,如果我只是将它应用于整个数据框而不是分成块,可能会更快。但是,在您的情况下,您的函数将遍历每个块的每一行,然后返回该块。这允许您一次处理
num_process
行。然后你重新分配原始数据框中的值,你已经成功地并行化了这个过程。
我应该使用多少进程?
你的最佳表现将取决于这个问题的答案。虽然“所有过程!!!!”是一个答案,更好的答案要微妙得多。在某一点之后,在一个问题上投入更多进程实际上会产生比其价值更多的开销。这被称为 阿姆达尔定律。同样,我们很幸运其他人已经为我们解决了这个问题:
一个好的默认值是使用
multiprocessing.cpu_count()
,这是 ---multiprocessing.Pool
--- 的默认行为。 根据文档“如果进程为 None,则使用 cpu_count() 返回的数字。”这就是为什么我在开始时将num_processes
设置为multiprocessing.cpu_count()
。这样,如果您移动到更强大的机器,您可以从中受益,而无需直接更改num_processes
变量。