rxjava:我可以使用 retry() 但有延迟吗?

新手上路,请多包涵

我在我的 Android 应用程序中使用 rxjava 来异步处理网络请求。现在我只想在经过一定时间后重试失败的网络请求。

有什么方法可以在 Observable 上使用 retry() 但仅在一定延迟后重试?

有没有办法让 Observable 知道当前正在重试(而不是第一次尝试)?

我看了一下 debounce()/throttleWithTimeout() 但他们似乎在做一些不同的事情。

编辑:

我想我找到了一种方法,但我对确认这是正确的方法或其他更好的方法感兴趣。

我正在做的是:在我的 Observable.OnSubscribe 的 call() 方法中,在我调用 Subscribers onError() 方法之前,我只是让线程休眠所需的时间。因此,要每 1000 毫秒重试一次,我会执行以下操作:

 @Override
public void call(Subscriber<? super List<ProductNode>> subscriber) {
    try {
        Log.d(TAG, "trying to load all products with pid: " + pid);
        subscriber.onNext(productClient.getProductNodesForParentId(pid));
        subscriber.onCompleted();
    } catch (Exception e) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e.printStackTrace();
        }
        subscriber.onError(e);
    }
}

由于此方法无论如何都在 IO 线程上运行,因此它不会阻塞 UI。我能看到的唯一问题是,即使是第一个错误也会延迟报告,所以即使没有 retry(),延迟也会存在。如果延迟不是在错误 之后 应用而是 重试之前应用(但显然不是在第一次尝试之前),我会更喜欢它。

原文由 david.mihola 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 578
2 个回答

您可以使用 retryWhen() 运算符将重试逻辑添加到任何 Observable。

以下类包含重试逻辑:

RxJava 2.x

 public class RetryWithDelay implements Function<Observable<? extends Throwable>, Observable<?>> {
    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> apply(final Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Function<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> apply(final Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

RxJava 1.x

 public class RetryWithDelay implements
        Func1<Observable<? extends Throwable>, Observable<?>> {

    private final int maxRetries;
    private final int retryDelayMillis;
    private int retryCount;

    public RetryWithDelay(final int maxRetries, final int retryDelayMillis) {
        this.maxRetries = maxRetries;
        this.retryDelayMillis = retryDelayMillis;
        this.retryCount = 0;
    }

    @Override
    public Observable<?> call(Observable<? extends Throwable> attempts) {
        return attempts
                .flatMap(new Func1<Throwable, Observable<?>>() {
                    @Override
                    public Observable<?> call(Throwable throwable) {
                        if (++retryCount < maxRetries) {
                            // When this Observable calls onNext, the original
                            // Observable will be retried (i.e. re-subscribed).
                            return Observable.timer(retryDelayMillis,
                                    TimeUnit.MILLISECONDS);
                        }

                        // Max retries hit. Just pass the error along.
                        return Observable.error(throwable);
                    }
                });
    }
}

用法:

 // Add retry logic to existing observable.
// Retry max of 3 times with a delay of 2 seconds.
observable
    .retryWhen(new RetryWithDelay(3, 2000));

原文由 kjones 发布,翻译遵循 CC BY-SA 3.0 许可协议

Paul 的回答 启发,如果您不关心 Abhijit Sarkar 提出的 retryWhen 问题,使用 rxJava2 无条件延迟重新订阅的最简单方法是:

 source.retryWhen(throwables -> throwables.delay(1, TimeUnit.SECONDS))

您可能希望查看有关 retryWhen 和 repeatWhen 的更多示例和说明。

原文由 McX 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题