How to aggregate on multiple JSON fields in Kafka Streams

jk9hmnmh posted on 2021-06-08 in Kafka

In the code below, I want to aggregate the records on both the "job" and "country" fields. Currently I can only aggregate on the "job" attribute.

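import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.kstream.*;

final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

KStream<String, JsonNode> personDetail = builder.stream("person-streams-input", Consumed.with(Serdes.String(), jsonSerde));
// Re-key by "job" and count per job; Grouped replaces Serialized (deprecated since Kafka Streams 2.1)
KTable<String, Long> articleagg = personDetail
                .groupBy((key, value) -> value.get("job").asText(), Grouped.with(Serdes.String(), jsonSerde))
                .count();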

Sample JSON:

{
  "name": "abc",
  "zipcode": "111111",
  "job": "engineer",
  "country": "USA"
}
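To make the target result concrete, the sketch below computes in plain Java (no Kafka dependency) what grouping by both fields and counting should yield for a few records shaped like the sample. It is only an illustration under simplifying assumptions: records are modeled as Map<String, String> instead of JsonNode, the class and method names are hypothetical, and every record except the first ("abc") is made up. Unlike a KTable, which Kafka Streams updates continuously as records arrive, this is a one-off batch count.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java illustration, no Kafka involved: counts person records per
// (job, country) pair, i.e. the per-key values a groupBy(...).count() would hold.
public class JobCountryCount {

    // Hypothetical composite grouping key, e.g. "engineer_USA" (fields joined with "_").
    public static String groupKey(Map<String, String> person) {
        return person.get("job") + "_" + person.get("country");
    }

    // Counts records per composite key; TreeMap only keeps the printout sorted.
    public static Map<String, Long> countByJobAndCountry(List<Map<String, String>> people) {
        return people.stream()
                .collect(Collectors.groupingBy(
                        JobCountryCount::groupKey, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // The first record is the question's sample; the others are made up.
        List<Map<String, String>> people = List.of(
                Map.of("name", "abc", "zipcode", "111111", "job", "engineer", "country", "USA"),
                Map.of("name", "def", "zipcode", "222222", "job", "engineer", "country", "USA"),
                Map.of("name", "ghi", "zipcode", "333333", "job", "engineer", "country", "India"),
                Map.of("name", "jkl", "zipcode", "444444", "job", "teacher", "country", "USA"));
        System.out.println(countByJobAndCountry(people));
        // prints {engineer_India=1, engineer_USA=2, teacher_USA=1}
    }
}
```

Each distinct (job, country) pair becomes its own key, so two engineers in the USA are counted together while an engineer in India is counted separately.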

Answer 1, by nkhmeac6:

You can build any groupBy condition from the message key and value. To group on both fields, combine them into a single key:

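KTable<String, Long> articleagg = personDetail
                // Re-key each record by a composite of "job" and "country", then count per key
                // (Grouped replaces the deprecated Serialized)
                .groupBy((key, value) -> getGroupByCondition(value), Grouped.with(Serdes.String(), jsonSerde))
                .count();

// Builds a key such as "engineer_USA" from the record value
private static String getGroupByCondition(JsonNode value) {
        return value.get("job").asText() + "_" + value.get("country").asText();
}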
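One caveat with joining on "_": if a field value can itself contain the delimiter, distinct pairs can collapse into the same key (job "a_b" with country "c" and job "a" with country "b_c" both become "a_b_c"). Below is a plain-Java sketch of a collision-free alternative that length-prefixes each part; CompositeKey, encode and decode are hypothetical names, not part of Kafka Streams, and a null part is assumed to mean a missing field and is encoded as an empty string.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: builds an unambiguous composite grouping key by writing
// each part as "<length>:<value>", so no field value can be confused with a delimiter.
public class CompositeKey {

    // Encodes the parts in order; a null part (missing field) becomes "".
    public static String encode(String... parts) {
        StringBuilder sb = new StringBuilder();
        for (String part : parts) {
            String p = (part == null) ? "" : part;
            sb.append(p.length()).append(':').append(p);
        }
        return sb.toString();
    }

    // Splits a key produced by encode() back into its parts (assumes well-formed input).
    public static List<String> decode(String key) {
        List<String> parts = new ArrayList<>();
        int i = 0;
        while (i < key.length()) {
            int colon = key.indexOf(':', i);                        // end of the length field
            int len = Integer.parseInt(key.substring(i, colon));
            int start = colon + 1;
            parts.add(key.substring(start, start + len));           // value may contain ':' or '_'
            i = start + len;
        }
        return parts;
    }

    public static void main(String[] args) {
        // With a plain "_" join, both pairs below would produce "a_b_c".
        System.out.println(encode("a_b", "c"));                     // prints 3:a_b1:c
        System.out.println(encode("a", "b_c"));                     // prints 1:a3:b_c
        System.out.println(decode(encode("engineer", "USA")));     // prints [engineer, USA]
    }
}
```

In the topology it would be called from the groupBy lambda, for example with value.path("job").asText() and value.path("country").asText() as arguments. Jackson's JsonNode.path(...) returns a MissingNode (whose asText() is an empty string) instead of null when a field is absent, which also avoids the NullPointerException that value.get(...).asText() throws. The trade-off is that keys such as 8:engineer3:USA are less readable in the output topic than engineer_USA, so the simple "_" join is fine when the values are known not to contain it.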
