如何更好地结束这里的多进程?

写了一个多进程下载的程序,将大文件分块下载,同时边下载边拼接下载到本地的文件。

import requests,os
from multiprocessing import Process,Queue

n = 20 
url = "https://laotzu.ftp.acc.umu.se/debian-cd/current/amd64/iso-cd/debian-11.7.0-amd64-netinst.iso"
response = requests.get(url, stream=True) 
file_size = int(response.headers['content-length']) 
a,b = divmod(file_size,n)
q = Queue()

def range_down(i,q):
    if i == n-1:
        start = i * a 
        end = file_size - 1
    else:
        start = i * a 
        end = (i+1)*a - 1  
    dst = '/tmp/tmp' + str(i) + '.part'
    header = {"Range": "bytes={}-{}".format(start,end)}
    req = requests.get(url, headers=header) 
    with(open(dst, 'ab')) as f:
        f.write(req.content)
    q.put(i)
        
def merge(q):
    flag = 0
    f = open('/tmp/result','ab+')
    while True:
        i = q.get()
        if i == flag:
            fname = '/tmp/tmp' + str(i) + '.part'
            ftmp = open(fname,'rb')
            f.write(ftmp.read())
            ftmp.close()
            os.remove(fname)
            flag = flag + 1
            if flag == n:break
        else:
            q.put(i)
    f.close()


for i in range(n):
    p=Process(target=range_down, args=(i,q))
    p.start()
p_merge = Process(target=merge, args=(q,))
p_merge.start()
p_merge.join()

有一个地方,我不太满意,就是merge函数的退出(终止)条件

if flag == n:break

没有这句话,整个程序不能正常退出;使用q.empty()来做退出,会有问题,如果最快下载的块都需要几秒,range_down里的q.put(i)还没有执行,merge里面的q.empty就执行了,将导致任务没有完成就提前终止。

if flag == n:break,可以与运行,觉得似乎有更好的方法,这个方法有点??

阅读 2.2k
1 个回答


for i in range(n):
    p=Process(target=range_down, args=(i,q))
    p.start()

# 等待所有的下载进程完成
for p in processes:
    p.join()

# 添加一个特殊的值到队列中,表示所有的下载都已经完成
q.put(None)

p_merge = Process(target=merge, args=(q,))
p_merge.start()
p_merge.join()

然后在 merge 函数里:

def merge(q):
    flag = 0
    f = open('/tmp/result','ab+')
    while True:
        i = q.get()
        if i is None:
            # 如果我们看到特殊的值,我们就知道所有的下载都已经完成
            break
        elif i == flag:
            fname = '/tmp/tmp' + str(i) + '.part'
            ftmp = open(fname,'rb')
            f.write(ftmp.read())
            ftmp.close()
            os.remove(fname)
            flag = flag + 1
        else:
            q.put(i)
    f.close()
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进