问题描述
multiprocessing中,开启多进程写多个文件,但是运行结束后,硬盘上有*.gz,但是内容为空.
问题出现的环境背景及自己尝试过哪些方法
输入小体积的gzip文件时,输出的gzip为空
输出大体积的gzip文件时,输出的gzip正常
相关代码
想把结果随机写到10个gzip文件里,所以先生成了一个列表,保存文件句柄.
gzip_files = []
for i in range(10):
gzip_files.append(gzip.open(str(i) + '.gz','wt'))
从file里挑选合适的read,随机写入到某个gzip_file里
def choose_read_gz(file, gzip_files, cutoff, read_format):
with gzip.open(file, 'rt') as handle:
for read in SeqIO.parse(handle, read_format):
if len(read.seq) > cutoff:
gzip_files[randint(0, 9)].write(read.format(read_format))
文件比较多,开启多进程
with Pool(16) as pool:
for file in files:
pool.apply_async(choose_read_gz, args=(file, gzip_files, cutoff, read_format, ))
pool.close()
pool.join()
最后关闭文件
for gzip_file in gzip_files:
gzip_file.close()
你期待的结果是什么?实际看到的错误信息又是什么?
- 输入小体积的gzip文件时,输出的gzip内容为空,大小为27B.
- 输出大体积的gzip文件时,输出的gzip内容正常.
- 每次写加flush,输出的gzip正常,内容正常.
def (file, gzip_files, cutoff):
with gzip.open(file, 'rt') as handle:
for read in SeqIO.parse(handle, read_format):
if len(read.seq) > cutoff:
filehandle = gzip_files[randint(0, 9)]
filehandle.write(read.format(read_format))
filehandle.flush()
- 最后同一控制所有句柄的flush,输出的gzip内容为空
for gzip_file in gzip_files:
gzip_file.flush()
gzip_file.close()
为什么需要每次强制flush才会写入硬盘,不能在最后一起控制文件句柄关闭,写入硬盘?
应该避免在进程之间传递文件对象, 不如采用 生产/消费 模式, 用 Queue 联接.
以下示例, 演示多个进程生成数据, 存放到 Queue , 最终由额外进程读取并写入到 gzip 文件.