在下面的代码中,我想将“job”和“country”字段上的记录进行聚合。目前我只能在“job”属性上进行聚合。
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
KStream<String, JsonNode> personDetail = builder.stream("person-streams-input", Consumed.with(Serdes.String(), jsonSerde));
KTable<String,Long> articleagg = personDetail
.groupBy((key,value) -> value.get("job").asText(), Serialized.with(Serdes.String(), jsonSerde))
.count();
示例json:
{
"name": "abc",
"zipcode": "111111",
"job": "engineer",
"country": "USA"
}
1条答案
按热度按时间nkhmeac61#
你可以建造任何
groupBy
基于消息键和值的条件: