Python中asyncio模块的loop为什么可以被线程池共享?

根据官方描述,asyncio中的事件是属于单个线程的,下面这段程序中即属于main线程。但是为什么下面的ThreadPollExecutor(2)中的两个线程的能共享一个loop?

拜托大神解释一下事件循环的本质到底是什么?官方文档只提供了一系列的api,到现在我也并没有真正的理解。

import asyncio
from concurrent.futures import ThreadPoolExecutor

print('running async test')

def say_boo():
    i = 0
    while i < 10:
        print('...boo {0}'.format(i))
        i += 1


def say_baa():
    i = 0
    while i < 10:
        print('...baa {0}'.format(i))
        i += 1

if __name__ == "__main__":
    executor = ThreadPoolExecutor(2)
    loop = asyncio.get_event_loop()
    boo = asyncio.ensure_future(loop.run_in_executor(executor, say_boo))
    baa = asyncio.ensure_future(loop.run_in_executor(executor, say_baa))
阅读 6.6k
评论 2016-12-20 提问
    2 个回答
    fantix
    • 1.7k

    首先,event loop 就是一个普通 Python 对象,您可以通过 asyncio.new_event_loop() 创建无数个 event loop 对象。只不过,loop.run_xxx() 家族的函数都是阻塞的,比如 run_until_complete() 会等到给定的 coroutine 完成再结束,而 run_forever() 则会永远阻塞当前线程,直到有人停止了该 event loop 为止。所以在同一个线程里,两个 event loop 无法同时 run,但这不能阻止您用两个线程分别跑两个 event loop。

    其次再说 ThreadPoolExecutor。您也可以看到,它根本不是 asyncio 库的东西。当您创建一个 ThreadPoolExecutor 对象时,您实际上是创建了一个线程池。仅此而已,与 asyncio、event loop 并无瓜葛。而当您明确使用一个 event loop 的 run_in_executor() 方法时,其实底层做的只有两件事:

    1. 用线程池执行给定函数,与 asyncio 毫无关系;
    2. 给线程池执行结果增加一个回调,该回调会在 event loop 的下一次循环中保存执行结果。

    所以 run_in_executor() 只是将传统的线程池结果拉回到给定 event loop 中,以便进一步处理而已,不存在谁共享谁的关系,指定谁是谁。您可以尝试一下,在多个线程中跑多个 event loop,然后都向同一个线程池扔任务,然后返回结果:

    import asyncio
    import threading
    import time
    from concurrent.futures import ThreadPoolExecutor
    
    e = ThreadPoolExecutor()
    
    
    def worker(index):
        print(index, 'before:', time.time())
        time.sleep(1)
        print(index, 'after:', time.time())
        return index
    
    
    def main(index):
        loop = asyncio.new_event_loop()
        rv = loop.run_until_complete(loop.run_in_executor(e, worker, index))
        print('Thread', index, 'got result', rv)
    
    
    threads = []
    for i in range(5):
        t = threading.Thread(target=main, args=(i,))
        t.start()
        threads.append(t)
    
    for t in threads:
        t.join()
    

    结果大致如下:

    0 before: 1525751873.5256991
    1 before: 1525751873.526891
    3 before: 1525751873.527435
    4 before: 1525751873.5278442
    2 before: 1525751873.528244
    
    0 after: 1525751874.526666
    1 after: 1525751874.5270479
    Thread 1 got result 1
    Thread 0 got result 0
    3 after: 1525751874.532167
    2 after: 1525751874.532394
    4 after: 1525751874.5327559
    Thread 4 got result 4
    Thread 3 got result 3
    Thread 2 got result 2
    

    那为什么会有一个进程/线程一个 event loop 的说法呢?这是来源于默认 event loop 的概念,也就是 asyncio.get_event_loop()。初始情况下,get_event_loop() 只会在主线程帮您创建新的 event loop,并且在主线程中多次调用始终返回该 event loop;而在其他线程中调用 get_event_loop() 则会报错,除非您在这些线程里面手动调用过 set_event_loop()。细节请参考 文档

    最后关于您的问题“事件循环的本质到底是什么”:event loop 本身是一个循环,您可以看 asyncio.base_events.BaseEventLoop._run_once() 的源码,每个循环就执行这些东西。抛开所有的繁杂,每次循环只做两件事:

    1. 干等,什么也不做,一直等到有事件发生;
    2. 调用之前注册在这个事件上的处理代码。

    仅此而已。这里的事件主要包括定时器事件和 I/O 事件,所有跑在 event loop 上的您的代码都是由一个事件触发的,然后反复地交错地跑,宏观上看就是异步并发了。

    评论 赞赏