2

之前写协程都是直接把future放到eventloop里执行run_until_complete,前段时间面试的时候一个面试官问我如何调度协程,我完全懵比。请问大家是怎样调度的?

2018-09-19 提问
1 个回答
3

已采纳

汉语词典释义,调度为管理并安排。对于一个 coroutine,调度不仅仅是 run_until_complete——这只是安排了立即启动一个 coroutine 并等待直到其完成;而在 coroutine 的执行过程中,您自己并没有管理或安排这个 coroutine 的具体内容。面试官希望您回答的,则是 event loop 接管 coroutine 之后,对其调度的方法。

对于 Python 来说,狭义上的 coroutine 是指一个 coroutine 实例,通常是调用一个 async def 函数的返回值,比如:

async def main():
    await asyncio.sleep(1)
    return 123

coro = main()

这里的 coro 并不是 123,而是一个 coroutine 实例。此时,main() 函数也并没有开始执行。为了得到结果,一般会这么做:

loop = asyncio.get_event_loop()
rv = loop.run_until_complete(coro)

这时,rv 的值就会在 1 秒钟之后变成 123run_until_complete 做的事情详解如下:

  1. 调用 ensure_future 来确保参数 coro 是一个 Future 实例,或者如同这里的情况,创建一个 Task 将其转换为一个 Future 实例;
  2. Task 实例在创建的过程中,会调用 loop.call_soon(self._step),在 event loop 的 _ready 回调队列中添加一个 Handle 实例;
  3. 为新创建的 Future 实例(即 Task 实例)添加一个 done callback 来停止 event loop;
  4. 执行 loop.run_forever()
  5. loop.run_forever() 本质是一个 while 循环(所谓 event loop 的循环就是这个),只要 event loop 没有被标记为停止,就会反复调用 self._run_once()
  6. _run_once() 中,event loop 会将 _ready 回调队列中所有的 Handle 实例依次取出并执行,这里就执行到了前面 Task 实例的 _step()
  7. Task._step() 则会调用其绑定的 coro 实例的 send(None) 方法,开始执行我们定义的 main() 函数;
  8. 如上述代码所示,main() 会调用 asyncio.sleep(1) 创建一个 Future 实例;
  9. asyncio.sleep(1) 会调用 loop.call_later(),安排 1 秒后调用该 Future 实例的 set_result()
  10. call_later() 中,event loop 会创建一个 TimerHandle 并将其添加到一个以计划执行时间为序的优先队列 _scheduled 中;
  11. 回到 main(),其中的 await 则会将这个请求了睡 1 秒的 Future 实例返回给前面的 send() 调用;
  12. Task._step() 发现收到了一个 Future 实例,立即调用其 add_done_callback(),注册将来继续执行的步骤;
  13. 回到 event loop 主循环,继续执行下一次 loop._run_once()
  14. 这次 _ready 队列虽为空,但 _scheduled 优先队列里是有东西的。Event loop 会检查最近计划执行的 TimerHandle 是否到时间该执行了——很显然我们刚安排下去 1 秒后才执行,所以不会立即执行,而是计算一下应该等多长时间再去执行(因为安排之后又跑了别的代码,所以会比 1 秒稍短一些);
  15. 调用 selector.select(),并以上述时间为超时参数。因为没有注册其他 I/O 事件,所以该调用会阻塞将近 1 秒钟;
  16. 第三次执行 loop._run_once(),发现有一个 TimerHandle 到时间了,立即执行;
  17. 翻阅一下前面的步骤,这个 TimerHandle 会调用 sleep 产生的 Future 实例的 set_result()
  18. 这个 Future 实例一旦有了结果,又会接着触发执行之前注册过的 done callback,也就是恢复 Task 的执行;
  19. 在新的 Task._step() 调用中,Task 会再次调用 coro.send(),将 main() 中剩下的 return 123 执行完;
  20. 这里返回的结果 123 会被 Task 用作调用 self.set_result(123),毕竟 Task 就是最早的那个 Future 实例;
  21. 回看步骤 3,这个 Future 实例曾经被 run_until_complete 添加过一个 done callback,作用是停止 event loop。所以,此时 event loop 会被置为待停止状态;
  22. TimerHandle 的执行完成,进入下一次主循环,但因 event loop 已经被置为待停止,因此主循环因条件不满足而结束,至此 loop.run_forever() 返回;
  23. 回到最初的 run_until_complete(),它会把 future.result() 返回,也就是 123,最终赋值给 rv

简单总结的话,event loop 对 callback 的调度方式是抢占式的:_scheduled 优先队列中如果有到时间的就依次优先执行,然后还是以先到先得的方式执行 I/O 事件回调,最后只要 _ready 队列中有回调就统统执行,以此往复直至有人主动停止了 event loop。最后,一个 coroutine 则是由 Task 驱动的多次回调完成执行,分割点则是 await;对于多个并发的 coroutine,宏观来看调度方法就是“谁到时间了谁先执行一段,或者谁的 I/O 事件有进展了谁先执行一段,再不就是谁先注册的谁先执行”。

撰写答案

推广链接