RxJava怎么定时依次发送集合的每个元素?

pdog
  • 573

我有一个集合,其中包含了若干Message对象。
我想通过RxJava从集合中每隔一秒依次发送集合中的对象

我一开始的时候想这样操作,发现一次也不发送对象

Flowable<Message> messageFlowable = Flowable.fromIterable(mock.messages);
Flowable<Long> timeFlowable = Flowable.interval(1, TimeUnit.SECONDS);
Flowable<String> flowable = Flowable.zip(messageFlowable, timeFlowable, new BiFunction<Message, Long, String>() {
            @Override
            public String apply(Message message, Long aLong) throws Exception {
                return message.content;
            }
        });
flowable.subscribe(/*....*/)

后来我就想用这种方式

 Flowable.fromIterable(mock.messages)
            .map(message -> message.content)
            .delay(1, TimeUnit.SECONDS)
            .subscribe(s -> Timber.d("s = %s", s));
                

但是发现集合一瞬间就被发送完了。并没有被延时。

有什么办法可以延时发送集合吗?

    delay(1s)      delay(1s)      delay(1s)
 0 -----------> 1 -----------> 2 -----------> 3 ...
回复
阅读 4.3k
3 个回答
✓ 已被采纳

RxJava我没用过,但我用过RxJS, 大体思想应该一下
你应该这样先起一个定时器的流,然后每个流都去接上新的流。
因为interval的时间不保准,所以用flatMap使用流上所有的数据。
具体的API可能不一样,但是大致是这个思想吧。

Flowable.interval(1, TimeUnit.SECONDS).flatMap(() => return messageFlowable);
CaiJingLong
  • 195

代码用kotlin写的,写android的人不管会不会写,大概应该能看懂
纯kotlin工程,非android工程,线程随便用的,android里根据你的线程不同自己考虑用不用主线程

package com.github.caijinglong.rxjava

import io.reactivex.Flowable
import io.reactivex.FlowableSubscriber
import io.reactivex.schedulers.Schedulers
import org.reactivestreams.Subscription

fun main(args: Array<String>) {

    var mSubscription: Subscription? = null 

    val list = arrayListOf(1, 2, 3, 4, 5)
    Flowable.fromIterable(list)
            .observeOn(Schedulers.io())
            .subscribeOn(Schedulers.computation())
            .subscribe(object : FlowableSubscriber<Int> {
                override fun onComplete() {
                }

                override fun onSubscribe(s: Subscription) {
                    mSubscription = s
                    s.request(1)
                }

                override fun onNext(p0: Int?) {
                    println("${Date().toLocaleString()} : $p0" )
                    Thread.sleep(1000)
                    mSubscription?.request(1)
                }

                override fun onError(p0: Throwable?) {
                }
            })

    Thread.sleep(5000)//为了防止退出main函数结束,实际的android工程中不会退出,这里可以不用写
}

日志

2018-1-15 15:53:38 : 1
2018-1-15 15:53:39 : 2
2018-1-15 15:53:40 : 3
2018-1-15 15:53:41 : 4
2018-1-15 15:53:42 : 5

Process finished with exit code 0

使用doOnNext() + sleep(2000)

  Flowable.fromIterable(this.mock.messages)
                .doOnNext(message -> SystemClock.sleep(2000))
                .subscribe(message -> Timber.d("message.content = %s", message.content));

使用Zip操作符

Observable<Message> listObservable = Observable.fromIterable(mock.messages);
Observable<Long> timeObservable = Observable.interval(300, TimeUnit.MILLISECONDS);
Observable<String> zip =
        Observable.zip(listObservable, timeObservable, (message, aLong) -> message.content);

zip.doOnComplete(() -> Timber.d("complete"))
        .subscribe(s -> Timber.d("s = %s", s));
宣传栏