如何运行协程并在循环运行时等待同步函数产生的结果?

新手上路,请多包涵

我有一个像傻瓜一样的代码:

 def render():
    loop = asyncio.get_event_loop()

    async def test():
        await asyncio.sleep(2)
        print("hi")
        return 200

    if loop.is_running():
        result = asyncio.ensure_future(test())
    else:
        result = loop.run_until_complete(test())

loop 没有运行时很容易,只需使用 loop.run_until_complete 它返回 coro 结果,但如果循环已经在运行(我的阻塞代码在已经运行的应用程序中运行循环)我不能使用 loop.run_until_complete 因为它会引发异常;当我调用 asyncio.ensure_future 任务被安排并运行时,但我想在那里等待结果,有人知道该怎么做吗?文档不是很清楚如何执行此操作。

我尝试在 coro 中传递一个 concurrent.futures.Future 调用 set_result 然后调用 Future.result() 在我的阻塞代码上,但它没有阻止并且没有让其他任何东西运行。任何帮助,将不胜感激。

原文由 Ordani Sanchez 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 854
2 个回答

要使用建议的设计实现 runner ,您需要一种方法从事件循环中运行的回调中单步执行事件循环。 Asyncio 明确禁止 递归事件循环,因此这种方法是死路一条。

鉴于该约束,您有两个选择:

  1. 使 render() 本身成为协程;
  2. 在与运行异步事件循环的线程不同的线程中执行 render() (及其调用者)。

假设#1 是不可能的,您可以实现 render() 的#2 变体,如下所示:

 def render():
    loop = _event_loop  # can't call get_event_loop()

    async def test():
        await asyncio.sleep(2)
        print("hi")
        return 200

    future = asyncio.run_coroutine_threadsafe(test(), loop)
    result = future.result()

请注意,您不能在 asyncio.get_event_loop() render 因为没有(也不应该)为该线程设置事件循环。相反,生成运行器线程的代码必须调用 asyncio.get_event_loop() 并将其发送到线程,或者将其保留在全局变量或共享结构中。

原文由 user4815162342 发布,翻译遵循 CC BY-SA 3.0 许可协议

同步等待异步协程

如果 asyncio 事件循环已经通过调用 loop.run_forever 运行,它将 阻塞 执行线程,直到 loop.stop 被调用 [参见 文档]。因此,同步等待的唯一方法是在专用线程上运行事件循环,在循环上调度 异步 函数并从 另一个 线程 同步 等待它。

为此,我根据 user4815162342 的 回答 编写了自己的最小解决方案。我还添加了所有工作完成后用于清理循环的部分 [参见 loop.close ]。

下面代码中的 main 函数在专用线程上运行事件循环,在事件循环上调度多个任务,加上要 同步 等待其结果的任务。同步等待将阻塞,直到准备好所需的结果。最后,循环关闭并与其线程一起优雅地清理。

专用线程和函数 stop_looprun_forever_safeawait_sync 可以封装在模块或类中。

有关线程安全的注意事项,请参阅异步文档中的“ 并发和多线程”部分。

 import asyncio
import threading
#----------------------------------------

def stop_loop(loop):
    ''' stops an event loop '''
    loop.stop()
    print (".: LOOP STOPPED:", loop.is_running())

def run_forever_safe(loop):
    ''' run a loop for ever and clean up after being stopped '''

    loop.run_forever()
    # NOTE: loop.run_forever returns after calling loop.stop

    #-- cancell all tasks and close the loop gracefully
    print(".: CLOSING LOOP...")
    # source: <https://xinhuang.github.io/posts/2017-07-31-common-mistakes-using-python3-asyncio.html>

    loop_tasks_all = asyncio.Task.all_tasks(loop=loop)

    for task in loop_tasks_all: task.cancel()
    # NOTE: `cancel` does not guarantee that the Task will be cancelled

    for task in loop_tasks_all:
        if not (task.done() or task.cancelled()):
            try:
                # wait for task cancellations
                loop.run_until_complete(task)
            except asyncio.CancelledError: pass
    #END for
    print(".: ALL TASKS CANCELLED.")

    loop.close()
    print(".: LOOP CLOSED:", loop.is_closed())

def await_sync(task):
    ''' synchronously waits for a task '''
    while not task.done(): pass
    print(".: AWAITED TASK DONE")
    return task.result()
#----------------------------------------

async def asyncTask(loop, k):
    ''' asynchronous task '''
    print("--start async task %s" % k)
    await asyncio.sleep(3, loop=loop)
    print("--end async task %s." % k)
    key = "KEY#%s" % k
    return key

def main():
    loop = asyncio.new_event_loop() # construct a new event loop

    #-- closures for running and stopping the event-loop
    run_loop_forever = lambda: run_forever_safe(loop)
    close_loop_safe = lambda: loop.call_soon_threadsafe(stop_loop, loop)

    #-- make dedicated thread for running the event loop
    thread = threading.Thread(target=run_loop_forever)

    #-- add some tasks along with my particular task
    myTask = asyncio.run_coroutine_threadsafe(asyncTask(loop, 100200300), loop=loop)
    otherTasks = [asyncio.run_coroutine_threadsafe(asyncTask(loop, i), loop=loop)
                  for i in range(1, 10)]

    #-- begin the thread to run the event-loop
    print(".: EVENT-LOOP THREAD START")
    thread.start()

    #-- _synchronously_ wait for the result of my task
    result = await_sync(myTask) # blocks until task is done
    print("* final result of my task:", result)

    #... do lots of work ...
    print("*** ALL WORK DONE ***")
    #========================================

    # close the loop gracefully when everything is finished
    close_loop_safe()
    thread.join()
#----------------------------------------

main()

原文由 AlQuemist 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题