java—如何将多个消息转换为单个消息?

2hh7jdfx  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(307)

每天我们都会在学生主题中收到以下信息

Message 1: {"StudentID": "1", "StudentName":"aaa","fatherName":"aaa1",  "class":"1"}
Message 2: {"StudentID": "2", "StudentName":"bbb","fatherName":"bbb1",  "class":"1"}
Message 3: {"StudentID": "3", "StudentName":"ccc","fatherName":"ccc1",  "class":"2"}
Message 4: {"StudentID": "4", "StudentName":"ddd","fatherName":"ddd1",  "class":"2"}
Message 5: {"StudentID": "5", "StudentName":"eee","fatherName":"eee1",  "class":"2"}

一天结束时(一天一次),根据每节课,我们要把所有的信息整合起来,以下面的格式发布到“学生主题输出”上。

Message 1:{"Class":"1"
          {"StudentID": "1", "StudentName":"aaa","fatherName":"aaa1"},
          {"StudentID": "2", "StudentName":"bbb","fatherName":"bbb1"}
       }
Message 2:{"Class":"2" 
          {"StudentID": "3", "StudentName":"ccc","fatherName":"ccc1"},
          {"StudentID": "4", "StudentName":"ddd","fatherName":"ddd1"},
          {"StudentID": "5", "StudentName":"eee","fatherName":"eee1"}
       }

我尝试了以下方法,但不知道如何创建没有类名的学生列表?

KStream<String, Object> sampleStream = builder.stream("Student-Topic-in");
    sampleStream
            .filter((k, v) -> v != null)
            .mapValues(v -> (Student) v)
            .groupBy((k, v) -> KeyValue.pair(v.getClass_name(), v))
            .windowedBy(TimeWindows.of(5000))
            //I am not sure how to create a student list without Classname
            .aggregate(Student::new, (k, v, list) -> (Student)list.add((Student)v)

你能告诉我,如何构造输出json消息kafka流吗?

pw9qyyiw

pw9qyyiw1#

你可以做一个 KStream.groupBy(...).windowedBy().aggregate().mapValues 使用“class”属性进行分组。
Aggregator() 你可以组装一个 List 你转化成的学生 JSONmapValues()

zzwlnbp8

zzwlnbp82#

您可以按以下方式在列表中聚合消息:

KStream<String, Object> sampleStream = builder.stream("Student-Topic-in");
   KTable<Windowed<key>,List<Student>> aggregatedTable =  sampleStream
            .filter((k, v) -> v != null)
            .mapValues(v -> (Student) v)
            .groupBy((k, v) -> KeyValue.pair(v.getClass_name(), v))
            .windowedBy(TimeWindows.of(5000))
            //I am not sure how to create a student list without Classname
            .aggregate(ArrayList::new, (k, v, list) -> list.add((Student)v, 
               Materialized.with(keySerde(), arrayListSerde())
 )

一旦你拿到 List<Student> ,可以使用 .mapValues() 功能。

相关问题