将示例化值类型保留在akka流divertTo或alsoTo中

wko9yo5t  于 2022-11-05  发布在  其他
关注(0)|答案(1)|浏览(115)

我尝试创建一个kafka消费者流,它的Source类型为Source<ConsumerMEssage.CommittableMessage<String,String>, Consumer.Control
从这个源代码中,我想根据一些 predicate 使用不同的路径。因此,我想使用divertTo方法,有时也使用alsoTo方法。
这两个方法都接受一个Sink。我的问题是,当我构建这个Sink时,我希望Sink具有由源提供的具体化值类型Consumer.Control。我现在要做的是构建这样的Sink

private Sink<SomeType, NotUsed> sinkForPathA(){
  Flow.of(SomeType.class)
    .to(Committer.sink(committerSettings));
}

正如你可能注意到的,物化的值类型现在是NotUsed,这是不需要的。

private Sink<SomeType, Consumer.DrainingControl> sinkForPathA(){
  Flow.of(SomeType.class)
    .toMat(Committer.sink(committerSettings),Consumer::createDrainingControl);
}

是否有可能创建一个具有预定义物化值类型的Flow,而不仅仅是NotUsed

vyu0f0g1

vyu0f0g11#

你当然可以用内部Sink的物化值来创建一个Flow
取实化值类型为Sink.seq()CompletionStage<List<T>>作为示例。
如果您要求创建一个Flow,如下所示:

Flow.of(Integer.class)
    .to(Sink.seq());

那么正如您所指出的,您将在返回的Sink中使用NotUsed。诀窍是将toMatkeepRight一起使用,以保留Sink的物化值类型。它将如下所示:

Flow.of(Integer.class)
    .toMat(Sink.seq(), Keep.right());

现在得到的结果是Sink<Integer, CompletionStage<List<Integer>>>

相关问题