默认情况下, .windowedBy(SessionWindows.with(Duration.ofSeconds(60))
返回每个传入记录的记录。
结合了 .count()
和一个 .filter()
检索第一条记录很容易。
使用 .suppress(Suppressed.untilWindowCloses(unbounded()))
检索上一张唱片也很容易。
所以…我做了两次处理,你可以看到改编的字数计算示例:
final KStream<String, String> streamsBranches = builder.<String,String>stream("streams-plaintext-input");
streamsBranches
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy((key, value) -> ""+value)
.windowedBy(SessionWindows.with(Duration.ofSeconds(60)).grace(Duration.ofSeconds(2)))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.toStream()
.map((wk, v) -> new KeyValue<>(wk.key(), v == null ? -1l : v))
.filter((wk, v) -> v == 1)
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
streamsBranches
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.groupBy((key, value) -> ""+value)
.windowedBy(SessionWindows.with(Duration.ofSeconds(60)).grace(Duration.ofSeconds(2)))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.map((wk, v) -> new KeyValue<>(wk.key(), v))
.filter((wk, v) -> v != null)
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
但我想知道是否有一个更简单和更美丽的方式做同样的事情。
1条答案
按热度按时间vyu0f0g11#
我想你应该用
SessionWindowedKStream::aggregate(...)
并根据逻辑在聚合器中累积结果(第一个值和最后一个值)示例代码可能如下所示:
哪里
AggClass
是累加器和AggClassSerdes
塞德斯是那个蓄能器的