我有一个流,它将消息Map到两个不同的map()调用,并进一步过滤和写入两个不同的主题。
KStream<String, byte[]>[] stream = builder.<String, byte[]>stream("source-topic");
stream.map(logic1OnData).filter(
(key, value) -> {
if (key == null || value == null)
return false;
return value.data() != null;
}).to("topic1", Produced.with(Serdes.String(), Serdes.String())
stream.map(logic2OnData).filter(
(key, value) -> {
if (key == null || value == null)
return false;
return value.data() != null;
}).to("topic2", Produced.with(Serdes.String(), Serdes.String())
有没有办法运行stream.map(logc1ondata)。。。和stream.map(logic2ondata)并行?看起来它们一个接一个地运行,即第一个Map被执行并写入topic1,然后第二个Map被执行并写入topic2供参考。。我不想要num.threads.count,因为我的流输入来自单个主题,并且我正在运行同一应用程序的多个示例来读取源主题,以便在使用时实现并行性。
我看到的是执行和编写不同主题时的并行性
1条答案
按热度按时间vu8f3i0k1#
您看到的是将操作添加到拓扑中的顺序。一旦拓扑被执行,记录器将按照它们到达的顺序流经otpology,但是
logic2OnData
不会等待logic1OnData
在运行前完成处理。如果你关心表现,你可以考虑
stream threads
如果你想获得更多的并行性。编辑:看来我可能没有理解这个问题。
单个子拓扑不允许并行运行每个分支。但是你可以使用
repartition()
将一个logic2ondata生成它自己的子拓扑和repartition()
调用将能够与之前的所有操作并行运行。