请解释“任务已被销毁但正在等待处理!”取消任务后

新手上路,请多包涵

我正在使用 Python 3.4.2 学习 asyncio,我用它来连续监听 IPC 总线,而 gbulb 则监听 DBus。

我创建了一个函数 listen_to_ipc_channel_layer 持续侦听 IPC 通道上的传入消息并将消息传递给 message_handler

我也在听 SIGTERM 和 SIGINT。当我向运行您在底部找到的代码的 python 进程发送 SIGTERM 时,脚本应该正常终止。

我遇到的问题是以下警告:

 got signal 15: exit
Task was destroyed but it is pending!
task: <Task pending coro=<listen_to_ipc_channel_layer() running at /opt/mainloop-test.py:23> wait_for=<Future cancelled>>

Process finished with exit code 0

…使用以下代码:

 import asyncio
import gbulb
import signal
import asgi_ipc as asgi

def main():
    asyncio.async(listen_to_ipc_channel_layer())
    loop = asyncio.get_event_loop()

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, ask_exit)

    # Start listening on the Linux IPC bus for incoming messages
    loop.run_forever()
    loop.close()

@asyncio.coroutine
def listen_to_ipc_channel_layer():
    """Listens to the Linux IPC bus for messages"""
    while True:
        message_handler(message=channel_layer.receive(["my_channel"]))
        try:
            yield from asyncio.sleep(0.1)
        except asyncio.CancelledError:
            break

def ask_exit():
    loop = asyncio.get_event_loop()
    for task in asyncio.Task.all_tasks():
        task.cancel()
    loop.stop()

if __name__ == "__main__":
    gbulb.install()
    # Connect to the IPC bus
    channel_layer = asgi.IPCChannelLayer(prefix="my_channel")
    main()

我仍然对异步知之甚少,但我想我知道发生了什么。在等待 yield from asyncio.sleep(0.1) 时,信号处理程序捕获了 SIGTERM 并在该过程中调用 task.cancel()

这不应该在 while True: 循环中触发 CancelledError 吗? (因为它不是,但这就是我理解 “调用 cancel() 将向包装的协同程序抛出 CancelledError”的方式)。

最终 loop.stop() 被调用停止循环而不等待 yield from asyncio.sleep(0.1) 返回结果甚至整个协程 listen_to_ipc_channel_layer

如果我错了,请纠正我。

我认为我唯一需要做的就是让我的程序等待 yield from asyncio.sleep(0.1) 返回结果 和/或 协程以打破 while 循环并完成。

我相信我混淆了很多东西。请帮我弄清楚这些事情,这样我就可以弄清楚如何在没有警告的情况下优雅地关闭事件循环。

原文由 Daniel 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 1k
2 个回答

问题来自取消任务后立即关闭循环。正如 cancel() 文档 所述

“这安排在事件循环 的下一个周期 将 CancelledError 抛入包装协程中。”

拿这段代码:

 import asyncio
import signal

async def pending_doom():
    await asyncio.sleep(2)
    print(">> Cancelling tasks now")
    for task in asyncio.Task.all_tasks():
        task.cancel()

    print(">> Done cancelling tasks")
    asyncio.get_event_loop().stop()

def ask_exit():
    for task in asyncio.Task.all_tasks():
        task.cancel()

async def looping_coro():
    print("Executing coroutine")
    while True:
        try:
            await asyncio.sleep(0.25)
        except asyncio.CancelledError:
            print("Got CancelledError")
            break

        print("Done waiting")

    print("Done executing coroutine")
    asyncio.get_event_loop().stop()

def main():
    asyncio.async(pending_doom())
    asyncio.async(looping_coro())

    loop = asyncio.get_event_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, ask_exit)

    loop.run_forever()

    # I had to manually remove the handlers to
    # avoid an exception on BaseEventLoop.__del__
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.remove_signal_handler(sig)

if __name__ == '__main__':
    main()

注意 ask_exit 取消任务但不 stop 循环,在下一个周期 looping_coro() 停止它。如果你取消它的输出是:

 Executing coroutine
Done waiting
Done waiting
Done waiting
Done waiting
^CGot CancelledError
Done executing coroutine

注意 pending_doom 如何 在 之后立即 取消和停止循环。如果你让它运行直到 pending_doom 协程从睡眠中醒来,你会看到同样的警告:

 Executing coroutine
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
>> Cancelling tasks now
>> Done cancelling tasks
Task was destroyed but it is pending!
task: <Task pending coro=<looping_coro() running at canceling_coroutines.py:24> wait_for=<Future cancelled>>

原文由 Yeray Diaz 发布,翻译遵循 CC BY-SA 3.0 许可协议

这个问题的意思是一个循环没有时间完成所有的任务。

这安排在事件循环的下一个周期将 CancelledError 抛入包装的协程中。

在您的方法中没有机会执行循环的“下一个循环”。为了使其正确,您应该将停止操作移动到一个单独的非循环协程,以使您的循环有机会完成。

第二个重要的事情是 CancelledError 提高。

与 Future.cancel() 不同,这不能保证任务会被取消:异常可能会被捕获并采取行动,延迟任务的取消或完全阻止取消。该任务还可以返回一个值或引发不同的异常。

调用此方法后,cancelled() 不会立即返回 True(除非任务已被取消)。当包装的协程以 CancelledError 异常终止时,任务将被标记为已取消(即使未调用 cancel())。

因此,在清理之后,您的协程必须引发 CancelledError 以标记为已取消。

使用额外的协程来停止循环不是问题,因为它不是循环的并且在执行后立即完成。

 def main():
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(listen_to_ipc_channel_layer())

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, ask_exit)
    loop.run_forever()
    print("Close")
    loop.close()


@asyncio.coroutine
def listen_to_ipc_channel_layer():
    while True:
        try:
            print("Running")
            yield from asyncio.sleep(0.1)
        except asyncio.CancelledError as e:
            print("Break it out")
            raise e # Raise a proper error


# Stop the loop concurrently
@asyncio.coroutine
def exit():
    loop = asyncio.get_event_loop()
    print("Stop")
    loop.stop()

def ask_exit():
    for task in asyncio.Task.all_tasks():
        task.cancel()
    asyncio.ensure_future(exit())


if __name__ == "__main__":
    main()

原文由 I159 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题
logo
Stack Overflow 翻译
子站问答
访问
宣传栏