循环中的 CompletableFuture:如何收集所有响应并处理错误

新手上路,请多包涵

我正在尝试为 PUT 循环请求调用 rest api。每个调用都是 CompletableFuture 。每个 api 调用都会返回一个类型为 RoomTypes.RoomType 的对象

  • 我想在不同的列表中收集响应(包括成功响应和错误响应)。我该如何实现?我确定我不能使用 allOf 因为如果任何一个调用无法更新,它就不会获得所有结果。

  • 如何记录每次调用的错误/异常?

 public void sendRequestsAsync(Map<Integer, List> map1) {
    List<CompletableFuture<Void>> completableFutures = new ArrayList<>(); //List to hold all the completable futures
    List<RoomTypes.RoomType> responses = new ArrayList<>(); //List for responses
    ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    for (Map.Entry<Integer, List> entry :map1.entrySet()) {
        CompletableFuture requestCompletableFuture = CompletableFuture
                .supplyAsync(
                        () ->
            //API call which returns object of type RoomTypes.RoomType
            updateService.updateRoom(51,33,759,entry.getKey(),
                           new RoomTypes.RoomType(entry.getKey(),map2.get(entry.getKey()),
                                    entry.getValue())),
                    yourOwnExecutor
            )//Supply the task you wanna run, in your case http request
            .thenApply(responses::add);

    completableFutures.add(requestCompletableFuture);
}

原文由 Rudrani Angira 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 1.3k
2 个回答

您可以简单地使用 allOf() 获得一个在所有初始期货完成(例外或未完成)时完成的未来,然后使用 Collectors.partitioningBy() 在成功和失败之间拆分它们:

 List<CompletableFuture<RoomTypes.RoomType>> completableFutures = new ArrayList<>(); //List to hold all the completable futures
ExecutorService yourOwnExecutor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

for (Map.Entry<Integer, List> entry : map1.entrySet()) {
    CompletableFuture<RoomTypes.RoomType> requestCompletableFuture = CompletableFuture
            .supplyAsync(
                    () ->
                //API call which returns object of type RoomTypes.RoomType
                updateService.updateRoom(51, 33, 759, entry.getKey(),
                        new RoomTypes.RoomType(entry.getKey(), map2.get(entry.getKey()),
                                entry.getValue())),
                    yourOwnExecutor
            );

    completableFutures.add(requestCompletableFuture);
}

CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]))
        // avoid throwing an exception in the join() call
        .exceptionally(ex -> null)
        .join();
Map<Boolean, List<CompletableFuture<RoomTypes.RoomType>>> result =
        completableFutures.stream()
                .collect(Collectors.partitioningBy(CompletableFuture::isCompletedExceptionally)));

生成的映射将包含一个带有 true 的条目用于失败的期货,另一个条目带有 false 键用于成功的期货。然后您可以检查这 2 个条目以采取相应的行动。

请注意,与您的原始代码相比有 2 个细微变化:

  • requestCompletableFuture 现在是 CompletableFuture<RoomTypes.RoomType>
  • thenApply(responses::add)responses 列表已删除

关于日志记录/异常处理,只需添加相关的 requestCompletableFuture.handle() 以单独记录它们,但保留 requestCompletableFuture 而不是 handle() 产生的结果。

原文由 Didier L 发布,翻译遵循 CC BY-SA 4.0 许可协议

或者,也许您可以从不同的角度解决问题,而不是强制使用 CompletableFuture ,您可以改用 CompletionService

CompletionService 的整个想法是,一旦给定未来的答案准备就绪,它就会被放入队列中,您可以从中使用结果。

备选方案 1:没有 CompletableFuture

 CompletionService<String> cs = new ExecutorCompletionService<>(executor);

List<Future<String>> futures = new ArrayList<>();

futures.add(cs.submit(() -> "One"));
futures.add(cs.submit(() -> "Two"));
futures.add(cs.submit(() -> "Three"));
futures.add(cs.submit(() -> { throw new RuntimeException("Sucks to be four"); }));
futures.add(cs.submit(() -> "Five"));

List<String> successes = new ArrayList<>();
List<String> failures = new ArrayList<>();

while (futures.size() > 0) {
    Future<String> f = cs.poll();
    if (f != null) {
        futures.remove(f);
        try {
            //at this point the future is guaranteed to be solved
            //so there won't be any blocking here
            String value = f.get();
            successes.add(value);
        } catch (Exception e) {
            failures.add(e.getMessage());
        }
    }
}

System.out.println(successes);
System.out.println(failures);

哪个产量:

 [One, Two, Three, Five]
[java.lang.RuntimeException: Sucks to be four]

备选方案 2:使用 CompletableFuture

但是,如果您真的、真的需要处理 CompletableFuture 您也可以将它们提交给完成服务,只需将它们直接放入其队列即可:

例如,以下变体具有相同的结果:

 BlockingQueue<Future<String>> tasks = new ArrayBlockingQueue<>(5);
CompletionService<String> cs = new ExecutorCompletionService<>(executor, tasks);

List<Future<String>> futures = new ArrayList<>();

futures.add(CompletableFuture.supplyAsync(() -> "One"));
futures.add(CompletableFuture.supplyAsync(() -> "Two"));
futures.add(CompletableFuture.supplyAsync(() -> "Three"));
futures.add(CompletableFuture.supplyAsync(() -> { throw new RuntimeException("Sucks to be four"); }));
futures.add(cs.submit(() -> "Five"));

//places all futures in completion service queue
tasks.addAll(futures);

List<String> successes = new ArrayList<>();
List<String> failures = new ArrayList<>();

while (futures.size() > 0) {
    Future<String> f = cs.poll();
    if (f != null) {
        futures.remove(f);
        try {
            //at this point the future is guaranteed to be solved
            //so there won't be any blocking here
            String value = f.get();
            successes.add(value);
        } catch (Exception e) {
            failures.add(e.getMessage());
        }
    }
}

原文由 Edwin Dalorzo 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题