如何让 ThreadPoolExecutor 在排队之前将线程增加到最大?

新手上路,请多包涵

一段时间以来,我一直对 ThreadPoolExecutor 的默认行为感到沮丧,它支持我们很多人使用的 ExecutorService 线程池。引用 Javadocs:

如果运行的线程多于 corePoolSize 但少于 maximumPoolSize ,则 只有在队列已满 时才会创建新线程。

这意味着如果您使用以下代码定义线程池,它将 永远不会 启动第二个线程,因为 LinkedBlockingQueue 是无界的。

 ExecutorService threadPool =
   new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
      TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue*/));

只有当你有一个 有界 队列并且队列 已满 时,核心数以上的任何线程才会启动。我怀疑大量初级 Java 多线程程序员没有意识到 ThreadPoolExecutor 的这种行为。

现在我有特定的用例,其中这不是最佳的。我正在寻找解决方法,无需编写自己的 TPE 类。

我的要求是针对可能不可靠的第 3 方进行回调的 Web 服务。

  • 我不想与网络请求同步进行回调,所以我想使用线程池。
  • 我通常每分钟得到几个这样的线程,所以我不想有一个 newFixedThreadPool(...) 有大量线程,这些线程大多处于休眠状态。
  • 每隔一段时间,我就会遇到这种流量的爆发,我想将线程数增加到某个最大值(比如 50)。
  • 我需要尽 最大 努力完成所有回调,因此我想将任何超过 50 个的回调排队。我不想使用 newCachedThreadPool() 压倒我的网络服务器的其余部分。

我如何解决 ThreadPoolExecutor 中的这个限制 --- 在启动更多线程 之前 队列需要被限制和填满?我怎样才能让它在排队任务 之前 启动更多线程?

编辑:

@Flavio 在使用 ThreadPoolExecutor.allowCoreThreadTimeOut(true) 使核心线程超时并退出时提出了一个很好的观点。我考虑过,但我仍然想要核心线程功能。如果可能的话,我不希望池中的线程数低于核心大小。

原文由 Gray 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 582
2 个回答

我如何解决 ThreadPoolExecutor 中的这个限制,在启动更多线程之前,队列需要被限制并填满。

我相信我终于通过 ThreadPoolExecutor 找到了一个有点优雅(也许有点老套)的解决方案来解决这个限制。它涉及扩展 LinkedBlockingQueue 让它返回 false queue.offer(...) 当已经有一些任务排队时—。如果当前线程跟不上排队的任务,TPE 将添加额外的线程。如果池已经达到最大线程数,则将调用 RejectedExecutionHandlerput(...) 放入队列。

写一个队列肯定很奇怪,其中 offer(...) 可以返回 falseput() 从不阻塞,所以这就是 hack 部分。但这很适合 TPE 对队列的使用,因此我认为这样做没有任何问题。

这是代码:

 // extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    private static final long serialVersionUID = -6903933921423432194L;
    @Override
    public boolean offer(Runnable e) {
        // Offer it to the queue if there is 0 items already queued, else
        // return false so the TPE will add another thread. If we return false
        // and max threads have been reached then the RejectedExecutionHandler
        // will be called which will do the put into the queue.
        if (size() == 0) {
            return super.offer(e);
        } else {
            return false;
        }
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // This does the actual put into the queue. Once the max threads
            //  have been reached, the tasks will then queue up.
            executor.getQueue().put(r);
            // we do this after the put() to stop race conditions
            if (executor.isShutdown()) {
                throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + e);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});

使用这种机制,当我向队列提交任务时, ThreadPoolExecutor 将:

  1. 最初将线程数扩展到核心大小(此处为 1)。
  2. 将其提供给队列。如果队列为空,它将排队等待现有线程处理。
  3. 如果队列已经有 1 个或多个元素, offer(...) 将返回 false。
  4. 如果返回 false,则增加池中的线程数,直到它们达到最大数量(此处为 50)。
  5. 如果达到最大值,则它调用 RejectedExecutionHandler
  6. RejectedExecutionHandler 然后将任务放入队列中,以 FIFO 顺序由第一个可用线程处理。

虽然在我上面的示例代码中,队列是无界的,但您也可以将其定义为有界队列。例如,如果将 1000 的容量添加到 LinkedBlockingQueue ,那么它将:

  1. 将线程扩展到最大
  2. 然后排队直到填满 1000 个任务
  3. 然后阻塞调用者直到队列有可用空间。

Also, if you needed to use offer(...) in the RejectedExecutionHandler then you could use the offer(E, long, TimeUnit) method instead with Long.MAX_VALUE as the timeout.

警告:

如果您希望在执行程序关闭 后将 任务添加到执行程序,那么您可能希望更聪明地抛出 RejectedExecutionException 我们的自定义 RejectedExecutionHandler 当执行程序服务已关闭时关闭。感谢@RaduToader 指出这一点。

编辑:

对该答案的另一个调整可能是询问 TPE 是否有空闲线程,并且仅在有空闲线程时才将项目入队。您必须为此创建一个真正的类并在其上添加 ourQueue.setThreadPoolExecutor(tpe); 方法。

那么您的 offer(...) 方法可能类似于:

  1. 检查是否 tpe.getPoolSize() == tpe.getMaximumPoolSize() 在这种情况下只需调用 super.offer(...)
  2. 否则如果 tpe.getPoolSize() > tpe.getActiveCount() 然后调用 super.offer(...) 因为似乎有空闲线程。
  3. 否则返回 false 以分叉另一个线程。

也许这个:

 int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
    return super.offer(e);
} else {
    return false;
}

请注意,TPE 上的 get 方法非常昂贵,因为它们访问 volatile 字段或(在 getActiveCount() 的情况下)锁定 TPE 并遍历线程列表。此外,这里存在竞争条件,可能导致任务不正确地排队或在有空闲线程时分叉另一个线程。

原文由 Gray 发布,翻译遵循 CC BY-SA 4.0 许可协议

关于这个问题,我已经得到了另外两个答案,但我怀疑这个是最好的。

它基于 当前接受的答案 的技术,即:

  1. 覆盖队列的 offer() 方法以(有时)返回 false,
  2. 这导致 ThreadPoolExecutor 生成新线程或拒绝任务,并且
  3. RejectedExecutionHandler 设置为在拒绝时 实际 排队任务。

问题是 offer() 应该返回 false。当队列上有几个任务时,当前接受的答案返回 false,但正如我在那里的评论中指出的那样,这会导致不良影响。或者,如果您始终返回 false,即使队列中有线程在等待,您也会继续生成新线程。

解决方案是使用 Java 7 LinkedTransferQueue 并有 offer() 调用 tryTransfer() 。当有一个等待的消费者线程时,任务将被传递给该线程。否则, offer() 将返回 false 并且 ThreadPoolExecutor 将产生一个新线程。

     BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
        @Override
        public boolean offer(Runnable e) {
            return tryTransfer(e);
        }
    };
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
    threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });

原文由 Robert Tupelo-Schneck 发布,翻译遵循 CC BY-SA 3.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题