带有生成器的 Python 多处理

新手上路,请多包涵

我正在尝试处理一个文件(每一行都是一个 json 文档)。文件的大小可以达到 100 mb 到 gb。所以我写了一个生成器代码来从文件中逐行获取每个文档。

 def jl_file_iterator(file):
    with codecs.open(file, 'r', 'utf-8') as f:
        for line in f:
            document = json.loads(line)
            yield document

我的系统有 4 个内核,所以我想并行处理 4 行文件。目前我有这段代码一次需要 4 行并调用代码进行并行处理

threads = 4
files, i = [], 1
for jl in jl_file_iterator(input_path):
    files.append(jl)
    if i % (threads) == 0:
        # pool.map(processFile, files)
        parallelProcess(files, o)
        files = []
    i += 1

if files:
    parallelProcess(files, o)
    files = []

这是我进行实际处理的代码

def parallelProcess(files, outfile):
    processes = []
    for i in range(len(files)):
        p = Process(target=processFile, args=(files[i],))
        processes.append(p)
        p.start()
    for i in range(len(files)):
        processes[i].join()

def processFile(doc):
    extractors = {}
    ... do some processing on doc
    o.write(json.dumps(doc) + '\n')

如您所见,在发送接下来的 4 个文件进行处理之前,我等待所有 4 行完成处理。但是我想做的是,一旦一个进程完成处理文件,我想开始下一行以分配给已释放的处理器。我怎么做?

PS:问题是因为它是一个生成器,我无法加载所有文件并使用 map 之类的东西来运行进程。

谢谢你的帮助

原文由 Muthu Rg 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 375
1 个回答

正如@pvg 在评论中所说,(有界)队列是在不同速度的生产者和消费者之间进行调解的自然方式,确保他们都尽可能忙碌,但又不让生产者超前。

这是一个独立的可执行示例。队列的最大大小被限制为等于工作进程的数量。如果消费者运行得比生产者快得多,那么让队列变得比这更大是很有意义的。

在您的特定情况下,将线路传递给消费者并让他们并行执行 document = json.loads(line) 部分可能是有意义的。

 import multiprocessing as mp

NCORE = 4

def process(q, iolock):
    from time import sleep
    while True:
        stuff = q.get()
        if stuff is None:
            break
        with iolock:
            print("processing", stuff)
        sleep(stuff)

if __name__ == '__main__':
    q = mp.Queue(maxsize=NCORE)
    iolock = mp.Lock()
    pool = mp.Pool(NCORE, initializer=process, initargs=(q, iolock))
    for stuff in range(20):
        q.put(stuff)  # blocks until q below its max size
        with iolock:
            print("queued", stuff)
    for _ in range(NCORE):  # tell workers we're done
        q.put(None)
    pool.close()
    pool.join()

原文由 Tim Peters 发布,翻译遵循 CC BY-SA 3.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题