rxjava如何让Obserable在新线程执行?

问题描述 相关代码

我希望我创建的observable在非创建线程执行. 并每subscribe一次, 排队执行一次. 代码如下

    public void test(){
        Log.d(TAG, "test() called");
        Observable<SerialSignal> dd = Observable.fromCallable(new Callable<SerialSignal>() {
            @Override
            public SerialSignal call() throws Exception {
                Thread.sleep(2000);
                Log.d(TAG, "call() called" + System.currentTimeMillis()/1000L);
                return null;
            }
        });
        Observable<SerialSignal> d = Observable.create(new ObservableOnSubscribe<SerialSignal>() {
            @Override
            public void subscribe(ObservableEmitter<SerialSignal> emitter) throws Exception {
                Thread.sleep(2000);
                Log.d(TAG, "subscribe() called" + System.currentTimeMillis()/1000L);
                emitter.onNext(null);
            }
        });
        d.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());
        d.subscribe(new Observer<SerialSignal>() {
            @Override
            public void onNext(SerialSignal serialSignal) {
                Log.i(TAG, "onNext: subscriber 1" +System.currentTimeMillis() /1000L );
            }
            //省了 onError onComplete 等
        });
        d.subscribe(new Observer<SerialSignal>() {
            @Override
            public void onNext(SerialSignal serialSignal) {
                Log.i(TAG, "onNext: 112 " + System.currentTimeMillis() /1000L);
            }
        });

问题出现的环境背景及自己尝试过哪些方法

你期待的结果是什么?实际看到的错误信息又是什么?

结果test() called 和 call() 都在主线程执行;而且observer的onnext 不执行.
请问哪里写错了吗. 我希望 called 在新线程执行, onNext在Android 主线程执行

阅读 3.3k
1 个回答

emitter.onNext(null);改为emitter.onNext(new SerialSignal());
参考源码ObservableCreate$CreateEmitter,onNext若为null,走onError

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

至于call不在子线程是因为你没赋值

    d = d.subscribeOn(Schedulers.io())
         .observeOn(AndroidSchedulers.mainThread());
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题
宣传栏