异步实际上是如何工作的?

新手上路,请多包涵

这个问题的动机是我的另一个问题: How to await in cdef?

网络上有大量关于 asyncio 的文章和博客文章,但它们都很肤浅。我找不到任何有关 asyncio 实际实现方式以及使 I/O 异步的原因的任何信息。我试图阅读源代码,但它有数千行不是最高级别的 C 代码,其中很多都涉及辅助对象,但最关键的是,很难将 Python 语法与其将翻译的 C 代码联系起来进入。

Asycnio 自己的文档就更没用了。那里没有关于它如何工作的信息,只有一些关于如何使用它的指南,这些指南有时也具有误导性/写得非常糟糕。

我熟悉 Go 的协程实现,并且希望 Python 也能做同样的事情。如果是这样的话,我在上面链接的帖子中提出的代码就会起作用。既然没有,我现在正试图找出原因。到目前为止我最好的猜测如下,请纠正我错误的地方:

  1. 形式 async def foo(): ... 的过程定义实际上被解释为继承 coroutine 的类的方法。
  2. 也许, async def 实际上被 await 语句分成多个方法,其中调用这些方法的对象能够跟踪到目前为止执行的进度.
  3. 如果以上为真,那么协程的执行本质上归结为由某个全局管理器(循环?)调用协程对象的方法。
  4. 全局管理器以某种方式(如何?)知道 Python(仅?)代码何时执行 I/O 操作,并且能够在当前执行方法放弃控制(点击 await 声明)。

换句话说,这是我尝试将某些 asyncio 语法“脱糖”为更易于理解的内容:

 async def coro(name):
    print('before', name)
    await asyncio.sleep()
    print('after', name)

asyncio.gather(coro('first'), coro('second'))

# translated from async def coro(name)
class Coro(coroutine):
    def before(self, name):
        print('before', name)

    def after(self, name):
        print('after', name)

    def __init__(self, name):
        self.name = name
        self.parts = self.before, self.after
        self.pos = 0

    def __call__():
        self.parts[self.pos](self.name)
        self.pos += 1

    def done(self):
        return self.pos == len(self.parts)

# translated from asyncio.gather()
class AsyncIOManager:

    def gather(*coros):
        while not every(c.done() for c in coros):
            coro = random.choice(coros)
            coro()

如果我的猜测被证明是正确的:那我就有问题了。在这种情况下,I/O 实际上是如何发生的?在一个单独的线程中?整个解释器是否挂起并且 I/O 发生在解释器之外? I/O 到底是什么意思?如果我的 python 过程调用 C open() 过程,它又向内核发送中断,放弃控制权,Python 解释器如何知道这一点并能够继续运行其他代码,而内核代码却不知道实际的 I/O,直到它唤醒最初发送中断的 Python 程序?原则上,Python 解释器如何意识到这种情况的发生?

原文由 wvxvw 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 305
1 个回答

异步如何工作?

在回答这个问题之前,我们需要了解一些基本术语,如果您已经知道其中的任何一个,请跳过这些。

发电机

生成器是允许我们暂停 python 函数执行的对象。用户策划的生成器是使用关键字 yield 实现的。通过创建一个包含 yield 关键字的普通函数,我们将该函数转换为生成器:

 >>> def test():
...     yield 1
...     yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

如您所见,在生成器上调用 next() 会导致解释器加载测试的框架,并返回 yield ed 值。再次调用 next() ,导致帧再次加载到解释器堆栈中,并继续 yield 另一个值。

到第三次 next() 被调用时,我们的生成器已经完成,并且 StopIteration 被抛出。

与生成器通信

生成器的一个鲜为人知的特性是您可以使用两种方法与它们通信: send()throw()

 >>> def test():
...     val = yield 1
...     print(val)
...     yield 2
...     yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in test
Exception

在调用 gen.send() 时,该值作为 yield 关键字的返回值传递。

gen.throw() 另一方面,允许在生成器中抛出异常,异常在同一位置引发 yield 被调用。

从生成器返回值

从生成器返回一个值,导致该值被放入 StopIteration 异常中。我们稍后可以从异常中恢复值并将其用于我们的需要。

 >>> def test():
...     yield 1
...     return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
...     next(gen)
... except StopIteration as exc:
...     print(exc.value)
...
abc

看哪,一个新关键字: yield from

Python 3.4 添加了一个新关键字: yield from 。该关键字允许我们做的是传递任何 next()send()throw() 嵌套到一个内部生成器中。如果内部生成器返回一个值,它也是 yield from 的返回值:

 >>> def inner():
...     inner_result = yield 2
...     print('inner', inner_result)
...     return 3
...
>>> def outer():
...     yield 1
...     val = yield from inner()
...     print('outer', val)
...     yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen) # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4

我写 了一篇文章 来进一步阐述这个话题。

把它们放在一起

在 Python 3.4 中引入新关键字 yield from 后,我们现在能够在生成器内部创建生成器,就像隧道一样,将数据从最内层的生成器来回传递到最外层的生成器。这催生了生成器的新含义—— _协程_。

协程 是可以在运行时停止和恢复的函数。在 Python 中,它们是使用 async def 关键字定义的。与生成器非常相似,它们也使用自己的形式 yield fromawait 。 Before async and await were introduced in Python 3.5, we created coroutines in the exact same way generators were created (with yield from instead of await )。

 async def inner():
    return 1

async def outer():
    await inner()

就像所有迭代器和生成器都实现了 __iter__() 方法一样,所有协程都实现了 __await__() 这允许它们每次都继续 await coro .

Python 文档 中有一个很好的 序列图,您应该查看一下。

在 asyncio 中,除了协程函数之外,我们还有 2 个重要的对象: tasksfutures

期货

Futures 是实现了 __await__() 方法的对象,它们的工作是保持一定的状态和结果。状态可以是以下之一:

  1. PENDING - 未来没有任何结果或异常集。
  2. 取消 - 未来被取消使用 fut.cancel()
  3. FINISHED - future 已完成,通过使用 — 的结果集或使用 fut.set_result() fut.set_exception() 异常集

正如您所猜测的那样,结果可以是将返回的 Python 对象,也可以是可能引发的异常。

future 对象的另一个 重要 特征是它们包含一个名为 add_done_callback() 的方法。此方法允许在任务完成后立即调用函数 - 无论它引发异常还是完成。

任务

Task 对象是特殊的 futures,它包裹着协程,并与最内层和最外层的协程进行通信。每次协程 await sa future 时,future 都会一直传递回任务(就像在 yield from 中一样),任务接收它。

接下来,任务将自己绑定到未来。它通过在未来调用 add_done_callback() 来实现。从现在开始,如果未来将被取消、传递异常或作为结果传递 Python 对象,则将调用任务的回调,并且它会重新存在。

异步

我们必须回答的最后一个紧迫问题是——IO 是如何实现的?

在 asyncio 的深处,我们有一个事件循环。任务的事件循环。事件循环的工作是在任务每次准备就绪时调用任务,并将所有这些工作协调到一台工作机器中。

事件循环的 IO 部分建立在一个名为 select 的关键函数之上。 Select 是一种阻塞函数,由底层操作系统实现,它允许在套接字上等待传入或传出数据。接收到数据后唤醒,并返回接收到数据的套接字,或准备好写入的套接字。

当您尝试通过 asyncio 通过套接字接收或发送数据时,下面实际发生的是首先检查套接字是否有任何可以立即读取或发送的数据。如果其 .send() 缓冲区已满,或者 .recv() 缓冲区为空,则套接字将注册到 select 中的一个函数(通过简单地将其添加到列表中的一个函数--- , rlist for recv and wlist for send ) and the appropriate function await sa newly created future 对象,绑定到那个套接字。

当所有可用任务都在等待 futures 时,事件循环调用 select 并等待。当其中一个套接字有传入数据或其 send 缓冲区耗尽时,asyncio 检查绑定到该套接字的未来对象,并将其设置为完成。

现在所有的魔法都发生了。未来将完成,之前添加自己的任务 add_done_callback() 恢复生机,并调用 .send() 在恢复最内层协程的协程上(因为 await chain) 然后你从它溢出到的附近缓冲区读取新接收的数据。

再次方法链,在 recv() 的情况下:

  1. select.select 等待。
  2. 准备好套接字,返回数据。
  3. 来自套接字的数据被移入缓冲区。
  4. future.set_result() 被调用。
  5. 添加了 add_done_callback() 的任务现在被唤醒。
  6. 任务在协程上调用 .send() 一直进入最内层的协程并将其唤醒。
  7. 数据正在从缓冲区中读取并返回给我们不起眼的用户。

总之,asyncio 使用生成器功能,允许暂停和恢复功能。它使用 yield from 允许从最内层生成器到最外层生成器来回传递数据的功能。它使用所有这些来在等待 IO 完成时停止函数执行(通过使用 OS select 函数)。

最好的是什么?当一个功能暂停时,另一个功能可能会运行并与精致的结构交错,这就是 asyncio。

原文由 Bharel 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题