RxJava的Observable订阅时如何作到在onComplete被调用时取消订阅?

例:

table.subscribe(tableIns -> {
            // System.out.println("-------Information-------");
            System.out.println(tableIns);
        }, throwable -> {
            throw new SchemaExportException(throwable);
        }, new Action() {
            @Override
            public void run() throws Exception {
                System.out.println("Complete");
                // 在这里取消订阅
        });

注意:是非Android 运行环境, 使用的是RxJava2.x

阅读 555
avatarAI BotBETA

你可以在 Actionrun 方法中取消订阅,就像这样:

Observable.create(emitter -> {
    // 你的代码...
}).subscribe(tableIns -> {
    System.out.println(tableIns);
}, throwable -> {
    throw new SchemaExportException(throwable);
}, () -> {
    System.out.println("Complete");
    emitter.unsubscribe(); // 在这里取消订阅
});

这段代码中,我们使用 Observable.create 来创建一个 Observable。这个 Observable 在执行过程中会调用 emitter.onNextemitter.onErroremitter.onComplete。在 onComplete 被调用时,我们调用 emitter.unsubscribe() 来取消订阅。

注意,你需要确保 emitteronComplete 被调用时仍然可用。如果 emitteronComplete 调用之前就已经不再可用,那么你可能会遇到问题。

1 个回答

暂时只能在onComplete中设置: CompletableFuture.complete来通知调用方结束了. 示例:

Flowable由持久层方法返回,是调用方中的:result.getAll(dbName.get(), strategy)result.getTableColumn(table)

public class ConsoleSchemaFlowableOutput implements SchemaFlowableOutput {
    private final static Logger logger = LoggerFactory.getLogger(ConsoleSchemaFlowableOutput.class);
    private volatile CompletableFuture<String> future = new CompletableFuture<>();
    private AtomicInteger count = new AtomicInteger(0);
    @Override
    public Disposable flush(Information information, Flowable<Table> table) throws SchemaExportException {
        logger.info("Start Flowable Flush");
        Disposable export_flush_complete = table.subscribe(tableIns -> {
            System.out.println(printAsciiTable(tableIns));
            System.out.println(printAsciiColumns(tableIns.getColumns()));
            System.out.println("\r\n");
            count.addAndGet(1);
        }, throwable -> {
            logger.debug("Export Break, reason: " + throwable.getMessage());
            future.cancel(true);
            throw new SchemaExportException(throwable);
        }, new Action() {
            @Override
            public void run() throws Exception {
                logger.debug("Export Complete, Affect Size:"+count.get());
                future.complete("OK");
            }
        });
        return export_flush_complete;
    }
    @Override
    public CompletableFuture<String> getFuture() {
        return future;
    }
    ...
}

调用方:

    public void export(Information info, SchemaFlowableOutput out) throws SchemaExportException{
        long startStamp = System.currentTimeMillis();
        // Flowable
        Flowable<Table> tableFlowable = result.getAll(dbName.get(), strategy).flatMap(new Function<Table, Publisher<Table>>() {
            @Override
            public Publisher<Table> apply(@NonNull Table table) throws Exception {
                return result.getTableColumn(table).flatMap(new Function<List<Column>, SingleSource<Table>>() {
                    @Override
                    public SingleSource<Table> apply(@NonNull List<Column> columns) throws Exception {
                        return Single.just(table.fillColumn(columns));
                    }
                }).flatMapPublisher(new Function<Table, Publisher<? extends Table>>() {
                    @Override
                    public Publisher<? extends Table> apply(@NonNull Table table) throws Exception {
                        return Flowable.just(table);
                    }
                });
            }
        });
        Disposable disposable = null;
        try {
            disposable = out.flush(info, tableFlowable);
            CompletableFuture<String> future= out.getFuture();
            while(!future.isDone()){
                logger.info("[ERE-Flowable]未完成,线程休眠1秒");
                Thread.currentThread().sleep(1000,0);
            }
            String result = future.get();
            logger.info("[ERE-Flowable]完成, 结果:"+result);
            if(result.equals("OK")){
                long finishStamp = System.currentTimeMillis();
                clearHander(disposable, "[ERE-Flowable]RxJava disposed because complete, WithTime: "+(finishStamp-startStamp));
            }
        }catch (Exception e){
            clearHander(disposable, "[ERE-Flowable]RxJava disposed has Exception: "+e.getMessage());
        }
    }

    private void clearHander(Disposable disposable, String reason){
        logger.info(reason);
        if(null!=disposable && !disposable.isDisposed()) {
            disposable.dispose();
        }else{
            if(null != disposable) {
                logger.info("[CH]disposable status:" + disposable.isDisposed());
            }else{
                logger.info("[CH]disposable is null:");
            }
        }
        // 结束后的回调,执行一些清理工作
        completeHandler.apply();
    }
推荐问题
宣传栏