请问如何异步顺序写文件?

请问,我这儿有一个m3u8在线链接,想通过python异步的方式去下载成一个ts文件,但异步编程的水平不够,想过用队列或者异步锁去做,但看文档后写的还是不对。下面是完整代码,希望大佬可以基于此帮忙改一版可以顺序写文件的出来,在此感谢。

import re
import requests
import time
import urllib3
import asyncio
import aiofiles
import queue
from typing import List, Dict

import httpx

urllib3.disable_warnings()

# m3u8_path = "https://13.cdn-vod.huaweicloud.com/asset/94a33fb7bb3c6c6b514d7463f4be98ce/play_video/442d776c312aa0b57193ae1deae9f943/cb8db232c9e31a09c25df076c5f71a4d_2.m3u8"
m3u8_path = 'https://13.cdn-vod.huaweicloud.com/asset/e0a13b1737c48e6335b760a3dffaf4cf/play_video/7d5c1d3316918cd9484941c2b468a44e.m3u8'
header = {
    "User-Agent":
    "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.20 Safari/537.36"
}


def applyUrl(target_url: str, base_url: str) -> str:
    if 'http' in target_url:
        return target_url

    if target_url[0] == '/':
        base_url_list = base_url.split('/')
        return base_url_list[0] + '//' + base_url_list[2] + target_url

    base_url_list = base_url.split('/')
    base_url_list.pop()
    return '/'.join(base_url_list) + '/' + target_url


async def get_ts_lists(m3u8_path: str) -> List[str]:
    ts_urls: List[str] = []
    print("正在解析:" + m3u8_path.split("/")[-1])
    # 获取m3u8文件内容
    async with httpx.AsyncClient(headers=header, verify=False) as client:
        res = await client.get(url=m3u8_path)
        r = await res.aread()
        # 通过正值表达式获取key和ts的链接
        # k = re.compile(r"http://.*?\.key")  # key的正则匹配
        k = re.compile(r'(.*URI="([^"]+))"')
        t = re.compile(r".*?.ts")  # ts的正则匹配
        iv = re.compile(r'(.*IV=([^,\s]+))')
        key_url_list = k.findall(r.decode(encoding='utf-8'))  # key的url
        key_url = key_url_list[0][1] if len(key_url_list) != 0 else ''
        iv_s_list = iv.findall(r.decode(encoding='utf-8'))
        iv_s = iv_s_list[0][1] if len(iv_s_list) != 0 else ''
        ts_urls = t.findall(r.decode(encoding='utf-8'))  # ts的url列表
    # print('ts urls:', ts_urls)

    if len(ts_urls) != 0:
        for idx, ts_url in enumerate(ts_urls):
            ts_url = applyUrl(ts_url, m3u8_path)
            ts_urls[idx] = ts_url

    return ts_urls


async def download_m3u8_file(ts_url: str):
    global q
    res_dict: Dict[str, httpx.Response] = {}
    ts_name = ts_url.split("/")[-1]  # ts文件名
    # 获取ts文件二进制数据
    print("正在下载:" + ts_name)

    async with httpx.AsyncClient(headers=header, verify=False) as client:
        res = await client.get(ts_url)
        res_dict.setdefault(ts_name, res)
    
    q.put(res_dict)


async def write_to_file(name: str):
    global q
    while True:
        async with mutex:
            if q.qsize() == 0:
                break
            res = q.get()
            item = res.popitem()
            ts_name, te_res = item[0], item[1]
            async with aiofiles.open(name, "ab") as file:
                await file.write(te_res.read())
                print("保存成功:" + ts_name)


async def main():
    ts_urls = await get_ts_lists(m3u8_path)
    await asyncio.gather(*[download_m3u8_file(ts_url) for ts_url in ts_urls])


if __name__ == '__main__':
    name = "dream_it_possible.ts"
    q = queue.Queue()
    mutex = asyncio.Lock()
    asyncio.run(main())
    asyncio.run(write_to_file(name))
    print(name, "下载完成")
阅读 1.9k
1 个回答

这个可能不仅仅是异步的问题,是写入策略问题,因为这种分切ts组合成整体必须严格按照顺序,且没有其它数据定位手段,所以好的整合策略是分个下载,整体或者说按序写入。
即可以分批下载,最后等全部下载就绪后再一次写入,或者按序下载,前面连续的可以先行写入(不过这样其实效率也不会高什么,还不如等全部下载完了一次性写入)

此外从IO优化角度来说,这种下载会有一些限制(包括本地的,也包括服务器端限制,过多链接反而不好),所以更多的时候会用链接池类似策略来限制链接数量,所以你在处理中也要注意类似问题。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题