我需要知道如何使用我的Kafkakstreams行'for'循环…下面是我的'for'循环,需要包括在kstreams
for (int i = 0; i < 6 ; i++) {
try {
textlines.flatMapValues(value -> Arrays.asList(value.split("\\},\\{")));
Thread.sleep(2000);
}catch (InterruptedException e){
e.printStackTrace();
}
}
我的博客就像
KStream<String, String> textlines = builder.stream("intopic");
KStream<String, String> mstream = textlines
.mapValues(value -> value.replace("[","" ) )
如何将上面的for循环添加到kstream中
1条答案
按热度按时间cuxqih211#
问题是我在for循环中使用了value.split来分割我的数据…所以每当我的数据被分割时,它应该休眠大约10毫秒…这是因为我需要一个接一个的数据
从你所说的你想要订购。要实现订购,您不需要
sleep
. 就行了。kafka streams wordcount示例(我假设您的代码基于此)的工作方式是相同的:它还使用flatMapValues
,传递到平面图的lambda将文本行拆分为单词。除非我和其他人误解了您的问题(在这种情况下,您也许应该进一步澄清您的问题),否则我认为您不必要地使代码复杂化。