合并 Observables 列表并等待所有完成

新手上路,请多包涵

TL;DR 如何将 Task.whenAll(List<Task>) 转换为 RxJava

我现有的代码使用 Bolts 构建异步任务列表,并等待所有这些任务完成后再执行其他步骤。本质上,它构建了一个 List<Task> 并返回一个 Task 当列表中的 所有 任务完成时,它被标记为已完成,如 Bolts 站点上的示例所示

我正在寻找替换 BoltsRxJava 我假设这种方法建立异步任务列表(大小事先不知道)并将它们全部包装成一个 Observable 是可能的,但我不知道怎么做。

我已经尝试查看 mergezip concat List<Observable> 如果我正确理解文档,我会建立起来,因为他们似乎都适合一次只处理两个 Observables

我正在尝试学习 RxJava 并且对它仍然很陌生所以如果这是一个明显的问题或在文档中的某处解释,请原谅我;我试过搜索。任何帮助将非常感激。

原文由 Craig Russell 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 436
2 个回答

听起来您正在寻找 Zip operator

有几种不同的使用方法,让我们看一个例子。假设我们有一些不同类型的简单可观察对象:

 Observable<Integer> obs1 = Observable.just(1);
Observable<String> obs2 = Observable.just("Blah");
Observable<Boolean> obs3 = Observable.just(true);

等待它们的最简单方法是这样的:

 Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b)
.subscribe(str -> System.out.println(str));

请注意,在 zip 函数中,参数具有与被压缩的可观察对象类型相对应的具体类型。

也可以直接压缩 observables 列表:

 List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3);

Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

…或者将列表包装成 Observable<Observable<?>>

 Observable<Observable<?>> obsObs = Observable.from(obsList);

Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

然而,在这两种情况下,zip 函数只能接受一个 Object[] 参数,因为列表中的可观察对象的类型及其数量是事先不知道的。这意味着 zip 函数必须检查参数的数量并相应地转换它们。

无论如何,以上所有示例最终都会打印 1 Blah true

编辑: 使用 Zip 时,确保压缩的 Observables 发出相同数量的项目。在上面的示例中,所有三个可观察对象都发出了一个项目。如果我们把它们改成这样:

 Observable<Integer> obs1 = Observable.from(new Integer[]{1,2,3}); //Emits three items
Observable<String> obs2 = Observable.from(new String[]{"Blah","Hello"}); //Emits two items
Observable<Boolean> obs3 = Observable.from(new Boolean[]{true,true}); //Emits two items

然后 1, Blah, True2, Hello, True 将是唯一传递到 zip 函数的项目。项目 3 将永远不会被压缩,因为其他 observables 已经完成。

原文由 Malt 发布,翻译遵循 CC BY-SA 3.0 许可协议

您可以使用 flatMap 如果您有动态任务组合。是这样的:

 public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) {
    return Observable.from(tasks)
            //execute in parallel
            .flatMap(task -> task.observeOn(Schedulers.computation()))
            //wait, until all task are executed
            //be aware, all your observable should emit onComplete event
            //otherwise you will wait forever
            .toList()
            //could implement more intelligent logic. eg. check that everything is successful
            .map(results -> true);
}

并行执行的另一个很好的例子

注意:我真的不知道您对错误处理的要求。例如,如果只有一项任务失败了怎么办。我认为您应该验证这种情况。

原文由 MyDogTom 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题
logo
Stack Overflow 翻译
子站问答
访问
宣传栏