我有一个Kafka主题转换的历史,我在其中流的csv文件。现在我要计算每个party id的出现次数,然后在应用转换之后:如果计数小于20,则将其放入新的topic chuser,如果计数大于20,则将其放入topic lonyal,我使用的是java
公共类第一过滤器{
public static void main(String[] args) {
final StreamsBuilder builder = new StreamsBuilder();
/*input messages example
{"155555","11.11.2016 11:12}
{"155555","11.11.2016 13:12}
{"155556","11.11.2016 13:12}
result to be achived:
{"155555","2"}
{"155556","1"}
*/
builder.stream("test_topic_3")
// .map()
.groupByKey()
// .windowedBy(Window) // This may or may not be required
.count()
.toStream()
.map(
(key,count) -> new KeyValue<>(key.toString(),count)
)
.to("test_output_filtered_3");//this topic is empty after the run
我对Kafka不太了解,请帮我一下
1条答案
按热度按时间z6psavjg1#
这可以通过Kafka流很容易地实现。首先确保您有kstream&ktable的背景知识。您需要遵循以下步骤。
注意:这只是一个伪代码,它将给你一个如何实现这一壮举的想法