CompletableFuture、Future和RxJava的Observable的区别

新手上路,请多包涵

I would like to know the difference between CompletableFuture , Future and Observable RxJava .

我所知道的都是异步的但是

Future.get() 阻塞线程

CompletableFuture 给出回调方法

RxJava Observable —类似于 CompletableFuture 还有其他好处(不确定)

例如:如果客户端需要进行多个服务调用,当我们使用 Futures (Java) Future.get() 将按顺序执行…想知道它在 RxJava 中如何更好..

文档 http://reactivex.io/intro.html

很难使用 Futures 来优化组合有条件的异步执行流程(或者不可能,因为每个请求的延迟在运行时会有所不同)。当然,这可以做到,但它很快就会变得复杂(因此容易出错),或者它过早地阻塞在 Future.get() 上,从而消除了异步执行的好处。

真的很想知道 RxJava 如何解决这个问题。我发现很难从文档中理解。

原文由 shiv455 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 922
2 个回答

期货

Futures 是在 Java 5 (2004) 中引入的。它们基本上是尚未完成的操作结果的占位符。操作完成后, Future 将包含该结果。例如,操作可以是提交给 ExecutorServiceRunnableCallable 实例。操作的提交者可以使用 Future 对象来检查操作是否为 Done () ,或者使用阻塞的 get() 方法等待它完成。

例子:

 /**
* A task that sleeps for a second, then returns 1
**/
public static class MyCallable implements Callable<Integer> {

    @Override
    public Integer call() throws Exception {
        Thread.sleep(1000);
        return 1;
    }

}

public static void main(String[] args) throws Exception{
    ExecutorService exec = Executors.newSingleThreadExecutor();
    Future<Integer> f = exec.submit(new MyCallable());

    System.out.println(f.isDone()); //False

    System.out.println(f.get()); //Waits until the task is done, then prints 1
}

可完成期货

CompletableFutures 是在 Java 8 (2014) 中引入的。它们实际上是常规 Futures 的演变,受 Google 的 Listenable Futures 启发,是 Guava 库的一部分。它们是 Futures,还允许您将任务串成一条链。您可以使用它们告诉某些工作线程“去做一些任务 X,完成后,使用 X 的结果去做另一件事”。使用 CompletableFutures,您可以对操作的结果做一些事情,而无需实际阻塞线程来等待结果。这是一个简单的例子:

 /**
* A supplier that sleeps for a second, and then returns one
**/
public static class MySupplier implements Supplier<Integer> {

    @Override
    public Integer get() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            //Do nothing
        }
        return 1;
    }
}

/**
* A (pure) function that adds one to a given Integer
**/
public static class PlusOne implements Function<Integer, Integer> {

    @Override
    public Integer apply(Integer x) {
        return x + 1;
    }
}

public static void main(String[] args) throws Exception {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    CompletableFuture<Integer> f = CompletableFuture.supplyAsync(new MySupplier(), exec);
    System.out.println(f.isDone()); // False
    CompletableFuture<Integer> f2 = f.thenApply(new PlusOne());
    System.out.println(f2.get()); // Waits until the "calculation" is done, then prints 2
}

RxJava

RxJava 是 Netflix 创建的用于 响应式编程 的完整库。乍一看,它会类似于 Java 8 的流。是的,只是它更强大。

与 Futures 类似,RxJava 可用于将一堆同步或异步操作串在一起以创建处理管道。与一次性使用的 Future 不同,RxJava 处理零个或多个项目的 _流_。包括具有无限数量项目的永无止境的流。由于一组令人难以置信的丰富 的运算符,它也更加灵活和强大。

与 Java 8 的流不同,RxJava 还具有 背压 机制,这允许它处理处理管道的不同部分 以不同速率 在不同线程中运行的情况。

RxJava 的缺点是,尽管有可靠的文档,但由于涉及范式转换,它是一个难以学习的库。 Rx 代码也可能是调试的噩梦,尤其是在涉及多个线程的情况下,甚至更糟——如果需要背压。

如果你想深入了解,官方网站上有一整 的各种教程,还有官方 文档Javadoc 。您还可以看一些视频,例如 这个 视频,它简要介绍了 Rx,还讨论了 Rx 和 Futures 之间的区别。

奖励:Java 9 反应流

Java 9 的 Reactive Streams aka Flow API 是一组由各种 反应流 库(例如 RxJava 2Akka StreamsVertx )实现的接口。它们允许这些反应式库相互连接,同时保留所有重要的背压。

原文由 Malt 发布,翻译遵循 CC BY-SA 4.0 许可协议

我从 0.9 开始就一直在使用 Rx Java,现在是 1.3.2 并且很快迁移到 2.x 我在一个我已经工作了 8 年的私人项目中使用它。

如果没有这个库,我就不会再编程了。一开始我很怀疑,但这是你需要创造的一种完全不同的心态。一开始相当困难。我有时会盯着弹珠看几个小时……哈哈

这只是一个实践问题,真正了解流程(又名可观察对象和观察者的契约),一旦你到达那里,你就会讨厌这样做。

对我来说,那个图书馆并没有真正的缺点。

用例:我有一个包含 9 个仪表(cpu、内存、网络等)的监视器视图。启动视图时,视图将自己订阅到一个系统监视器类,该类返回一个包含 9 米的所有数据的可观察值(间隔)。它会每秒将一个新结果推送到视图(所以不是轮询!!!)。该可观察对象使用平面图同时(异步!)从 9 个不同的来源获取数据并将结果压缩到一个新模型中,您的视图将在 onNext() 上获取。

你到底要如何处理期货、可完成品等……祝你好运! :)

Rx Java 为我解决了编程中的许多问题,并在某种程度上简化了……

优点:

  • 无状态!!! (值得一提的是,也许最重要)
  • 开箱即用的线程管理
  • 构建具有自己生命周期的序列
  • 一切都是可观察的,所以链接很容易
  • 更少的代码编写
  • 类路径上的单个 jar(非常轻量级)
  • 高并发
  • 没有回调地狱了
  • 基于订阅者(消费者和生产者之间的紧密契约)
  • 背压策略(断路器之类)
  • 出色的错误处理和恢复
  • 非常好的文档(marbles )
  • 完全控制
  • 还有很多 …

缺点: - 难以测试

原文由 Kristoffer 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题