我有一个需要扩展的项目集合,所以我选择reactor作为它的React能力,因为扩展需要io操作。
下面是一段工作代码:
public Flux<Item> expand(List<Item> unprocessedItems) {
return Flux.fromIterable(unprocessedItems)
.expandDeep(this::expandItem);
}
请注意 this::expandItem
是一个阻塞操作(多个数据库查询,一些计算…)。现在我希望这个扩展是平行的,但据我所知 .expand()
以及 .expandDeep()
只是 Flux
类而不是 ParallelFlux
班级。我试着添加 .publishOn()
以及 .subscribeOn()
在 .expand()
打电话,但运气不好。
这是我第一次使用React堆,但我没有看到任何技术问题阻止并行扩展,有什么办法吗?是api丢失了还是我丢失了什么?
1条答案
按热度按时间31moq8wy1#
是的,你是对的
ParallelFlux
没有.expand()
以及.expandDeep()
方法,但我可以使用其他方法,创建具有expand方法的其他发布服务器并将其传递给您的ParallelFlux
,如下所示:我的测试数据:
结果:
如你所见
expander
并行工作。