如何计算Kafka主题中每个id的频率

rjjhvcjd  于 2021-06-29  发布在  Java
关注(0)|答案(1)|浏览(347)

我有一个Kafka主题转换的历史,我在其中流的csv文件。现在我要计算每个party id的出现次数,然后在应用转换之后:如果计数小于20,则将其放入新的topic chuser,如果计数大于20,则将其放入topic lonyal,我使用的是java
公共类第一过滤器{

public static void main(String[] args) {

    final StreamsBuilder builder = new StreamsBuilder();

    /*input messages example
     {"155555","11.11.2016 11:12}
     {"155555","11.11.2016 13:12}
     {"155556","11.11.2016 13:12}
     result to be achived:
     {"155555","2"}
     {"155556","1"}
     */
    builder.stream("test_topic_3")
//                .map()
                  .groupByKey()
//                .windowedBy(Window) // This may or may not be required
                  .count()
                  .toStream()
                  .map(
                    (key,count) -> new KeyValue<>(key.toString(),count)
            )
            .to("test_output_filtered_3");//this topic is empty after the run

我对Kafka不太了解,请帮我一下

z6psavjg

z6psavjg1#

这可以通过Kafka流很容易地实现。首先确保您有kstream&ktable的背景知识。您需要遵循以下步骤。

builder.<Keytype, ValueType>stream(YourInputTopic))
    .map()
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10))) // This may or may not be required 
                                                           in your case
    .count()
    .toStream()
    .map((Windowed<String> key, Long count) -> new KeyValue<>(key.key(),count.toString()))
    .filter((k,v)-> Long.parseLong(v) > 20) // This is the filter
    .to(TopicName));

注意:这只是一个伪代码,它将给你一个如何实现这一壮举的想法

相关问题