python 异步协程的实现

需要对一个上百万行的json文件进行清洗,并需要将清洗好的结构化数据重新存储为csv文件。尝试使用pandas的dataframe转存清洗好的数据条目,却发现常规的清洗一条写入一条的速度太慢了,速度主要卡在每次数据的写入,于是为此专门定义了一个 asyn writeline,同时利用 async readline 每次生成 100 个协程处理100行数据,然而测试结果却和常规的按序处理无差别,平均每条数据处理速度都在 0.5 s 左右,感觉是自己的 asyn writeline 有问题,还请大神赐教。

测试代码如下:

import pandas as pd
import json
import time
import asyncio

def trop():
    tropicos = pd.DataFrame()
    with open(r"/tropicosbase.json", "r") as yn:
        count = 0
        tropicos['tag'] = None
        tropicos.loc[0] = None
        async def readline(line):
            nonlocal count
            js = json.loads(line)
            await writeline(js, tropicos, count)
            count += 1
            tropicos.loc[count] = None
        cs = yn.readlines()[:100]
        tasks = [asyncio.ensure_future(readline(line)) for line in cs]
        loop = asyncio.get_event_loop()
        start = time.time()
        loop.run_until_complete(asyncio.wait(tasks))
        end = time.time()
        print(end - start)

    tropicos.to_csv(r'/tropicos.csv', index=None)


async def writeline(js, tropicos, count):
    for k, v in js.items():
        try:
            tropicos[k][count] = v
        except KeyError:
            if k == 'detailsdiv':
                pass
            else:
                tropicos[k] = pd.Series()
                tropicos[k][count] = v

trop()
阅读 4.8k
3 个回答

用 asyncio 处理这种问题没有任何优势,它应该用在 I/O 密集型运算。

你应该用最简单的实现方法,然后使用 cProfile 找出性能的瓶颈。

参考 https://docs.python.org/3/lib...

这种情况应该用多进程,而不是多线程或协程……

个人经验.
尽量把结果保存在内存中,减少写入文件的次数.
1次写入10000行和写入10000次1行的时间,差别很大.

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进