每天我们都会在学生主题中收到以下信息
Message 1: {"StudentID": "1", "StudentName":"aaa","fatherName":"aaa1", "class":"1"}
Message 2: {"StudentID": "2", "StudentName":"bbb","fatherName":"bbb1", "class":"1"}
Message 3: {"StudentID": "3", "StudentName":"ccc","fatherName":"ccc1", "class":"2"}
Message 4: {"StudentID": "4", "StudentName":"ddd","fatherName":"ddd1", "class":"2"}
Message 5: {"StudentID": "5", "StudentName":"eee","fatherName":"eee1", "class":"2"}
一天结束时(一天一次),根据每节课,我们要把所有的信息整合起来,以下面的格式发布到“学生主题输出”上。
Message 1:{"Class":"1"
{"StudentID": "1", "StudentName":"aaa","fatherName":"aaa1"},
{"StudentID": "2", "StudentName":"bbb","fatherName":"bbb1"}
}
Message 2:{"Class":"2"
{"StudentID": "3", "StudentName":"ccc","fatherName":"ccc1"},
{"StudentID": "4", "StudentName":"ddd","fatherName":"ddd1"},
{"StudentID": "5", "StudentName":"eee","fatherName":"eee1"}
}
我尝试了以下方法,但不知道如何创建没有类名的学生列表?
KStream<String, Object> sampleStream = builder.stream("Student-Topic-in");
sampleStream
.filter((k, v) -> v != null)
.mapValues(v -> (Student) v)
.groupBy((k, v) -> KeyValue.pair(v.getClass_name(), v))
.windowedBy(TimeWindows.of(5000))
//I am not sure how to create a student list without Classname
.aggregate(Student::new, (k, v, list) -> (Student)list.add((Student)v)
你能告诉我,如何构造输出json消息kafka流吗?
2条答案
按热度按时间pw9qyyiw1#
你可以做一个
KStream.groupBy(...).windowedBy().aggregate().mapValues
使用“class”属性进行分组。在
Aggregator()
你可以组装一个List
你转化成的学生JSON
在mapValues()
zzwlnbp82#
您可以按以下方式在列表中聚合消息:
一旦你拿到
List<Student>
,可以使用.mapValues()
功能。