我尝试创建一个kafka消费者流,它的Source类型为Source<ConsumerMEssage.CommittableMessage<String,String>, Consumer.Control
。
从这个源代码中,我想根据一些 predicate 使用不同的路径。因此,我想使用divertTo
方法,有时也使用alsoTo
方法。
这两个方法都接受一个Sink。我的问题是,当我构建这个Sink时,我希望Sink具有由源提供的具体化值类型Consumer.Control
。我现在要做的是构建这样的Sink
private Sink<SomeType, NotUsed> sinkForPathA(){
Flow.of(SomeType.class)
.to(Committer.sink(committerSettings));
}
正如你可能注意到的,物化的值类型现在是NotUsed,这是不需要的。
private Sink<SomeType, Consumer.DrainingControl> sinkForPathA(){
Flow.of(SomeType.class)
.toMat(Committer.sink(committerSettings),Consumer::createDrainingControl);
}
是否有可能创建一个具有预定义物化值类型的Flow,而不仅仅是NotUsed
?
1条答案
按热度按时间vyu0f0g11#
你当然可以用内部
Sink
的物化值来创建一个Flow
。取实化值类型为
Sink.seq()
:CompletionStage<List<T>>
作为示例。如果您要求创建一个
Flow
,如下所示:那么正如您所指出的,您将在返回的
Sink
中使用NotUsed
。诀窍是将toMat
与keepRight
一起使用,以保留Sink
的物化值类型。它将如下所示:现在得到的结果是
Sink<Integer, CompletionStage<List<Integer>>>
。