我有一个通量是从一个Flume。
Scheduler s = Schedulers.parallel();
Long[] data = new Long[10]{10, 20,...};
Sinks.Many<Long> dataSink = Sinks.many().unicast().onBackpressureBuffer();
var flux = dataSink.asFlux().publishOn(s).doOnNext(d -> log.info("got {}", d));
var mono = Mono.fromSupplier(() -> {
val map = new HashSet<Long>();
for (int i = 0; i < data.length; i++) {
Long datum = data[i];
map.add(datum);
log.info("sending....{}", i);
dataSink.tryEmitNext(datum);
}
return map;
});
如何同时等待flux和mono,然后从mono返回值。注意,通量取决于单声道,因为接收器在单声道的供应器内部发射。
一种选择是:
Mono.when(flux, mono).block();
return mono.block();
但我不想阻止,因为我将不得不从函数返回单声道,这将是稍后由客户端订阅。另一个选择是使用Flux.merge,但它返回一个可序列化的值,我不知道如何从mono中获取值。
1条答案
按热度按时间htrmnn0y1#
这实际上取决于您需要从
Flux
获得什么数据以及您希望如何解析它们(顺序还是并行)。对于顺序处理,可以使用
要并行解析,可以将
Flux
转换为Mono
,然后使用Mono.zip