熊猫python中的并行处理

新手上路,请多包涵

我的数据框中有 5,000,000 行。在我的代码中,我使用的是 iterrows() ,这会花费太多时间。为了获得所需的输出,我必须遍历所有行。所以我想知道我是否可以将 pandas 中的代码并行化。

原文由 surya 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 372
2 个回答

这是我发现可能有帮助的网页:http: //gouthamanbalaraman.com/blog/distributed-processing-pandas.html

这是在该页面中找到的多处理代码:

 import pandas as pd
import multiprocessing as mp

LARGE_FILE = "D:\\my_large_file.txt"
CHUNKSIZE = 100000 # processing 100,000 rows at a time

def process_frame(df):
    # process data frame
    return len(df)

if __name__ == '__main__':
    reader = pd.read_table(LARGE_FILE, chunksize=CHUNKSIZE)
    pool = mp.Pool(4) # use 4 processes

    funclist = []
    for df in reader:
        # process each data frame
        f = pool.apply_async(process_frame,[df])
        funclist.append(f)

    result = 0
    for f in funclist:
        result += f.get(timeout=10) # timeout in 10 seconds

    print "There are %d rows of data"%(result)

原文由 jtitusj 发布,翻译遵循 CC BY-SA 3.0 许可协议

此代码显示了如何将大型数据帧分解为较小的数据帧,每个数据帧的行数等于 N_ROWS (最后一个数据帧可能除外),然后将每个数据帧传递给进程池(无论大小你想要,但使用任何超过你拥有的处理器数量的东西是没有意义的)。每个工作进程将修改后的数据帧返回给主进程,然后主进程通过连接工作进程的返回值来重新组装最终结果数据帧:

 import pandas as pd
import multiprocessing as mp

def process_frame(df):
    # process data frame
    # create new index starting at 0:
    df.reset_index(inplace=True, drop=True)
    # increment everybody's age:
    for i in range(len(df.index)):
        df.at[i, 'Age'] += 1
    return df

def divide_and_conquer(df):
    N_ROWS = 2 # number of rows in each dataframe
    with mp.Pool(3) as pool: # use 3 processes
        # break up dataframe into smaller daraframes of N_ROWS rows each
        cnt = len(df.index)
        n, remainder = divmod(cnt, N_ROWS)
        results = []
        start_index = 0
        for i in range(n):
            results.append(pool.apply_async(process_frame, args=(df.loc[start_index:start_index+N_ROWS-1, :],)))
            start_index += N_ROWS
        if remainder:
            results.append(pool.apply_async(process_frame, args=(df.loc[start_index:start_index+remainder-1, :],)))
        new_dfs = [result.get() for result in results]
        # reassemble final dataframe:
        df = pd.concat(new_dfs, ignore_index=True)
        return df

if __name__ == '__main__':
    df = pd.DataFrame({
        "Name": ['Tom', 'Dick', 'Harry', 'Jane', 'June', 'Sally', 'Mary'],
        "Age": [10, 20, 30, 40, 40, 60, 70],
        "Sex": ['M', 'M', 'M', 'F', 'F', 'F', 'F']
    })
    print(df)
    df = divide_and_conquer(df)
    print(df)

印刷:

     Name  Age Sex
0    Tom   10   M
1   Dick   20   M
2  Harry   30   M
3   Jane   40   F
4   June   40   F
5  Sally   60   F
6   Mary   70   F
    Name  Age Sex
0    Tom   11   M
1   Dick   21   M
2  Harry   31   M
3   Jane   41   F
4   June   41   F
5  Sally   61   F
6   Mary   71   F

原文由 Booboo 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题