需求:
一个asyncio.Queue()
作为中间交换。一个线程往里put, 一个线程从里get. 使用future总是取不得值.
示例demo:
import asyncio
import threading, time
audioQueue = asyncio.Queue()
def job(task,loop):
def _asyncJob(task, loop):
result = 'hello'
time.sleep(1)
asyncio.run_coroutine_threadsafe(task(result), loop)
loop.call_soon_threadsafe(lambda: print('size:%s' % audioQueue.qsize()))
threading.Thread(name=f"transcribe_thread_1", target=_asyncJob, args=(task, loop)).start()
def sync_function(loop):
# 这是一个同步函数
task = lambda result : audioQueue.put(result)
job(task, loop)
def getEle(loop):
try:
future = asyncio.run_coroutine_threadsafe(audioQueue.get(), loop)
chnStatement = future.result(0.1)
except asyncio.TimeoutError as exout:
chnStatement = 'timeout'
except asyncio.CancelledError as excel:
chnStatement = 'cancel'
except Exception as exc:
chnStatement = 'exception'
print(f'get element {chnStatement}')
async def main():
# get the current event loop
loop = asyncio.get_running_loop()
sync_function(loop)
threading.Thread(name=f"transcribe_thread_2", target=getEle, args=(loop,)).start()
if __name__ == '__main__':
asyncio.run(main())
get为什么需要timeout, 因为一开始put的过程要滞后一些,而get要求实时。要是使用await,线程会一直卡住
环境:
python: 3.10.x
asyncio
是单线程并发。关于
asyncio.Queue()
给你一个代码例子:producer 函数负责向队列中放入数据。
consumer 函数从队列中取出数据。
在 main 函数中,创建了一个 asyncio.Queue 对象,并启动了生产者和消费者的任务。
通过 queue.join() 确保所有的任务都完成处理。