我正在尝试处理一个文件(每一行都是一个 json 文档)。文件的大小可以达到 100 mb 到 gb。所以我写了一个生成器代码来从文件中逐行获取每个文档。
def jl_file_iterator(file):
with codecs.open(file, 'r', 'utf-8') as f:
for line in f:
document = json.loads(line)
yield document
我的系统有 4 个内核,所以我想并行处理 4 行文件。目前我有这段代码一次需要 4 行并调用代码进行并行处理
threads = 4
files, i = [], 1
for jl in jl_file_iterator(input_path):
files.append(jl)
if i % (threads) == 0:
# pool.map(processFile, files)
parallelProcess(files, o)
files = []
i += 1
if files:
parallelProcess(files, o)
files = []
这是我进行实际处理的代码
def parallelProcess(files, outfile):
processes = []
for i in range(len(files)):
p = Process(target=processFile, args=(files[i],))
processes.append(p)
p.start()
for i in range(len(files)):
processes[i].join()
def processFile(doc):
extractors = {}
... do some processing on doc
o.write(json.dumps(doc) + '\n')
如您所见,在发送接下来的 4 个文件进行处理之前,我等待所有 4 行完成处理。但是我想做的是,一旦一个进程完成处理文件,我想开始下一行以分配给已释放的处理器。我怎么做?
PS:问题是因为它是一个生成器,我无法加载所有文件并使用 map 之类的东西来运行进程。
谢谢你的帮助
原文由 Muthu Rg 发布,翻译遵循 CC BY-SA 4.0 许可协议
正如@pvg 在评论中所说,(有界)队列是在不同速度的生产者和消费者之间进行调解的自然方式,确保他们都尽可能忙碌,但又不让生产者超前。
这是一个独立的可执行示例。队列的最大大小被限制为等于工作进程的数量。如果消费者运行得比生产者快得多,那么让队列变得比这更大是很有意义的。
在您的特定情况下,将线路传递给消费者并让他们并行执行
document = json.loads(line)
部分可能是有意义的。