java 如何在reactor应用程序中等待两个不同的发布者完成并从其中一个发布者返回值

bq3bfh9z  于 2023-06-28  发布在  Java
关注(0)|答案(1)|浏览(142)

我有一个通量是从一个Flume。

Scheduler s = Schedulers.parallel();
Long[] data = new Long[10]{10, 20,...};

Sinks.Many<Long> dataSink = Sinks.many().unicast().onBackpressureBuffer();
var flux = dataSink.asFlux().publishOn(s).doOnNext(d -> log.info("got {}", d));

var mono = Mono.fromSupplier(() -> {
  val map = new HashSet<Long>();
  for (int i = 0; i < data.length; i++) {
    Long datum = data[i];
    map.add(datum);
    log.info("sending....{}", i);
    dataSink.tryEmitNext(datum);
  }
  return map;
});

如何同时等待flux和mono,然后从mono返回值。注意,通量取决于单声道,因为接收器在单声道的供应器内部发射。
一种选择是:

Mono.when(flux, mono).block();
return mono.block();

但我不想阻止,因为我将不得不从函数返回单声道,这将是稍后由客户端订阅。另一个选择是使用Flux.merge,但它返回一个可序列化的值,我不知道如何从mono中获取值。

htrmnn0y

htrmnn0y1#

这实际上取决于您需要从Flux获得什么数据以及您希望如何解析它们(顺序还是并行)。
对于顺序处理,可以使用

flux.then(mono)

要并行解析,可以将Flux转换为Mono,然后使用Mono.zip

Mono.zip(flux.last(), mono)

相关问题