Java Reactor Flux 并发生产问题

如下例子

 static Mono<Dog> getDog(Integer id) {
        System.out.println(Thread.currentThread().getName() + " getDog() 等待 2 秒");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return Mono.fromCallable(() -> new Dog(id)).subscribeOn(Schedulers.boundedElastic());
    }

    static class Dog {
        Integer id;

        public Dog(Integer id) {
            this.id = id;
        }

        public Integer getId() {
            return id;
        }

        @Override
        public String toString() {
            return "Dog{" +
                    "id=" + id +
                    '}';
        }
    }
public static void main(String[] args) throws InterruptedException {

        Integer[] ids = {1, 2, 3, 4};
        Flux.merge(getDog(ids[0]), getDog(ids[1]), getDog(ids[2]), getDog(ids[3]))
                .subscribeOn(Schedulers.boundedElastic())
                .toStream().forEach(d -> System.out.println(Thread.currentThread().getName() + " :: " + d.toString()));
    }

结果

main getDog() 等待 2 秒
main getDog() 等待 2 秒
main getDog() 等待 2 秒
main getDog() 等待 2 秒
main :: Dog{id=1}
main :: Dog{id=2}
main :: Dog{id=3}
main :: Dog{id=4}

在网上查询一下,有文章说可以通过 create 方式来解决,

List<Mono<Dog>> list = Arrays.asList(getDog(1), getDog(2), getDog(3));
        Flux.create(sink -> {
                    for (Mono<Dog> dog : list) {
                        sink.next(dog);
                    }
                }).subscribeOn(Schedulers.boundedElastic(), true).toStream()
                .forEach(d -> System.out.println(Thread.currentThread().getName() + " :: " + d.toString()));
// output:
main getDog() 等待 2 秒
main getDog() 等待 2 秒
main getDog() 等待 2 秒
main :: MonoSubscribeOnCallable
main :: MonoSubscribeOnCallable
main :: MonoSubscribeOnCallable

这样并没有使用多线程生产。
也尝试用 zip方法,也没有达到想要的效果。

Integer[] ids = {1, 2, 3, 4};
        Flux.zip(getDog(ids[0]), getDog(ids[1]), getDog(ids[2]), getDog(ids[3])).subscribeOn(Schedulers.boundedElastic(), true).toStream()
                .forEach(d -> System.out.println(Thread.currentThread().getName() + " :: " + d.toString()));

//output:
main getDog() 等待 2 秒
main getDog() 等待 2 秒
main getDog() 等待 2 秒
main getDog() 等待 2 秒
main :: [Dog{id=1},Dog{id=2},Dog{id=3},Dog{id=4}]

想让 merge方法里面的 getDog 方法通过多线程执行,最后 merge 一个集合,不知道有什么方式来解决, 请教一下,谢谢。

阅读 3.5k
3 个回答

image.png

public Mono<Long> packageMono() {
    return Mono.just(System.currentTimeMillis())
            // 延迟一秒模拟请求
            .delayElement(Duration.of(1L, ChronoUnit.SECONDS));
}

@Test
public void testRandom() throws Exception {

    final Tuple3<Long, Long, Long> test = Mono
            .zip(packageMono(), packageMono(), packageMono())
            .block();

    System.out.println(test);
}

参考 https://stackoverflow.com/que...

经过这两天网上查询的资料,大致写出来,在这里记录一下。

 static Random random = new Random();

    static Mono<Dog> getDog(Integer id) {
        return Mono.fromCallable(() -> {
            System.out.println( Thread.currentThread().getName() + " getDog() 等待 2 秒");
            try {
                long time = random.nextInt(2000);
                Thread.sleep(time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new Dog(id);
        }).subscribeOn(Schedulers.boundedElastic()).timeout(Duration.of(2, ChronoUnit.SECONDS)).onErrorReturn(new Dog(100));
    }
  public static void main(String[] args) throws InterruptedException {
         Integer[] ids = {1, 2, 3, 4};

        Flux.merge(getDog(ids[0]), getDog(ids[1]), getDog(ids[2]), getDog(ids[3]))
                .toStream().forEach(dog -> System.out.println(Thread.currentThread().getName() + " : " + dog));

 }

运行结果如下:

boundedElastic-1 getDog() 等待 2 秒
boundedElastic-4 getDog() 等待 2 秒
boundedElastic-3 getDog() 等待 2 秒
boundedElastic-2 getDog() 等待 2 秒
main : Dog{id=3}
main : Dog{id=4}
main : Dog{id=1}
main : Dog{id=2}
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题