熊猫 df.iterrows() 并行化

新手上路,请多包涵

我想并行化以下代码:

 for row in df.iterrows():
    idx = row[0]
    k = row[1]['Chromosome']
    start,end = row[1]['Bin'].split('-')

    sequence = sequence_from_coordinates(k,1,start,end) #slow download form http

    df.set_value(idx,'GC%',gc_content(sequence,percent=False,verbose=False))
    df.set_value(idx,'G4 repeats', sum([len(list(i)) for i in g4_scanner(sequence)]))
    df.set_value(idx,'max flexibility',max([item[1] for item in dna_flex(sequence,verbose=False)]))

我尝试使用 multiprocessing.Pool() 因为每一行都可以独立处理,但我不知道如何共享 DataFrame。我也不确定这是与熊猫进行并行化的最佳方法。有什么帮助吗?

原文由 alec_djinn 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 937
2 个回答

正如@Khris 在他的评论中所说,您应该将数据框分成几个大块并并行迭代每个块。您可以任意将数据帧分成大小随机的块,但根据您计划使用的进程数将数据帧分成大小相等的块更有意义。幸运的是,其他人 已经想出如何为我们完成这部分 工作:

 # don't forget to import
import pandas as pd
import multiprocessing

# create as many processes as there are CPUs on your machine
num_processes = multiprocessing.cpu_count()

# calculate the chunk size as an integer
chunk_size = int(df.shape[0]/num_processes)

# this solution was reworked from the above link.
# will work even if the length of the dataframe is not evenly divisible by num_processes
chunks = [df.iloc[df.index[i:i + chunk_size]] for i in range(0, df.shape[0], chunk_size)]

这将创建一个列表,其中包含我们的数据框。现在我们需要将它连同一个操作数据的函数一起传递到我们的池中。

 def func(d):
   # let's create a function that squares every value in the dataframe
   return d * d

# create our pool with `num_processes` processes
pool = multiprocessing.Pool(processes=num_processes)

# apply our function to each chunk in the list
result = pool.map(func, chunks)

在这一点上, result 将是一个列表,其中包含每个块被操作后的列表。在这种情况下,所有值均已平方。现在的问题是原始数据框没有被修改,所以我们必须用我们池中的结果替换它所有现有的值。

 for i in range(len(result)):
   # since result[i] is just a dataframe
   # we can reassign the original dataframe based on the index of each chunk
   df.iloc[result[i].index] = result[i]

现在,我操作数据框的函数是矢量化的,如果我只是将它应用于整个数据框而不是分成块,可能会更快。但是,在您的情况下,您的函数将遍历每个块的每一行,然后返回该块。这允许您一次处理 num_process 行。

 def func(d):
   for row in d.iterrow():
      idx = row[0]
      k = row[1]['Chromosome']
      start,end = row[1]['Bin'].split('-')

      sequence = sequence_from_coordinates(k,1,start,end) #slow download form http
      d.set_value(idx,'GC%',gc_content(sequence,percent=False,verbose=False))
      d.set_value(idx,'G4 repeats', sum([len(list(i)) for i in g4_scanner(sequence)]))
      d.set_value(idx,'max flexibility',max([item[1] for item in dna_flex(sequence,verbose=False)]))
   # return the chunk!
   return d

然后你重新分配原始数据框中的值,你已经成功地并行化了这个过程。

我应该使用多少进程?

你的最佳表现将取决于这个问题的答案。虽然“所有过程!!!!”是一个答案,更好的答案要微妙得多。在某一点之后,在一个问题上投入更多进程实际上会产生比其价值更多的开销。这被称为 阿姆达尔定律。同样,我们很幸运其他人已经为我们解决了这个问题:

  1. Python multiprocessing 的 Pool 进程限制
  2. 我应该并行运行多少个进程?

一个好的默认值是使用 multiprocessing.cpu_count() ,这是 --- multiprocessing.Pool --- 的默认行为。 根据文档“如果进程为 None,则使用 cpu_count() 返回的数字。”这就是为什么我在开始时将 num_processes 设置为 multiprocessing.cpu_count() 。这样,如果您移动到更强大的机器,您可以从中受益,而无需直接更改 num_processes 变量。

原文由 TheF1rstPancake 发布,翻译遵循 CC BY-SA 4.0 许可协议

更快的方法(在我的例子中大约是 10%):

与已接受答案的主要区别:使用 pd.concatnp.array_split 拆分和加入数据帧。

 import multiprocessing
import numpy as np

def parallelize_dataframe(df, func):
    num_cores = multiprocessing.cpu_count()-1  #leave one free to not freeze machine
    num_partitions = num_cores #number of partitions to split dataframe
    df_split = np.array_split(df, num_partitions)
    pool = multiprocessing.Pool(num_cores)
    df = pd.concat(pool.map(func, df_split))
    pool.close()
    pool.join()
    return df

其中 func 是您要应用于 df 的功能。使用 partial(func, arg=arg_val) 不止一个参数。

原文由 ic_fl2 发布,翻译遵循 CC BY-SA 3.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题
logo
Stack Overflow 翻译
子站问答
访问
宣传栏