使用 Python asyncio 从同步函数运行并等待异步函数

新手上路,请多包涵

在我的代码中,我有一个带有属性的类,它偶尔需要运行异步代码。有时我需要从异步函数访问属性,有时从同步函数访问属性——这就是为什么我不希望我的属性是异步的。此外,我的印象是异步属性通常是一种代码味道。如我错了请纠正我。

我在从同步属性执行异步方法并阻止进一步执行直到异步方法完成时遇到问题。

这是一个示例代码:

 import asyncio

async def main():
    print('entering main')
    synchronous_property()
    print('exiting main')

def synchronous_property():
    print('entering synchronous_property')
    loop = asyncio.get_event_loop()
    try:
        # this will raise an exception, so I catch it and ignore
        loop.run_until_complete(asynchronous())
    except RuntimeError:
        pass
    print('exiting synchronous_property')

async def asynchronous():
    print('entering asynchronous')
    print('exiting asynchronous')

asyncio.run(main())

它的输出:

 entering main
entering synchronous_property
exiting synchronous_property
exiting main
entering asynchronous
exiting asynchronous

首先, RuntimeError 捕获似乎是错误的,但如果我不这样做,我会得到 RuntimeError: This event loop is already running 异常。

其次, asynchronous() 函数最后执行,同步完成后。我想通过异步方法对数据集做一些处理,所以我需要等待它完成。 If I’ll add await asyncio.sleep(0) after calling synchronous_property() , it will call asynchronous() before main() finish, but it doesn’t help me.我需要运行 asynchronous() 之前 synchronous_property() 完成。

我错过了什么?我正在运行 python 3.7。

原文由 Andrzej Klajnert 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 876
2 个回答

Asyncio 在 设计 上确实坚持不允许嵌套循环。但是,您始终可以在不同的线程中运行另一个事件循环。这是一个使用线程池的变体,以避免每次都必须创建一个新线程:

 import asyncio, concurrent.futures

async def main():
    print('entering main')
    synchronous_property()
    print('exiting main')

pool = concurrent.futures.ThreadPoolExecutor()

def synchronous_property():
    print('entering synchronous_property')
    result = pool.submit(asyncio.run, asynchronous()).result()
    print('exiting synchronous_property', result)

async def asynchronous():
    print('entering asynchronous')
    await asyncio.sleep(1)
    print('exiting asynchronous')
    return 42

asyncio.run(main())

此代码在每个 sync->async 边界上创建一个新的事件循环,所以如果你经常这样做,不要期望高性能。可以通过使用 asyncio.new_event_loop 每个线程只创建一个事件循环并将其缓存在线程局部变量中来改进它。

原文由 user4815162342 发布,翻译遵循 CC BY-SA 4.0 许可协议

最简单的方法是使用现有的“轮子”,比如 asgiref.async_to_sync

 from asgiref.sync import async_to_sync

然后:

 async_to_sync(main)()

一般来说:

 async_to_sync(<your_async_func>)(<.. arguments for async function ..>)

这是一个调用者类,它将仅在具有事件循环的线程上工作的可等待对象转变为在子线程中工作的同步可调用对象。

如果调用堆栈包含异步循环,代码将在那里运行。否则,代码将在新线程的新循环中运行。

无论哪种方式,此线程都会暂停并等待运行使用 SyncToAsync 从调用堆栈的更下方调用的任何 thread_sensitive 代码,然后在异步任务返回后最终退出。

原文由 Sławomir Lenart 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进