需求:
声音动态生成, 视频固定来源. 代码中使用的是testsrc. 代码一直卡在rawvedio写入命名管道哪也没撒错误
示例代码:
import subprocess
import os
from threading import Thread
import numpy as np
from transformers import VitsModel, VitsTokenizer, PreTrainedTokenizerBase
import torch
import ffmpeg
def read_frame_from_stdout(vedioProcess, width, height):
frame_size = width * height * 3
input_bytes = vedioProcess.stdout.read(frame_size)
if not input_bytes:
return
assert len(input_bytes) == frame_size
return np.frombuffer(input_bytes, np.uint8).reshape([height, width, 3])
def writer(vedioProcess, pipe_name, chunk_size):
width = 640
height = 480
while True:
input_frame = read_frame_from_stdout(vedioProcess, width, height)
print('read frame is:' % input_frame)
if input_frame is None:
print('read frame is: None')
break
frame = input_frame * 0.3
os.write(fd_pipe, (frame.astype(np.uint8).tobytes()))
# Closing the pipes as closing files.
os.close(fd_pipe)
# 加载TTS模型
def loadModel(device: str):
model = VitsModel.from_pretrained("./mms-tts-eng", local_files_only=True).to(device) # acebook/mms-tts-deu
tokenizer = VitsTokenizer.from_pretrained("./mms-tts-eng", local_files_only=True)
return model, tokenizer
# 将32位浮点转成16位整数, 适用于:16000(音频采样率)
def covertFl32ToInt16(nyArr):
return np.int16(nyArr / np.max(np.abs(nyArr)) * 32767)
def audioWriteInPipe(nyArr, audioPipeName):
# Write to named pipe as writing to a file (but write the data in small chunks).
os.write(audioPipeName, covertFl32ToInt16(nyArr.squeeze()).tobytes()) # Write 1024 bytes of data to fd_pipe
# 生成numpy
def generte(prompt:str, device: str, model: VitsModel, tokenizer: PreTrainedTokenizerBase):
inputs = tokenizer(prompt, return_tensors="pt").to(device)
#
with torch.no_grad():
#
output = model(**inputs).waveform
return output.cpu().numpy()
def soundPipeWriter(model, device, tokenizer, pipeName):
fd_pipe = os.open(pipeName, os.O_WRONLY)
filepath = 'night.txt'
for content in read_file(filepath):
print(content)
audioWriteInPipe(generte(prompt=content, device=device, model=model, tokenizer=tokenizer), audioPipeName=fd_pipe)
os.close(fd_pipe)
# 读取文件源
def read_file(filepath:str):
with open(filepath) as fp:
for content in fp:
yield content
def record(vedioProcess, model, tokenizer, device):
# Names of the "Named pipes"
pipeA = "audio_pipe1"
pipeV = "vedio_pipe2"
# Create "named pipes".
os.mkfifo(pipeA)
os.mkfifo(pipeV)
# Open FFmpeg as sub-process
# Use two audio input streams:
# 1. Named pipe: "audio_pipe1"
# 2. Named pipe: "audio_pipe2"
process = (
ffmpeg
.concat(ffmpeg.input("pipe:vedio_pipe2"), ffmpeg.input("pipe:audio_pipe1"), v=1, a=1)
.output("merge_audio_vedio.mp4", pix_fmt='yuv480p', vcodec='copy', acodec='aac')
.run_async(pipe_stderr=True)
)
# Initialize two "writer" threads (each writer writes data to named pipe in chunks of 1024 bytes).
thread1 = Thread(target=soundPipeWriter, args=(model, device, tokenizer, pipeA)) # thread1 writes samp1 to pipe1
thread2 = Thread(target=writer, args=(vedioProcess, pipeV, 1024)) # thread2 writes samp2 to pipe2
# Start the two threads
thread1.start()
thread2.start()
# Wait for the two writer threads to finish
thread1.join()
thread2.join()
process.wait() # Wait for FFmpeg sub-process to finish
# Remove the "named pipes".
os.unlink(pipeV)
os.unlink(pipeA)
if __name__ == "__main__":
device: str = "cuda:0" if torch.cuda.is_available() else "cpu"
model, tokenizer = loadModel(device=device)
# make lavfi-testSrc 60s mp4
vedioProcess = (
ffmpeg
.input('testsrc=duration=10:size=640x480:rate=30', f="lavfi", t=60)
.output('pipe:', format='rawvideo', pix_fmt='rgb24')
.run_async(pipe_stdout=True)
)
#
record(vedioProcess, model, tokenizer, device)
vedioProcess.wait()
vscode中截图:
视频卡住是因为编码不对.用h264都可以过了.声音输入不可用是因为没有编码。 完整的示例如下: