我一直在研究新的 rx java 2,我不太确定我是否理解 backpressure
的想法……
我知道我们有 Observable
没有 backpressure
支持和 Flowable
有它。
因此,基于示例,假设我有 flowable
和 interval
:
Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
这将在大约 128 个值后崩溃,这很明显我消耗的速度比获取物品慢。
但是然后我们有相同的 Observable
Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
// do smth
}
});
这根本不会崩溃,即使我延迟消费它仍然有效。为了使 Flowable
工作可以说我放了 onBackpressureDrop
运算符,崩溃消失了,但也不是所有值都被发出。
所以我目前在脑海中找不到答案的基本问题是为什么我应该关心 backpressure
当我可以使用普通 Observable
仍然接收所有值而不管理 buffer
?或者从另一方面来说, backpressure
有什么优势可以让我有利于管理和处理消费?
原文由 user2141889 发布,翻译遵循 CC BY-SA 4.0 许可协议
背压在实践中体现的是有界缓冲区,
Flowable.observeOn
有一个包含 128 个元素的缓冲区,它的耗尽速度与 dowstream 可以承受的速度一样快。您可以单独增加此缓冲区大小以处理突发源,并且所有背压管理实践仍然适用于 1.x。Observable.observeOn
有一个无限缓冲区,不断收集元素,您的应用程序可能会耗尽内存。您可以使用
Observable
例如:您可以使用
Flowable
例如: