RxJava2 observable 抛出 UndeliverableException

新手上路,请多包涵

据我了解 RxJava2 values.take(1) 创建另一个 Observable,它只包含原始 Observable 的一个元素。它 不能 抛出异常,因为它被 take(1) 的效果过滤掉了,因为它是第二次发生的。

以下 代码片段

    Observable<Integer> values = Observable.create(o -> {
        o.onNext(1);
        o.onError(new Exception("Oops"));
    });

    values.take(1)
            .subscribe(
                    System.out::println,
                    e -> System.out.println("Error: " + e.getMessage()),
                    () -> System.out.println("Completed")
            );

输出

1
Completed
io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more
Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
    at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
    at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
    at ch02.lambda$main$0(ch02.java:28)
    at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
    at io.reactivex.Observable.subscribe(Observable.java:10841)
    at io.reactivex.Observable.subscribe(Observable.java:10827)
    at io.reactivex.Observable.subscribe(Observable.java:10787)
    at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
    ... 8 more

我的问题:

  1. 我的理解正确吗?
  2. 导致异常的真正原因是什么。
  3. 消费者如何解决这个问题?

原文由 abd3lraouf 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 906
2 个回答
  1. 是的,但是因为可观察到的“结束”并不意味着内部运行的代码 create(...) 已停止。为了在这种情况下完全安全,您需要使用 o.isDisposed() 查看 observable 是否已在下游结束。
  2. 例外是因为 RxJava 2 的政策是永远不允许 onError 调用丢失。如果 observable 已经终止,它要么被传递到下游,要么作为全局 UndeliverableException 抛出。由 Observable 的创建者“正确地”处理 observable 结束并发生异常的情况。
  3. 问题是生产者 ( Observable ) 和消费者 ( Subscriber ) 对流何时结束存在分歧。由于在这种情况下生产者的寿命比消费者长,因此只能在生产者中解决问题。

原文由 Kiskae 发布,翻译遵循 CC BY-SA 3.0 许可协议

@Kiskae 在之前的评论中正确回答了发生此类异常的原因。

这里是关于这个主题的官方文档的链接: [RxJava2-wiki](https://github.com/ReactiveX/RxJava/wiki/What’s-different-in-2.0#error-handling) 。

有时您无法更改此行为,因此有一种方法可以处理这个 UndeliverableException 的。以下是如何避免崩溃和不当行为的代码片段:

 RxJavaPlugins.setErrorHandler(e -> {
    if (e instanceof UndeliverableException) {
        e = e.getCause();
    }
    if ((e instanceof IOException) || (e instanceof SocketException)) {
        // fine, irrelevant network problem or API that throws on cancellation
        return;
    }
    if (e instanceof InterruptedException) {
        // fine, some blocking code was interrupted by a dispose call
        return;
    }
    if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) {
        // that's likely a bug in the application
        Thread.currentThread().getUncaughtExceptionHandler()
            .handleException(Thread.currentThread(), e);
        return;
    }
    if (e instanceof IllegalStateException) {
        // that's a bug in RxJava or in a custom operator
        Thread.currentThread().getUncaughtExceptionHandler()
            .handleException(Thread.currentThread(), e);
        return;
    }
    Log.warning("Undeliverable exception received, not sure what to do", e);
});

此代码取自上面的链接。

重要的提示。这种方法将全局错误处理程序设置为 RxJava,因此如果你能摆脱这些异常——这将是更好的选择。

原文由 Ilia Kurtov 发布,翻译遵循 CC BY-SA 3.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题