我正在通过kafka streams 2.10构建一个流应用程序,我面临一个概念问题。
The producer1 sends (Key -> Value): Session1 -> RUNNING
The producer2 sends (Key -> Value): Sessionabc -> RUNNING
The producer1 sends (Key -> Value): Session1 -> DONE
现在我想检测一个死会话。我试着使用一个sessionwindow,但是因为kafka一个记录一个记录地计算,所以我不能一次计算所有的记录。
以下是我的片段:
builder
.stream("topic", Consumed.with(serdeKeySessionEvent, serdeValueSessionEvent))
.groupByKey(Grouped.with(serdeKeySessionEvent, serdeValueSessionEvent))
.windowedBy(SessionWindows.with(SESSION_DURATION))
.reduce(new SessionReducer())
.toStream((windowed, value) -> windowed.key())
.filter((k,v)-> Objects.nonNull(v) && v.getStatus() == Status.RUNNING)
.peek((a,b)->System.out.println("This Value is missing: \n "+a.toString()+b.toString()));`
注意:reducer只是确保当我们看到一个done时,不管我们在同一个会话中有哪个其他元素,它都会被完成。有什么想法吗?
1条答案
按热度按时间lo8azlld1#
有了处理器api,只需多花一点代码就可以轻松完成。dsl可以与处理器api混合使用。
处理过程如下所示。
构建状态存储并使用
StreamsBuilder::addStateStore
创建kstream并调用KStream::transform
与变压器一起工作,完成整个工作转换的结果将是消息和信息,如果会话是死的或完成的
使用transformer可以实现如何处理每条消息。对于每条消息,您必须更新keyvalue存储,其中key是会话id。您必须保存有关会话的最后一条消息的时间戳
然后在标点器(即周期性调用)中,检查哪个会话超时,并使用
ProcessorContext::forward
有状态(完成,死亡)整个代码如何做到这一点,你可以在这里找到