Python3 asyncio 在线程中使用

AirT
  • 98

下面代码中模拟了使用 2 个线程管理 event loop 的情况:

import time
import asyncio
import threading

async def task(c, i):
    for _ in range(i):
        print(c)
        await asyncio.sleep(1)
    return i

def thread(loop):  # 异步程序
    asyncio.set_event_loop(loop)
    asyncio.ensure_future(task('sub thread', 999))
    loop.run_forever()

def main():
    threading.Thread(target=thread, args=(asyncio.get_event_loop(), )).start()

    # 同步代码开始
    future = asyncio.ensure_future(task('main thread', 5))
    while not future.done():
        time.sleep(1)
    print('main done: %s' % future.result())


if __name__ == '__main__':
    main()

其中子线程(thread 函数)设定并启用了一个 999 次的 task 任务。

而在主线程中,添加了一个 5 次的 task 任务。我设置了一个 While 循环来检查这项任务是否已经运行完毕,如若完毕则打印出 main done: 5


提出的问题:

  1. 像这样的多线程 event loop 是否有其他方案可以实现?
  2. 在 main 函数的 5 个 task 的任务,最初尝试使用 run_until_complete 来等待执行结束,但是与 run_forever 冲突导致抛出 RuntimeError: This event loop is already running 的错误,那么除了使用 While 循环外,还有其他方法阻塞后面代码执行吗?

实际情况描述:

我有一个使用了 同步+异步 的程序,其中同步程序是运行在主线程上的,异步是运行在一个子线程中的。
同步与异步独立运行,各司其职。但是有时候需要在同步中使用异步的函数或方法并取得结果。

回复
阅读 9.4k
1 个回答
✓ 已被采纳

你不需要循环调用 future.done(),用 future.result() 便可。

我建议把 eventloop 放在主线程,其它工作视类型可以放入

  1. 同(主)线程
    非阻塞(非CPU运算型)动作,例如 asyncio.sleep
  2. 从线程(池)
    阻塞(非CPU运算型)动作,例如 time.sleep
  3. 单独进程
    CPU运算型动作,例如计算质数

参考

https://docs.python.org/3/lib...
https://wiki.python.org/moin/...

例子

# -*- coding: utf-8 -*-
import asyncio
from datetime import datetime


async def add(a, b):
    await asyncio.sleep(1)
    return a + b


async def master_thread(loop):
    print("{} master: 1+2={}".format(datetime.now(), await add(1, 2)))


def slave_thread(loop):
    # 注意:这不是 coroutine 函数
    import time
    time.sleep(2)

    f = asyncio.run_coroutine_threadsafe(add(1, 2), loop)
    print("{} slave: 1+2={}".format(datetime.now(), f.result()))


async def main(loop):
    await asyncio.gather(
        master_thread(loop),
        # 线程池内执行
        loop.run_in_executor(None, slave_thread, loop),
    )


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
宣传栏