一段时间以来,我一直对 ThreadPoolExecutor
的默认行为感到沮丧,它支持我们很多人使用的 ExecutorService
线程池。引用 Javadocs:
如果运行的线程多于 corePoolSize 但少于 maximumPoolSize ,则 只有在队列已满 时才会创建新线程。
这意味着如果您使用以下代码定义线程池,它将 永远不会 启动第二个线程,因为 LinkedBlockingQueue
是无界的。
ExecutorService threadPool =
new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue*/));
只有当你有一个 有界 队列并且队列 已满 时,核心数以上的任何线程才会启动。我怀疑大量初级 Java 多线程程序员没有意识到 ThreadPoolExecutor
的这种行为。
现在我有特定的用例,其中这不是最佳的。我正在寻找解决方法,无需编写自己的 TPE 类。
我的要求是针对可能不可靠的第 3 方进行回调的 Web 服务。
- 我不想与网络请求同步进行回调,所以我想使用线程池。
- 我通常每分钟得到几个这样的线程,所以我不想有一个
newFixedThreadPool(...)
有大量线程,这些线程大多处于休眠状态。 - 每隔一段时间,我就会遇到这种流量的爆发,我想将线程数增加到某个最大值(比如 50)。
- 我需要尽 最大 努力完成所有回调,因此我想将任何超过 50 个的回调排队。我不想使用
newCachedThreadPool()
压倒我的网络服务器的其余部分。
我如何解决 ThreadPoolExecutor
中的这个限制 --- 在启动更多线程 之前 队列需要被限制和填满?我怎样才能让它在排队任务 之前 启动更多线程?
编辑:
@Flavio 在使用 ThreadPoolExecutor.allowCoreThreadTimeOut(true)
使核心线程超时并退出时提出了一个很好的观点。我考虑过,但我仍然想要核心线程功能。如果可能的话,我不希望池中的线程数低于核心大小。
原文由 Gray 发布,翻译遵循 CC BY-SA 4.0 许可协议
我相信我终于通过
ThreadPoolExecutor
找到了一个有点优雅(也许有点老套)的解决方案来解决这个限制。它涉及扩展LinkedBlockingQueue
让它返回false
queue.offer(...)
当已经有一些任务排队时—。如果当前线程跟不上排队的任务,TPE 将添加额外的线程。如果池已经达到最大线程数,则将调用RejectedExecutionHandler
将put(...)
放入队列。写一个队列肯定很奇怪,其中
offer(...)
可以返回false
和put()
从不阻塞,所以这就是 hack 部分。但这很适合 TPE 对队列的使用,因此我认为这样做没有任何问题。这是代码:
使用这种机制,当我向队列提交任务时,
ThreadPoolExecutor
将:offer(...)
将返回 false。RejectedExecutionHandler
RejectedExecutionHandler
然后将任务放入队列中,以 FIFO 顺序由第一个可用线程处理。虽然在我上面的示例代码中,队列是无界的,但您也可以将其定义为有界队列。例如,如果将 1000 的容量添加到
LinkedBlockingQueue
,那么它将:Also, if you needed to use
offer(...)
in theRejectedExecutionHandler
then you could use theoffer(E, long, TimeUnit)
method instead withLong.MAX_VALUE
as the timeout.警告:
如果您希望在执行程序关闭 后将 任务添加到执行程序,那么您可能希望更聪明地抛出
RejectedExecutionException
我们的自定义RejectedExecutionHandler
当执行程序服务已关闭时关闭。感谢@RaduToader 指出这一点。编辑:
对该答案的另一个调整可能是询问 TPE 是否有空闲线程,并且仅在有空闲线程时才将项目入队。您必须为此创建一个真正的类并在其上添加
ourQueue.setThreadPoolExecutor(tpe);
方法。那么您的
offer(...)
方法可能类似于:tpe.getPoolSize() == tpe.getMaximumPoolSize()
在这种情况下只需调用super.offer(...)
。tpe.getPoolSize() > tpe.getActiveCount()
然后调用super.offer(...)
因为似乎有空闲线程。false
以分叉另一个线程。也许这个:
请注意,TPE 上的 get 方法非常昂贵,因为它们访问
volatile
字段或(在getActiveCount()
的情况下)锁定 TPE 并遍历线程列表。此外,这里存在竞争条件,可能导致任务不正确地排队或在有空闲线程时分叉另一个线程。