在 Java 中并行化任务的最简单方法是什么?

新手上路,请多包涵

假设我有这样的任务:

 for(Object object: objects) {
    Result result = compute(object);
    list.add(result);
}

并行化每个 compute() 的最简单方法是什么(假设它们已经可以并行化)?

我不需要与上面的代码严格匹配的答案,只是一个一般性的答案。但如果您需要更多信息:我的任务是 IO 绑定的,这是针对 Spring Web 应用程序的,任务将在 HTTP 请求中执行。

原文由 Eduardo 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 386
2 个回答

我建议看一下 ExecutorService

特别是,像这样:

 ExecutorService EXEC = Executors.newCachedThreadPool();
List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
for (final Object object: objects) {
    Callable<Result> c = new Callable<Result>() {
        @Override
        public Result call() throws Exception {
            return compute(object);
        }
    };
    tasks.add(c);
}
List<Future<Result>> results = EXEC.invokeAll(tasks);

请注意,如果 objects 是一个大列表,则使用 newCachedThreadPool 可能会很糟糕。缓存的线程池可以为每个任务创建一个线程!您可能想要使用 newFixedThreadPool(n) 其中 n 是合理的(比如您拥有的内核数量,假设 compute() 受 CPU 限制)。

这是实际运行的完整代码:

 import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ExecutorServiceExample {
    private static final Random PRNG = new Random();

    private static class Result {
        private final int wait;
        public Result(int code) {
            this.wait = code;
        }
    }

    public static Result compute(Object obj) throws InterruptedException {
        int wait = PRNG.nextInt(3000);
        Thread.sleep(wait);
        return new Result(wait);
    }

    public static void main(String[] args) throws InterruptedException,
        ExecutionException {
        List<Object> objects = new ArrayList<Object>();
        for (int i = 0; i < 100; i++) {
            objects.add(new Object());
        }

        List<Callable<Result>> tasks = new ArrayList<Callable<Result>>();
        for (final Object object : objects) {
            Callable<Result> c = new Callable<Result>() {
                @Override
                public Result call() throws Exception {
                    return compute(object);
                }
            };
            tasks.add(c);
        }

        ExecutorService exec = Executors.newCachedThreadPool();
        // some other exectuors you could try to see the different behaviours
        // ExecutorService exec = Executors.newFixedThreadPool(3);
        // ExecutorService exec = Executors.newSingleThreadExecutor();
        try {
            long start = System.currentTimeMillis();
            List<Future<Result>> results = exec.invokeAll(tasks);
            int sum = 0;
            for (Future<Result> fr : results) {
                sum += fr.get().wait;
                System.out.println(String.format("Task waited %d ms",
                    fr.get().wait));
            }
            long elapsed = System.currentTimeMillis() - start;
            System.out.println(String.format("Elapsed time: %d ms", elapsed));
            System.out.println(String.format("... but compute tasks waited for total of %d ms; speed-up of %.2fx", sum, sum / (elapsed * 1d)));
        } finally {
            exec.shutdown();
        }
    }
}

原文由 overthink 发布,翻译遵循 CC BY-SA 2.5 许可协议

使用 Java8 及更高版本,您可以在集合上使用 parallelStream 来实现此目的:

 List<T> objects = ...;

List<Result> result = objects.parallelStream().map(object -> {
            return compute(object);
        }).collect(Collectors.toList());

注意:结果列表的顺序可能与对象列表中的顺序不匹配。

有关如何设置正确线程数的详细信息,请参阅此 stackoverflow 问题 how-many-threads-are-spawned-in-parallelstream-in-java-8

原文由 i000174 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题