如下例子
static Mono<Dog> getDog(Integer id) {
System.out.println(Thread.currentThread().getName() + " getDog() 等待 2 秒");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Mono.fromCallable(() -> new Dog(id)).subscribeOn(Schedulers.boundedElastic());
}
static class Dog {
Integer id;
public Dog(Integer id) {
this.id = id;
}
public Integer getId() {
return id;
}
@Override
public String toString() {
return "Dog{" +
"id=" + id +
'}';
}
}
public static void main(String[] args) throws InterruptedException {
Integer[] ids = {1, 2, 3, 4};
Flux.merge(getDog(ids[0]), getDog(ids[1]), getDog(ids[2]), getDog(ids[3]))
.subscribeOn(Schedulers.boundedElastic())
.toStream().forEach(d -> System.out.println(Thread.currentThread().getName() + " :: " + d.toString()));
}
结果
main getDog() 等待 2 秒
main getDog() 等待 2 秒
main getDog() 等待 2 秒
main getDog() 等待 2 秒
main :: Dog{id=1}
main :: Dog{id=2}
main :: Dog{id=3}
main :: Dog{id=4}
在网上查询一下,有文章说可以通过 create
方式来解决,
List<Mono<Dog>> list = Arrays.asList(getDog(1), getDog(2), getDog(3));
Flux.create(sink -> {
for (Mono<Dog> dog : list) {
sink.next(dog);
}
}).subscribeOn(Schedulers.boundedElastic(), true).toStream()
.forEach(d -> System.out.println(Thread.currentThread().getName() + " :: " + d.toString()));
// output:
main getDog() 等待 2 秒
main getDog() 等待 2 秒
main getDog() 等待 2 秒
main :: MonoSubscribeOnCallable
main :: MonoSubscribeOnCallable
main :: MonoSubscribeOnCallable
这样并没有使用多线程生产。
也尝试用 zip
方法,也没有达到想要的效果。
Integer[] ids = {1, 2, 3, 4};
Flux.zip(getDog(ids[0]), getDog(ids[1]), getDog(ids[2]), getDog(ids[3])).subscribeOn(Schedulers.boundedElastic(), true).toStream()
.forEach(d -> System.out.println(Thread.currentThread().getName() + " :: " + d.toString()));
//output:
main getDog() 等待 2 秒
main getDog() 等待 2 秒
main getDog() 等待 2 秒
main getDog() 等待 2 秒
main :: [Dog{id=1},Dog{id=2},Dog{id=3},Dog{id=4}]
想让 merge
方法里面的 getDog
方法通过多线程执行,最后 merge 一个集合,不知道有什么方式来解决, 请教一下,谢谢。
这样可以
已参与了 SegmentFault 思否社区 10 周年「问答」打卡 ,欢迎正在阅读的你也加入。