据我了解 RxJava2 values.take(1)
创建另一个 Observable,它只包含原始 Observable 的一个元素。它 不能 抛出异常,因为它被 take(1)
的效果过滤掉了,因为它是第二次发生的。
如 以下 代码片段
Observable<Integer> values = Observable.create(o -> {
o.onNext(1);
o.onError(new Exception("Oops"));
});
values.take(1)
.subscribe(
System.out::println,
e -> System.out.println("Error: " + e.getMessage()),
() -> System.out.println("Completed")
);
输出
1
Completed
io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
at ch02.lambda$main$0(ch02.java:28)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.Observable.subscribe(Observable.java:10827)
at io.reactivex.Observable.subscribe(Observable.java:10787)
at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
... 8 more
Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
at ch02.lambda$main$0(ch02.java:28)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.Observable.subscribe(Observable.java:10827)
at io.reactivex.Observable.subscribe(Observable.java:10787)
at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
... 8 more
我的问题:
- 我的理解正确吗?
- 导致异常的真正原因是什么。
- 消费者如何解决这个问题?
原文由 abd3lraouf 发布,翻译遵循 CC BY-SA 4.0 许可协议
create(...)
已停止。为了在这种情况下完全安全,您需要使用o.isDisposed()
查看 observable 是否已在下游结束。onError
调用丢失。如果 observable 已经终止,它要么被传递到下游,要么作为全局UndeliverableException
抛出。由 Observable 的创建者“正确地”处理 observable 结束并发生异常的情况。Observable
) 和消费者 (Subscriber
) 对流何时结束存在分歧。由于在这种情况下生产者的寿命比消费者长,因此只能在生产者中解决问题。