关闭异步任务的正确方法

新手上路,请多包涵

我正在编写一个工具,它连接到 X 个 UNIX 套接字、发送命令并将输出保存到本地文件系统中,每 X 秒运行一次。为了在工具收到终止信号时执行一些清理工作,我为 signal.SIGHUP 和 signal.SIGTERM 信号注册了一个处理函数(shutdown)。此函数取消所有任务,然后关闭事件循环。

我的问题是我得到

RuntimeError: Event loop stopped before Future completed.(事件循环在 Future 完成之前就停止了)

当我发送 signal.SIGTERM(kill ‘pid’) 时。我已经阅读了两次有关取消任务的文档,但我没有发现我在这里做错了什么。

我还注意到一件奇怪的事:我发送终止信号时程序正处于休眠状态,而我在日志中看到它随后唤醒并运行了 pull_stats() 协程,您可以在日志的前两行中看到这一点。

日志:

 21:53:44,194 [23031] [MainThread:supervisor  ] DEBUG    **sleeping for 9.805s secs**
21:53:45,857 [23031] [MainThread:pull_stats  ] INFO     pull statistics
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin1.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin4.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin3.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin3.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin2.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin2.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin4.sock
21:53:45,859 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin1.sock
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     received stop signal, cancelling tasks...
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,860 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,860 [23031] [MainThread:shutdown    ] INFO     stopping event loop
21:53:45,860 [23031] [MainThread:shutdown    ] INFO     bye, exiting...
Traceback (most recent call last):
  File "./pull.py", line 249, in <module>
    main()
  File "./pull.py", line 245, in main
    supervisor(loop, config)
  File "./pull.py", line 161, in supervisor
    config['pull']['socket-dir'], storage_dir, loop))
  File "/usr/lib/python3.4/asyncio/base_events.py", line 274, in run_until_complete
    raise RuntimeError('Event loop stopped before Future completed.')
RuntimeError: Event loop stopped before Future completed.

这是代码:

 def shutdown(loop):
    """Signal callback: request cancellation of all tasks, then stop ``loop``.

    The boolean returned by each ``task.cancel()`` call is logged at INFO
    level.

    NOTE(review): ``loop.stop()`` is issued immediately after the cancel
    requests. The traceback reported for this code shows
    ``run_until_complete`` raising ``RuntimeError: Event loop stopped before
    Future completed.`` in that situation.
    """
    LOGGER.info('received stop signal, cancelling tasks...')
    for task in asyncio.Task.all_tasks():
        LOGGER.info(task.cancel())
    LOGGER.info('stopping event loop')
    loop.stop()
    LOGGER.info('bye, exiting...')

def write_file(filename, data):
    """Write the decoded contents of ``data`` to ``filename`` as text.

    Args:
        filename: Path of the file to create or overwrite.
        data: Object exposing ``decode()`` (e.g. ``bytes``).

    Returns:
        True if the file was written, False if an ``OSError`` was raised
        while opening or writing it.
    """
    try:
        with open(filename, 'w') as out:
            out.write(data.decode())
    except OSError:
        return False
    return True

@asyncio.coroutine
def get(socket_file, cmd, storage_dir, loop):
    """Send ``cmd`` to a UNIX socket and store the reply in ``storage_dir``.

    The reply is written to a file named
    ``<basename of socket_file>_<second word of cmd>`` by ``write_file``,
    which is run in the loop's default executor.

    Args:
        socket_file: Path of the UNIX socket to connect to.
        cmd: Command string; it must contain at least two
            whitespace-separated words because the second one is used in
            the output file name.
        storage_dir: Directory in which the output file is created.
        loop: Event loop whose default executor performs the file write.

    Returns:
        Whatever ``write_file`` returns for the output file.

    Raises:
        asyncio.TimeoutError: if the connection is not established within
            one second.
    """
    connect = asyncio.open_unix_connection(socket_file)
    reader, writer = yield from asyncio.wait_for(connect, 1)

    try:
        writer.write('{c}\n'.format(c=cmd).encode())
        data = yield from reader.read()
    finally:
        # Close the connection even when the read fails or this coroutine
        # is cancelled while waiting for the reply.
        writer.close()

    filename = os.path.basename(socket_file) + '_' + cmd.split()[1]
    filename = os.path.join(storage_dir, filename)
    result = yield from loop.run_in_executor(None, write_file, filename, data)

    return result

@asyncio.coroutine
def pull_stats(socket_dir, storage_dir, loop):
    """Run every command in ``CMDS`` against every socket in ``socket_dir``.

    Sockets are the paths matching ``<socket_dir>/*sock*``. One ``get``
    coroutine is started per (socket, command) pair and all of them are
    awaited together.

    Returns:
        True only when at least one ``get`` ran and every one of them
        returned True; False otherwise.
    """
    pending = []
    for socket_file in glob.glob(socket_dir + '/*sock*'):
        for cmd in CMDS:
            pending.append(get(socket_file, cmd, storage_dir, loop))

    outcomes = yield from asyncio.gather(*pending)

    # A one-element set equal to {True} means: not empty, and all True.
    return set(outcomes) == {True}

def supervisor(loop, config):
    """Pull statistics periodically until a fatal filesystem error occurs.

    Each iteration creates a directory named after the current Unix time
    under ``tmp-dst-dir``, runs ``pull_stats`` on ``loop``, then either moves
    the directory into ``dst-dir`` (success) or deletes it (failure), and
    finally sleeps for whatever is left of ``pull-interval``.

    Args:
        loop: Event loop used to run ``pull_stats``.
        config: Parsed configuration providing the ``pull`` section options
            ``dst-dir``, ``tmp-dst-dir``, ``socket-dir`` and
            ``pull-interval``.

    Raises:
        SystemExit: always, with status 1, once the work loop has ended and
            ``loop`` has been closed.
    """
    dst_dir = config.get('pull', 'dst-dir')
    tmp_dst_dir = config.get('pull', 'tmp-dst-dir')

    while True:
        start_time = int(time.time())
        storage_dir = os.path.join(tmp_dst_dir, str(start_time))

        try:
            os.makedirs(storage_dir)
        except OSError as exc:
            LOGGER.critical("failed to create directory %s:%s", storage_dir,
                            exc)
            # Without the directory nothing can be stored, and the failure
            # branch below would call rmtree() on a path that may not
            # exist. Stop here, as is done when the move fails.
            break

        # Launch all connections.
        result = loop.run_until_complete(pull_stats(
            config['pull']['socket-dir'], storage_dir, loop))

        if result:
            try:
                shutil.move(storage_dir, dst_dir)
            except OSError as exc:
                LOGGER.critical("failed to move %s to %s: %s", storage_dir,
                                dst_dir, exc)
                break
            else:
                LOGGER.info('statistics are saved in %s', os.path.join(
                    dst_dir, os.path.basename(storage_dir)))
        else:
            LOGGER.critical('failed to pull stats')
            shutil.rmtree(storage_dir)

        # Sleep only for the part of the interval the pull did not use up.
        interval = config.getint('pull', 'pull-interval')
        remaining = interval - (time.time() - start_time)
        if 0 < remaining < interval:
            time.sleep(remaining)
    loop.close()
    sys.exit(1)

def main():
    """Parse arguments, load configuration and start the supervisor.

    Configuration starts from a copy of ``DEFAULT_OPTIONS`` and is then
    overlaid with the file given by the ``--file`` argument. ``shutdown`` is
    registered on the event loop for SIGHUP and SIGTERM, and the level of
    ``LOGGER`` is taken from the ``loglevel`` option of the ``pull`` section.
    """
    args = docopt(__doc__, version=VERSION)

    config = ConfigParser(interpolation=ExtendedInterpolation())
    config.read_dict(copy.copy(DEFAULT_OPTIONS))
    config.read(args['--file'])

    loop = asyncio.get_event_loop()

    on_stop = functools.partial(shutdown, loop)
    for signum in (signal.SIGHUP, signal.SIGTERM):
        loop.add_signal_handler(signum, on_stop)

    level_name = config.get('pull', 'loglevel').upper()
    LOGGER.setLevel(getattr(logging, level_name, None))

    supervisor(loop, config)

# This is the standard boilerplate that calls the main() function.
if __name__ == '__main__':
    main()

原文由 Pavlos Parissis 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 611
2 个回答

取消不是立即生效的:需要事件循环(ioloop)继续运行,取消才会以 CancelledError 异常的形式完成。请从 shutdown 中删除 loop.stop(),并在 supervisor 中处理该异常,这样就能正常工作。下面是一个简化示例。

需要注意的是:虽然您可以取消 Task,但这只会让事件循环停止监视/等待它的结束或结果,并且不再为它处理后续事件;底层的请求/管道并不会因此停止。

简化示例:

 import asyncio
import functools
import logging
import signal
import sys
from concurrent.futures import CancelledError

def shutdown(loop):
    """Signal callback: request cancellation of every task of ``loop``.

    The loop is deliberately left running so that the cancelled tasks can
    process the cancellation.

    Args:
        loop: Event loop whose tasks are cancelled.
    """
    logging.info('received stop signal, cancelling tasks...')
    # asyncio.Task.all_tasks() was removed in Python 3.9; the module-level
    # asyncio.all_tasks() replaces it from Python 3.7 on.
    all_tasks = getattr(asyncio, 'all_tasks', None) or asyncio.Task.all_tasks
    for task in all_tasks(loop):
        task.cancel()
    logging.info('bye, exiting in a minute...')

@asyncio.coroutine
def get(i):
    """Log the requested delay, then sleep for ``i`` seconds."""
    logging.info('sleep for %d', i)
    yield from asyncio.sleep(i)

@asyncio.coroutine
def pull_stats():
    """Run ten ``get`` coroutines (delays 10 through 19) concurrently."""
    yield from asyncio.gather(*(get(delay) for delay in range(10, 20)))

def supervisor(loop):
    """Run ``pull_stats`` repeatedly on ``loop`` until it is cancelled.

    Args:
        loop: Event loop used to run ``pull_stats``.

    Raises:
        SystemExit: always, with status 1, after the cancellation has been
            logged and ``loop`` has been closed.
    """
    try:
        while True:
            loop.run_until_complete(pull_stats())
    # Since Python 3.8 asyncio raises its own CancelledError class, which
    # is no longer concurrent.futures.CancelledError; on older versions the
    # two names refer to the same class, so this works everywhere.
    except asyncio.CancelledError:
        logging.info('CancelledError')
    loop.close()
    sys.exit(1)

def main():
    """Set up logging and signal handling, then start the supervisor.

    The root logger is set to INFO and ``shutdown`` is registered on the
    event loop for both SIGHUP and SIGTERM.
    """
    logging.getLogger().setLevel(logging.INFO)

    loop = asyncio.get_event_loop()
    on_signal = functools.partial(shutdown, loop)
    for signum in (signal.SIGHUP, signal.SIGTERM):
        loop.add_signal_handler(signum, on_signal)

    supervisor(loop)

if __name__ == '__main__':
    main()

请注意,如果您只取消 gather 返回的 Future,它的所有子任务也会被标记为已取消。

关于 sleep 的问题

任何信号或中断的到来都会使程序恢复执行。因此,当进程收到 SIGTERM 且已为其设置了处理程序时,Python 允许您处理它:恢复该线程并调用信号处理程序。由于 ioloop 的实现及其信号处理方式,程序在被唤醒后会继续运行。

原文由 kwarunek 发布,翻译遵循 CC BY-SA 3.0 许可协议

由于信号在 Windows 上不可用,显示 loop.add_signal_handler(...) 的示例将无法在该平台上运行。下面,我提供了一个 Python 3 脚本框架,用于彻底关闭 asyncio Tasks ,它应该可以在任何平台上运行。

 import sys
import traceback
import asyncio
# True when "--verbose" appears among the command-line arguments.
_verbose = ("--verbose" in sys.argv)
# Program options; assigned by boot() and read by main().
opts = None

# Main
async def main():
    """Entry coroutine of the script; the placeholder marks where work goes."""
    if _verbose:
        print(f"main() called. opts: {opts}")

    print("... add code here ...")

# Initialization and Startup
def boot(argv):
    """Initialise the global options and run ``main()`` to completion.

    Any ``Exception`` escaping ``main()`` is printed and the program exits
    with status 1 through ``_exit_program``; otherwise it exits with
    status 0.

    Args:
        argv: Command-line arguments; only echoed in verbose mode here.
    """
    global opts
    if _verbose:
        print(f"boot() called. argv: {argv}")
    # process cli args here
    opts = {"opt1": "val1", "verbose": _verbose}

    # Call main
    try:
        asyncio.get_event_loop().run_until_complete(main())
    except Exception:
        print(f"Fatal exception: {_get_last_exc()}")
        _exit_program(1)

    if _verbose:
        print("Script exiting cleanly")
    _exit_program(0)

# Utilities
def _get_last_exc():
    exc_type, exc_value, exc_traceback = sys.exc_info()
    sTB = '\n'.join(traceback.format_tb(exc_traceback))
    return f"{exc_type}\n - msg: {exc_value}\n stack: {sTB}"

def _exit_program(code=1):
    """Cancel outstanding asyncio tasks, flush output and exit the process.

    Args:
        code: Exit status handed to ``sys.exit``.

    Raises:
        SystemExit: always.
    """
    if _verbose:
        print(f"_exit_program() called code = {code}")

    # kill all active asyncio Tasks
    for task in _all_tasks():
        if _verbose:
            print(f'cancelling task: {task}')
        try:
            task.cancel()
        except Exception:
            # Best effort: a task that cannot be cancelled must not keep
            # the program from exiting.
            if _verbose:
                print(f"Task kill failed: {_get_last_exc()}")

    # flush stderr and stdout
    sys.stdout.flush()
    sys.stderr.flush()

    # Shut down
    if _verbose:
        print(f"exiting with code {code}")
    sys.exit(code)


def _all_tasks():
    """Return the tasks of the current event loop on old and new Pythons."""
    all_tasks = getattr(asyncio, 'all_tasks', None)
    if all_tasks is None:
        # Python < 3.7 only offers the classmethod, which was removed
        # in Python 3.9.
        return asyncio.Task.all_tasks()
    try:
        # This may run while no loop is running, in which case
        # asyncio.all_tasks() needs the loop passed explicitly.
        loop = asyncio.get_event_loop()
    except RuntimeError:
        # No event loop is available, so there is nothing to cancel.
        return set()
    return all_tasks(loop)

# When run as a script, start boot() with the arguments that follow the
# program name.
if __name__ == '__main__':
    boot(sys.argv[1:])

原文由 Timothy C. Quinn 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题