RxJS 怎么合并流返回的数据?

例如

'use strict';

const Rx = require('rx');

const x = Rx.Observable
            .just('x')
            .flatMap((data) => {
                const subject = new Rx.Subject();
                setTimeout(() => {
                    subject.onNext([data]);
                }, 1000)
                return subject;
            })

const y = Rx.Observable
            .just('y')
            .flatMap((data) => {
                const subject = new Rx.Subject();
                setTimeout(() => {
                    subject.onNext([data]);
                }, 3000)
                return subject;
            })

Rx.Observable.zip(x, y)
            // .map(([x, y]) => {
            //     return x.concat(y);
            // })
            .subscribe((data) => {
                console.log(data);
            }, (err) => {
                console.log(err);
            });

除了用 map 重构返回的数据结构之外,还有什么更加好的办法合并呢?

阅读 7k
2 个回答

楼上的答案之所以不行,是因为 forkJoin串行 地结合每个 Observable 的最后一个 emit 的值。

既然要串行,意思就是 a 结束了才到 bb 结束了才到 c

而题目中定义的 xy 都没有明确地定义其 complete ,所以 forkJoin 不知道 x 什么时候结束,从而无法进行下去。

所以,想要楼上的答案可行的话,明确地给每个 subject 加上其 complete 方法:

subject.onNext([data])
subject.complete()

然而:

  • xy 为什么要用 subject 而不用 Observable ?

  • 为什么要把 xy 的值放在一个数组里面,目的是什么?

Rx.Observable.forkJoin(x, y)
    .subscribe(data => {
        console.log(data);
    }, err => {
        console.log(err);
    });
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进