Concat VS 合并运算符

新手上路,请多包涵

我正在查看 RXJava 的文档,我注意到 concat 和 merge 运算符似乎做同样的事情。我写了几个测试来确定。

 @Test
public void testContact() {

    Observable.concat(Observable.just("Hello"),
                      Observable.just("reactive"),
                      Observable.just("world"))
              .subscribe(System.out::println);
}

@Test
public void testMerge() {

    Observable.merge(Observable.just("Hello"),
                      Observable.just("reactive"),
                      Observable.just("world"))
            .subscribe(System.out::println);
}

文件说

Merge 运算符也类似。它结合了两个或多个 Observable 的发射,但可以交错它们,而 Concat 从不交错多个 Observable 的发射。

但我还是不完全明白,运行此测试数千次合并结果总是相同的。由于未授予订单,因此我有时会期待“反应性”“世界”“你好”。

代码在这里 https://github.com/politrons/reactive

原文由 paul 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 491
2 个回答

正如您引用的文档中所述 - merge 可以交错输出,而 concat 将首先等待较早的流完成,然后再处理后面的流。在您的情况下,对于单元素静态流,它没有任何真正的区别(但理论上,合并可以随机顺序输出单词,并且根据规范仍然有效)。如果您想看到差异,请尝试以下操作(之后您需要添加一些睡眠以避免提前退出)

     Observable.merge(
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "A" + id),
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "B" + id))
    .subscribe(System.out::println);

A0 B0 A1 B1 B2 A2 B3 A3 B4 A4

相对

    Observable.concat(
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "A" + id),
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "B" + id))
    .subscribe(System.out::println);

A0 A1 A2 A3 A4 A5 A6 A7 A8

Concat 永远不会开始打印 B,因为流 A 永远不会结束。

s/流/可观察/g ;)

文档提供了漂亮的图表来显示差异。您需要记住,合并并不能 保证 一个一个地交错项目,它只是一个可能的例子。

连续

连接运算符合并

合并运算符

原文由 Artur Biesiadowski 发布,翻译遵循 CC BY-SA 3.0 许可协议

连续

Concat 发出来自两个或多个 Observable 的发射,而不交错它们。它将在发出项目时保持可观察对象的顺序。这意味着它将发出第一个可观察对象的所有项目,然后它将发出第二个可观察对象的所有项目,依此类推。

连接运算符

让我们通过一个例子来清楚地理解它。

 final String[] listFirst = {"A1", "A2", "A3", "A4"};
final String[] listSecond = {"B1", "B2", "B3"};

final Observable<String> observableFirst = Observable.fromArray(listFirst);
final Observable<String> observableSecond = Observable.fromArray(listSecond);

Observable.concat(observableFirst, observableSecond)
        .subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

当我们使用 Concat 运算符时,它将保持顺序并将值发出为 A1、A2、A3、A4、B1、B2、B3。

合并

Merge 通过合并它们的发射将多个 Observables 合并为一个。它不会在发出物品时维持秩序。

合并运算符

让我们通过一个例子来清楚地理解它。

 final String[] listFirst = {"A1", "A2", "A3", "A4"};
final String[] listSecond = {"B1", "B2", "B3"};

final Observable<String> observableFirst = Observable.fromArray(listFirst);
final Observable<String> observableSecond = Observable.fromArray(listSecond);

Observable.merge(observableFirst, observableSecond)
        .subscribe(new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

由于我们使用的是 Merge Operator,它不会维护顺序并且可以按任何顺序发出值,例如 A1、B1、A2、A3、B2、B3、A4A1、A2、B1、B2、A3、A4、B3 或者可以是任何东西。

这就是我们应该如何根据我们的用例在 RxJava 中使用 Concat 和 Merge 运算符。

原文由 Amit Shekhar 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题