kafka流:在json日志中按键分组

dzhpxtsq  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(398)

我有一个输入主题Kafka流应用程序 input 其中以下记录作为json日志出现:
json日志: {"CreationTime":"2018-02-12T12:32:31","UserId":"@gmail.com","Operation":"upload","Workload":"Drive"} 我正在从以下主题构建一个流:

  1. final StreamsBuilder builder = new StreamsBuilder();
  2. KStream<String, String> source_user_activity = builder.stream("input");

接下来我要分组 "UserId" 找到每个用户的计数。

  1. final Serde<String> stringSerde = Serdes.String();
  2. final Serde<Long> longSerde = Serdes.Long();
  3. final StreamsBuilder builder = new StreamsBuilder();
  4. KStream<String, String> source_user_activity = builder.stream("input");
  5. final KTable<String, Long> wordCounts = source_user_activity
  6. .flatMap((key, value) -> {
  7. List<KeyValue<String, String>> result = new LinkedList<>();
  8. JSONObject valueObject = new JSONObject(value);
  9. result.add(KeyValue.pair((valueObject.get("UserId").toString()), valueObject.toString()));
  10. return result;
  11. })
  12. .groupByKey()
  13. .count();
  14. wordCounts.toStream().to("output",Produced.with(stringSerde, longSerde));
  15. wordCounts.print();

下一步我将从 output 主题使用 console-consumer . 我没有看到任何文字,只是这样:
然而 wordCounts.print() 显示如下: [KSTREAM-AGGREGATE-0000000003]: @gmail.com, (1<-null) 我做错什么了?谢谢。

siotufzp

siotufzp1#

值的数据编码为 long (您正在使用 LongSerde 对于值)和控制台使用者用户 StringDeserializer 默认情况下,因此,它无法正确地反序列化该值。
您需要指定 LongDeserializer 通过控制台使用者的命令行参数获取值。

相关问题