将 Java Future 转换为 CompletableFuture

新手上路,请多包涵

Java 8 引入了 CompletableFuture ,一个可组合的 Future 的新实现(包括一堆 thenXxx 方法)。我想专门使用它,但是我想使用的许多库只返回不可组合的 Future 实例。

有没有办法将返回的 Future 实例包装在 CompleteableFuture 中,以便我可以编写它?

原文由 Dan Midwood 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 1.8k
2 个回答

有一种方法,但你不会喜欢它。以下方法将 Future<T> 转换为 CompletableFuture<T>

 public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
  if (future.isDone())
    return transformDoneFuture(future);
  return CompletableFuture.supplyAsync(() -> {
    try {
      if (!future.isDone())
        awaitFutureIsDoneInForkJoinPool(future);
      return future.get();
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      // Normally, this should never happen inside ForkJoinPool
      Thread.currentThread().interrupt();
      // Add the following statement if the future doesn't have side effects
      // future.cancel(true);
      throw new RuntimeException(e);
    }
  });
}

private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
  CompletableFuture<T> cf = new CompletableFuture<>();
  T result;
  try {
    result = future.get();
  } catch (Throwable ex) {
    cf.completeExceptionally(ex);
    return cf;
  }
  cf.complete(result);
  return cf;
}

private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
    throws InterruptedException {
  ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
    @Override public boolean block() throws InterruptedException {
      try {
        future.get();
      } catch (ExecutionException e) {
        throw new RuntimeException(e);
      }
      return true;
    }
    @Override public boolean isReleasable() {
      return future.isDone();
    }
  });
}

显然,这种方法的问题在于,对于每个 Future ,一个线程将被阻塞以等待 Future 的结果——这与 futures 的想法相矛盾。在某些情况下,可能会做得更好。但是,一般来说,不主动等待 Future 的结果是没有解决办法的。

原文由 nosid 发布,翻译遵循 CC BY-SA 4.0 许可协议

如果您要使用的库除了 Future 样式之外还提供回调样式方法,您可以为其提供一个处理程序来完成 CompletableFuture 而无需任何额外的线程阻塞。像这样:

     AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file"));
    // ...
    CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<ByteBuffer>();
    open.read(buffer, position, null, new CompletionHandler<Integer, Void>() {
        @Override
        public void completed(Integer result, Void attachment) {
            completableFuture.complete(buffer);
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            completableFuture.completeExceptionally(exc);
        }
    });
    completableFuture.thenApply(...)

在没有回调的情况下,我认为解决此问题的唯一其他方法是使用轮询循环,将所有 Future.isDone() 检查在单个线程上,然后在 Future 可获取时调用完成。

原文由 Kafkaesque 发布,翻译遵循 CC BY-SA 3.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题