Do Python thread pool callbacks run in the main thread or in a worker thread?

from concurrent.futures import ThreadPoolExecutor
import threading


def action():
    return ''


def callback_func(future):
    print(threading.current_thread().name)
    # assert 'ThreadPoolExecutor' in threading.current_thread().name
    # assert 'MainThread' in threading.current_thread().name


pool = ThreadPoolExecutor(max_workers=10)

for i in range(10):
    future = pool.submit(action)
    future.add_done_callback(callback_func)

The output is as follows:

MainThread
ThreadPoolExecutor-0_1
ThreadPoolExecutor-0_1
MainThread
ThreadPoolExecutor-0_3
ThreadPoolExecutor-0_3
ThreadPoolExecutor-0_3
MainThread
ThreadPoolExecutor-0_1
ThreadPoolExecutor-0_1

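Judging from the printed output, callback_func sometimes runs in MainThread and sometimes in a ThreadPoolExecutor worker thread.

What is going on here?

If I add a small delay, all the callbacks run in ThreadPoolExecutor threads: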

from concurrent.futures import ThreadPoolExecutor
import threading
import time


def action():
    time.sleep(1)
    return ''


def callback_func(future):
    print(threading.current_thread().name)
    # assert 'ThreadPoolExecutor' in threading.current_thread().name
    # assert 'MainThread' in threading.current_thread().name


pool = ThreadPoolExecutor(max_workers=10)

for i in range(10):
    future = pool.submit(action)
    future.add_done_callback(callback_func)

The output is as follows:

ThreadPoolExecutor-0_5
ThreadPoolExecutor-0_0
ThreadPoolExecutor-0_7ThreadPoolExecutor-0_3
ThreadPoolExecutor-0_6
ThreadPoolExecutor-0_1
ThreadPoolExecutor-0_9

ThreadPoolExecutor-0_8
ThreadPoolExecutor-0_2
ThreadPoolExecutor-0_4

1 answer

In Python, is it the main thread or a worker thread that executes the callback?

Let's first look at how a callback is added to a task in Python's thread pool.

from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=10)


def func_get():
    # Placeholder for the real work done in the pool.
    pass


def func_callback(future):
    # A done callback is called with the future as its only argument.
    pass


for i in range(100):
    future = pool.submit(func_get)
    future.add_done_callback(func_callback)

As you can see:

  • first submit
  • then add_done_callback

This raises a question: if func_get runs so fast that it finishes before add_done_callback is even called, will func_callback still be executed when add_done_callback(func_callback) runs?

The answer is yes!
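
This is easy to check. The following is a minimal sketch, not taken from the original post: the names `record` and `calls` are made up for illustration. It waits until the task has definitely finished and only then registers the callback.

```python
from concurrent.futures import ThreadPoolExecutor, wait

calls = []  # hypothetical container used to observe the callback


def record(future):
    # Done callbacks receive the finished future as their only argument.
    calls.append(future.result())


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(lambda: 42)
    wait([future])                    # block until the task has finished
    assert future.done()
    future.add_done_callback(record)  # registered late, still invoked right away

print(calls)  # [42]
```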

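Why?

Let's read the thread pool source code.

Python's ThreadPoolExecutor is implemented in pure Python on top of the threading and queue modules, so the source is easy to read.

submit returns a Future instance: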
/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/thread.py

def submit(self, fn, /, *args, **kwargs):
    with self._shutdown_lock, _global_shutdown_lock:
        if self._broken:
            raise BrokenThreadPool(self._broken)

        if self._shutdown:
            raise RuntimeError('cannot schedule new futures after shutdown')
        if _shutdown:
            raise RuntimeError('cannot schedule new futures after '
                                'interpreter shutdown')

        f = _base.Future()
        w = _WorkItem(f, fn, args, kwargs)

        self._work_queue.put(w)
        self._adjust_thread_count()
        return f

The add_done_callback method of that Future instance is then called:
/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py

def add_done_callback(self, fn):
    """Attaches a callable that will be called when the future finishes.

    Args:
        fn: A callable that will be called with this future as its only
            argument when the future completes or is cancelled. The callable
            will always be called by a thread in the same process in which
            it was added. If the future has already completed or been
            cancelled then the callable will be called immediately. These
            callables are called in the order that they were added.
    """
    with self._condition:
        if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
            self._done_callbacks.append(fn)
            return
    try:
        fn(self)
    except Exception:
        LOGGER.exception('exception calling callback for %r', self)

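As you can see, the logic is simple:

  • If the future's _state is already one of CANCELLED, CANCELLED_AND_NOTIFIED or FINISHED when add_done_callback is called, the task has already finished or been cancelled. In that case the callback is run immediately by the thread that called add_done_callback, which is the main thread in the question's code.
  • If the future is not done yet when add_done_callback is called, the callback is appended to the callback list.

The callbacks in that list are run after the worker thread has finished the actual work for the future, so they execute in the worker thread.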
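
Both branches can be forced deterministically. The sketch below is an illustration only: the `gate` event, the `make_callback` helper, the `seen` dict and the `worker` thread name prefix are assumptions introduced here, not part of the original code. An Event holds the first task open until its callback is registered, while the second callback is registered only after its task is known to be done.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait

caller = threading.current_thread().name  # thread that registers the callbacks
seen = {}                                  # label -> name of the thread that ran the callback
gate = threading.Event()


def make_callback(label):
    def callback(future):
        seen[label] = threading.current_thread().name
    return callback


def gated_task():
    gate.wait(timeout=5)  # keep the task unfinished until the gate opens
    return "done"


with ThreadPoolExecutor(max_workers=1, thread_name_prefix="worker") as pool:
    # Case 1: the future is not done yet, so the callback is stored
    # and later invoked by the worker thread.
    pending = pool.submit(gated_task)
    pending.add_done_callback(make_callback("registered_early"))
    gate.set()

    # Case 2: the future is already finished, so add_done_callback
    # calls the callback right away in the calling thread.
    finished = pool.submit(lambda: "done")
    wait([finished])
    finished.add_done_callback(make_callback("registered_late"))

# Leaving the with block joins the worker, so both callbacks have run by now.
print(seen)
```

With this setup, "registered_early" is recorded under a worker thread name and "registered_late" under the name of the calling thread.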
/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py

def _invoke_callbacks(self):
    for callback in self._done_callbacks:
        try:
            callback(self)
        except Exception:
            LOGGER.exception('exception calling callback for %r', self)

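_invoke_callbacks is called at the end of three methods: cancel, set_result and set_exception. In a thread pool, set_result and set_exception are called by the worker thread that ran the task, which is why the stored callbacks run in that worker thread.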

/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/concurrent/futures/_base.py

def cancel(self):
    """Cancel the future if possible.

    Returns True if the future was cancelled, False otherwise. A future
    cannot be cancelled if it is running or has already completed.
    """
    with self._condition:
        if self._state in [RUNNING, FINISHED]:
            return False

        if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
            return True

        self._state = CANCELLED
        self._condition.notify_all()

    self._invoke_callbacks()
    return True

def set_result(self, result):
    """Sets the return value of work associated with the future.

    Should only be used by Executor implementations and unit tests.
    """
    with self._condition:
        if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._result = result
        self._state = FINISHED
        for waiter in self._waiters:
            waiter.add_result(self)
        self._condition.notify_all()
    self._invoke_callbacks()

def set_exception(self, exception):
    """Sets the result of the future as being the given exception.

    Should only be used by Executor implementations and unit tests.
    """
    with self._condition:
        if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED}:
            raise InvalidStateError('{}: {!r}'.format(self._state, self))
        self._exception = exception
        self._state = FINISHED
        for waiter in self._waiters:
            waiter.add_exception(self)
        self._condition.notify_all()
    self._invoke_callbacks()
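
The cancel path deserves its own check, because there the stored callbacks are invoked by whichever thread calls cancel(), not by a worker. A minimal sketch, assuming a pool with a single worker that is kept busy so the second task stays queued and cancellable (the names `gate`, `on_done` and `ran_in` are made up for illustration):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

caller = threading.current_thread().name
gate = threading.Event()
ran_in = []  # (was the future cancelled?, thread that ran the callback)


def on_done(future):
    ran_in.append((future.cancelled(), threading.current_thread().name))


with ThreadPoolExecutor(max_workers=1) as pool:
    blocker = pool.submit(gate.wait, 5)        # occupies the only worker
    queued = pool.submit(lambda: "never runs")  # stays pending behind blocker
    queued.add_done_callback(on_done)
    was_cancelled = queued.cancel()             # runs on_done in this thread
    gate.set()                                  # release the worker

print(was_cancelled, ran_in)
```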

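Summary:

  • If the task has already finished when add_done_callback is called, the callback runs immediately in the calling thread (the main thread in the examples above).
  • Otherwise the worker thread that completes the task runs the callback itself.

Recommendation:
Be careful not to do anything in a callback that may block for a long time: it would stall either the thread calling add_done_callback or a worker thread in the pool.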
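
One possible way to follow this advice, shown only as a sketch under my own assumptions (the `results` queue and the `hand_off` and `consume` helpers are hypothetical, not from the original post), is to keep the callback trivial and move any slow post-processing to a consumer that reads from a queue:

```python
import queue
from concurrent.futures import ThreadPoolExecutor

results = queue.Queue()


def hand_off(future):
    # Keep the callback cheap: pass the result along and return at once,
    # whichever thread happens to be running it.
    results.put(future.result())


def consume(count):
    total = 0
    for _ in range(count):
        total += results.get()  # slow post-processing would live here
    return total


with ThreadPoolExecutor(max_workers=4) as pool:
    for n in range(5):
        pool.submit(pow, n, 2).add_done_callback(hand_off)

# The with block has joined the workers, so all five results are queued.
total = consume(5)
print(total)  # 30
```

The callback then takes the same negligible time whether it runs in the caller or in a worker, and the expensive part happens where you choose to run `consume`.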
