What I want to do is:
Consume the records in a topic
Compute a value per 1-second window
Detect windows with fewer than 4 records
Send the final result to another topic
I use suppress to emit only the final result, but I get this error:
09:18:07,963 ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager
- task [1_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000002:
java.lang.ClassCastException: org.apache.kafka.streams.kstream.Windowed cannot be cast to java.lang.String
at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:28)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.buffer(KTableSuppressProcessor.java:86)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:78)
at org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor.process(KTableSuppressProcessor.java:37)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:146)
.....
I think my code is the same as the example in the developer guide. What is the problem? My code is here:
final KStream<String, String> views = builder.stream("fluent-newData");

final KTable<Windowed<String>, Long> anomalousUsers = views
    .map((key, value) -> {
        JSONObject message = JSONObject.fromObject(value);
        String[] strArry = message.getString("detail").split(",");
        return KeyValue.pair(strArry[0], value);
    })
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(1))
        .grace(Duration.ofSeconds(20)))
    .count()
    .suppress(Suppressed.untilWindowCloses(unbounded()))
    .filter((windowedUserId, count) -> count < 4);

final KStream<String, String> anomalousUsersForConsole = anomalousUsers
    .toStream()
    .filter((windowedUserId, count) -> count != null)
    .map((windowedUserId, count) -> new KeyValue<>(windowedUserId.toString(),
        windowedUserId.toString() + " c:" + count.toString()));

anomalousUsersForConsole.to("demo-count-output", Produced.with(stringSerde, stringSerde));
1 Answer
The "Windowed cannot be cast to java.lang.String" error is usually thrown when you do not specify the Serdes directly.
When you build the stream via stream(..), specify the Serdes to use directly.
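A minimal sketch, assuming the String keys and values from your topology (Consumed comes from org.apache.kafka.streams.kstream, Serdes from org.apache.kafka.common.serialization):

final KStream<String, String> views = builder.stream(
    "fluent-newData",
    Consumed.with(Serdes.String(), Serdes.String()));  // explicit key/value Serdes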
The same applies to groupByKey(): there you need to pass a Grouped.
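A sketch under the same String-key/value assumption, reusing the re-keying, windowing, suppress(), and filter from your code. With the Serdes passed explicitly via Grouped, the windowed aggregation's state store and the suppress buffer know how to serialize the Windowed<String> keys, which avoids the ClassCastException:

final KTable<Windowed<String>, Long> anomalousUsers = views
    .map((key, value) -> {
        // same re-keying as in the question: key on the first field of "detail"
        JSONObject message = JSONObject.fromObject(value);
        String[] strArry = message.getString("detail").split(",");
        return KeyValue.pair(strArry[0], value);
    })
    .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))  // pass the Serdes here
    .windowedBy(TimeWindows.of(Duration.ofSeconds(1))
        .grace(Duration.ofSeconds(20)))
    .count()
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
    .filter((windowedUserId, count) -> count < 4);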