multiprocessing中gzip写文件的问题

问题描述

multiprocessing中,开启多进程写多个文件,但是运行结束后,硬盘上有*.gz,但是内容为空.

问题出现的环境背景及自己尝试过哪些方法

输入小体积的gzip文件时,输出的gzip为空
输出大体积的gzip文件时,输出的gzip正常

相关代码

想把结果随机写到10个gzip文件里,所以先生成了一个列表,保存文件句柄.

gzip_files = []
for i in range(10):
    gzip_files.append(gzip.open(str(i) + '.gz','wt'))

从file里挑选合适的read,随机写入到某个gzip_file里

def choose_read_gz(file, gzip_files, cutoff, read_format):
    with gzip.open(file, 'rt') as handle:
        for read in SeqIO.parse(handle, read_format):
            if len(read.seq) > cutoff:
                gzip_files[randint(0, 9)].write(read.format(read_format))

文件比较多,开启多进程

with Pool(16) as pool:
    for file in files:
        pool.apply_async(choose_read_gz, args=(file, gzip_files, cutoff, read_format, ))
    pool.close()
    pool.join()

最后关闭文件

for gzip_file in gzip_files:
    gzip_file.close()

你期待的结果是什么?实际看到的错误信息又是什么?

  1. 输入小体积的gzip文件时,输出的gzip内容为空,大小为27B.
  2. 输出大体积的gzip文件时,输出的gzip内容正常.
  3. 每次写加flush,输出的gzip正常,内容正常.
def (file, gzip_files, cutoff):
    with gzip.open(file, 'rt') as handle:
        for read in SeqIO.parse(handle, read_format):
            if len(read.seq) > cutoff:
                filehandle = gzip_files[randint(0, 9)]
                filehandle.write(read.format(read_format))
                filehandle.flush()
  1. 最后同一控制所有句柄的flush,输出的gzip内容为空
for gzip_file in gzip_files:
    gzip_file.flush()
    gzip_file.close()

为什么需要每次强制flush才会写入硬盘,不能在最后一起控制文件句柄关闭,写入硬盘?

阅读 3k
1 个回答

应该避免在进程之间传递文件对象, 不如采用 生产/消费 模式, 用 Queue 联接.

以下示例, 演示多个进程生成数据, 存放到 Queue , 最终由额外进程读取并写入到 gzip 文件.

# -*- coding: utf-8 -*-
def produce(q):
    for i in range(10):
        q.put(f'=={i}==\n')


def consume(q):
    import gzip
    with gzip.open('a.gz', 'wt') as f:
        while True:
            data = q.get()
            if not data:
                break
            f.write(data)


def main():
    from multiprocessing import Pool, Manager
    q = Manager().Queue()

    consumePool = Pool(1)
    consumePool.apply_async(consume, args=(q,))

    with Pool(2) as pool:
        pool.apply_async(produce, args=(q,))
        pool.close()
        pool.join()

    q.put('')  # close
    consumePool.close()
    consumePool.join()


if __name__ == "__main__":
    main()
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题