Observable 与 Flowable rxJava2

新手上路,请多包涵

我一直在研究新的 rx java 2,我不太确定我是否理解 backpressure 的想法……

我知道我们有 Observable 没有 backpressure 支持和 Flowable 有它。

因此,基于示例,假设我有 flowableinterval

         Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

这将在大约 128 个值后崩溃,这很明显我消耗的速度比获取物品慢。

但是然后我们有相同的 Observable

      Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

这根本不会崩溃,即使我延迟消费它仍然有效。为了使 Flowable 工作可以说我放了 onBackpressureDrop 运算符,崩溃消失了,但也不是所有值都被发出。

所以我目前在脑海中找不到答案的基本问题是为什么我应该关心 backpressure 当我可以使用普通 Observable 仍然接收所有值而不管理 buffer ?或者从另一方面来说, backpressure 有什么优势可以让我有利于管理和处理消费?

原文由 user2141889 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 722
2 个回答

背压在实践中体现的是有界缓冲区, Flowable.observeOn 有一个包含 128 个元素的缓冲区,它的耗尽速度与 dowstream 可以承受的速度一样快。您可以单独增加此缓冲区大小以处理突发源,并且所有背压管理实践仍然适用于 1.x。 Observable.observeOn 有一个无限缓冲区,不断收集元素,您的应用程序可能会耗尽内存。

您可以使用 Observable 例如:

  • 处理 GUI 事件
  • 使用短序列(总共少于 1000 个元素)

您可以使用 Flowable 例如:

  • 冷源和非定时源
  • 类似源的生成器
  • 网络和数据库访问器

原文由 akarnokd 发布,翻译遵循 CC BY-SA 3.0 许可协议

背压是指您的可观察对象(发布者)创建的事件多于订阅者可以处理的事件。所以你可以让订阅者丢失事件,或者你可以得到一个巨大的事件队列,最终导致内存不足。 Flowable 考虑了背压。 Observable 没有。而已。

它让我想起了一个漏斗,当它有太多的液体溢出。 Flowable 可以帮助避免这种情况发生:

背压巨大:

在此处输入图像描述

但是使用可流动的,背压要小得多:

在此处输入图像描述

Rxjava2 有一些背压策略,你可以根据你的用例使用。通过策略,我的意思是 Rxjava2 提供了一种方法来处理由于溢出(背压)而无法处理的对象。

这是策略。 我不会一一列举,但是例如,如果你不想担心溢出的项目,你可以使用这样的放置策略:

observable.toFlowable(BackpressureStrategy.DROP)

据我所知,队列中应该有 128 个项目限制,之后可能会溢出(背压)。即使它不是 128,它也接近那个数字。希望这对某人有帮助。

如果您需要将缓冲区大小从 128 更改为 128,它看起来可以像这样完成(但注意任何内存限制:

 myObservable.toFlowable(BackpressureStrategy.MISSING).buffer(256); //but using MISSING might be slower.

在软件开发中,通常背压策略意味着你告诉发射器放慢一点,因为消费者无法处理你的发射事件的速度。

原文由 j2emanue 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题