我有一个输入主题Kafka流应用程序 input
其中以下记录作为json日志出现:
json日志: {"CreationTime":"2018-02-12T12:32:31","UserId":"@gmail.com","Operation":"upload","Workload":"Drive"}
我正在从以下主题构建一个流:
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source_user_activity = builder.stream("input");
接下来我要分组 "UserId"
找到每个用户的计数。
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
final StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source_user_activity = builder.stream("input");
final KTable<String, Long> wordCounts = source_user_activity
.flatMap((key, value) -> {
List<KeyValue<String, String>> result = new LinkedList<>();
JSONObject valueObject = new JSONObject(value);
result.add(KeyValue.pair((valueObject.get("UserId").toString()), valueObject.toString()));
return result;
})
.groupByKey()
.count();
wordCounts.toStream().to("output",Produced.with(stringSerde, longSerde));
wordCounts.print();
下一步我将从 output
主题使用 console-consumer
. 我没有看到任何文字,只是这样:
然而 wordCounts.print()
显示如下: [KSTREAM-AGGREGATE-0000000003]: @gmail.com, (1<-null)
我做错什么了?谢谢。
1条答案
按热度按时间siotufzp1#
值的数据编码为
long
(您正在使用LongSerde
对于值)和控制台使用者用户StringDeserializer
默认情况下,因此,它无法正确地反序列化该值。您需要指定
LongDeserializer
通过控制台使用者的命令行参数获取值。