我们使用的是micronaut/kafka流。使用此框架来创建streams应用程序,您可以构建如下内容:
@Factory
public class FooTopologyConfig {
@Singleton
@Named
public KStream<String, FooPojo> configureTopology {
return builder.stream("foo-topic-in")
.peek((k,v) -> System.out.println(String.format("key %s, value: %s", k,v))
.to("foo-topic-out");
}
}
这是:
收到 ConfiguredStreamBuilder
(包裹得很轻 StreamsBuilder
)
构建并返回流(我们实际上不确定返回流有多重要,但这是另一个问题)。 ConfiguredStreamBuilder::build()
(在 StreamsBuilder
)稍后由框架调用并返回 Topology
不能通过micronaut进行注射。
我们想要 Topology
bean来记录拓扑的描述(通过 Topology::describe
).
这样做安全吗?
呼叫 ConfiguredStreamBuilder::build
(因此 StreamsBuilder::build
)并使用 Topology
打印可读的描述。
允许框架调用 ConfiguredStreamBuilder::build
然后,使用返回拓扑的第二个示例来构建应用程序。
1条答案
按热度按时间ej83mcc01#
打电话应该没问题
build()
多次。这在流的内部代码以及测试中都很常见。回答你的另一个问题。你只需要从
builder.stream()
如果以后要在拓扑的该分支上展开,请执行以下操作。