Java 8 并行流中的自定义线程池

新手上路,请多包涵

是否可以为 Java 8 并行流 指定自定义线程池?我在任何地方都找不到它。

想象一下,我有一个服务器应用程序,我想使用并行流。但是这个应用程序很大而且是多线程的,所以我想把它分开。我不希望在另一个模块的应用程序块任务的一个模块中运行缓慢的任务。

如果我不能为不同的模块使用不同的线程池,这意味着我不能在大多数现实世界的情况下安全地使用并行流。

试试下面的例子。有一些 CPU 密集型任务在单独的线程中执行。这些任务利用并行流。第一个任务被破坏了,所以每一步需要 1 秒(通过线程睡眠模拟)。问题是其他线程卡住并等待中断的任务完成。这是一个人为的例子,但想象一个 servlet 应用程序和某人向共享分叉连接池提交一个长时间运行的任务。

 public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));

        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

原文由 Lukas 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 863
2 个回答

实际上有一个技巧如何在特定的 fork-join 池中执行并行操作。如果您将它作为一个任务在一个 fork-join 池中执行,它会停留在那里并且不使用公共池。

 final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

该技巧基于 ForkJoinTask.fork 指定:“安排在当前任务正在运行的池中异步执行此任务(如果适用),或者使用 ForkJoinPool.commonPool() 如果不是 inForkJoinPool()

原文由 Lukas 发布,翻译遵循 CC BY-SA 4.0 许可协议

并行流使用默认的 ForkJoinPool.commonPool 默认情况下,它的线程数比你拥有的处理器少一个,由 Runtime.getRuntime().availableProcessors() 返回(这意味着并行流为调用线程留下一个处理器)。

对于需要单独或自定义池的应用程序,可以使用给定的目标并行级别构建 ForkJoinPool;默认情况下,等于可用处理器的数量。

这也意味着如果您有嵌套的并行流或同时启动的多个并行流,它们将 共享 同一个池。优点:您永远不会使用超过默认值(可用处理器的数量)。缺点:您可能无法将“所有处理器”分配给您启动的每个并行流(如果您碰巧有多个处理器)。 (显然你可以使用 ManagedBlocker 来规避它。)

要更改并行流的执行方式,您可以

  • 将并行流执行提交到您自己的 ForkJoinPool: yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();
  • 您可以使用系统属性更改公共池的大小: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20") 目标并行度为 20 个线程。

后者在我的机器上有 8 个处理器的例子。如果我运行以下程序:

 long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

输出是:

215 216 216 216 216 216 216 216 315 316 316 316 316 316 316 316 415 416 416 416

所以你可以看到并行流一次处理了8个item,即它使用了8个线程。但是,如果我取消注释注释行,则输出为:

215 215 215 215 215 216 216 216 216 216 216 216 216 216 216 216 216 216 216 216

这一次,并行流使用了 20 个线程,并且流中的所有 20 个元素都已并发处理。

原文由 assylias 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题