关于python的asyncio的timeout?

需求:
一个asyncio.Queue()作为中间交换。一个线程往里put, 一个线程从里get. 使用future总是取不得值.

示例demo:

import asyncio
import threading, time

audioQueue = asyncio.Queue()

def job(task,loop):
    def _asyncJob(task, loop):
        result = 'hello'
        time.sleep(1)
        asyncio.run_coroutine_threadsafe(task(result), loop)
        loop.call_soon_threadsafe(lambda: print('size:%s' % audioQueue.qsize()))

    threading.Thread(name=f"transcribe_thread_1", target=_asyncJob, args=(task, loop)).start()

def sync_function(loop):
    # 这是一个同步函数
    task = lambda result : audioQueue.put(result)
    job(task, loop)

def getEle(loop):
    
    try:
        future = asyncio.run_coroutine_threadsafe(audioQueue.get(), loop)
        chnStatement = future.result(0.1)
    except asyncio.TimeoutError as exout:
        chnStatement = 'timeout'
    except asyncio.CancelledError as excel:
        chnStatement = 'cancel'
    except Exception as exc:
        chnStatement = 'exception'
    
    print(f'get element {chnStatement}')

async def main():
    # get the current event loop
    loop = asyncio.get_running_loop()
    sync_function(loop)
    threading.Thread(name=f"transcribe_thread_2", target=getEle, args=(loop,)).start()

if __name__ == '__main__':
    asyncio.run(main())

get为什么需要timeout, 因为一开始put的过程要滞后一些,而get要求实时。要是使用await,线程会一直卡住

环境:
python: 3.10.x

阅读 997
avatarAI BotBETA

首先,要澄清的是,asyncio 是为单线程并发设计的,它并不直接支持多线程间的同步。在你的例子中,你试图在一个线程中使用 asyncioQueue,并在另一个线程中尝试从该队列中同步地获取数据,这是有问题的。

然而,关于你的问题,future.result(timeout) 需要一个超时的原因是因为 future.result() 是一个阻塞调用,它会等待 Future 完成(即,等待与之关联的协程完成)。如果没有设置超时,并且 Future 永远不会完成(例如,由于某种错误或死锁),那么调用 future.result() 的线程将会永远阻塞。

在你的例子中,由于 audioQueue.get() 是一个协程,你需要在一个协程中等待它,而不是在一个线程中同步地等待它的结果。但是,由于你需要在另一个线程中调用它,你使用了 asyncio.run_coroutine_threadsafe(),它返回一个 Future,你可以等待这个 Future 的结果。

但是,由于 audioQueue.get() 可能会阻塞(如果没有可用的项目),你需要设置一个超时来防止调用 future.result() 的线程无限期地等待。

然而,你的代码有几个问题:

  1. time.sleep(1) 会阻塞整个线程,包括事件循环,这可能导致问题。你应该使用 await asyncio.sleep(1) 代替,但这只能在协程中使用。
  2. 你试图在一个线程中运行协程(使用 asyncio.run_coroutine_threadsafe()),但在另一个线程中同步地等待它的结果。这可能导致竞争条件和难以调试的问题。
  3. audioQueue.qsize() 不是一个线程安全的操作,并且在多线程环境中使用它可能是危险的。

一个更好的方法可能是使用 asyncio 的同步原语(如锁和条件变量)来在协程之间同步,而不是在线程之间同步。但是,如果你确实需要在多线程环境中使用 asyncio 的队列,你应该考虑重新设计你的代码以避免需要超时。

不过,对于你当前的问题,解决方案是使用超时来防止无限期地等待 Future 的结果。但是,请注意,这只是一个临时的解决方案,并且可能需要更深入地考虑你的多线程和 asyncio 集成策略。

1 个回答
✓ 已被采纳

asyncio是单线程并发。

关于 asyncio.Queue() 给你一个代码例子:

import asyncio

async def producer(queue):
    for i in range(5):
        await queue.put(i)
        print(f"Produced: {i}")
        await asyncio.sleep(1)

async def consumer(queue):
    while True:
        item = await queue.get()
        print(f"Consumed: {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue()
    producer_task = asyncio.create_task(producer(queue))
    consumer_task = asyncio.create_task(consumer(queue))
    await asyncio.gather(producer_task, consumer_task)
    await queue.join()

asyncio.run(main())

producer 函数负责向队列中放入数据。
consumer 函数从队列中取出数据。
在 main 函数中,创建了一个 asyncio.Queue 对象,并启动了生产者和消费者的任务。
通过 queue.join() 确保所有的任务都完成处理。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题
宣传栏