Kafka流检测丢失的记录

xqkwcwgp  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(345)

我正在通过kafka streams 2.10构建一个流应用程序,我面临一个概念问题。

The producer1 sends (Key -> Value): Session1 -> RUNNING

The producer2 sends (Key -> Value): Sessionabc -> RUNNING

The producer1 sends (Key -> Value): Session1 -> DONE

现在我想检测一个死会话。我试着使用一个sessionwindow,但是因为kafka一个记录一个记录地计算,所以我不能一次计算所有的记录。
以下是我的片段:

builder
    .stream("topic", Consumed.with(serdeKeySessionEvent, serdeValueSessionEvent))
    .groupByKey(Grouped.with(serdeKeySessionEvent, serdeValueSessionEvent))
    .windowedBy(SessionWindows.with(SESSION_DURATION))
    .reduce(new SessionReducer())
    .toStream((windowed, value) -> windowed.key())
    .filter((k,v)-> Objects.nonNull(v) && v.getStatus() == Status.RUNNING)
    .peek((a,b)->System.out.println("This Value is missing: \n   "+a.toString()+b.toString()));`

注意:reducer只是确保当我们看到一个done时,不管我们在同一个会话中有哪个其他元素,它都会被完成。有什么想法吗?

lo8azlld

lo8azlld1#

有了处理器api,只需多花一点代码就可以轻松完成。dsl可以与处理器api混合使用。
处理过程如下所示。
构建状态存储并使用 StreamsBuilder::addStateStore 创建kstream并调用 KStream::transform 与变压器一起工作,完成整个工作
转换的结果将是消息和信息,如果会话是死的或完成的
使用transformer可以实现如何处理每条消息。对于每条消息,您必须更新keyvalue存储,其中key是会话id。您必须保存有关会话的最后一条消息的时间戳
然后在标点器(即周期性调用)中,检查哪个会话超时,并使用 ProcessorContext::forward 有状态(完成,死亡)
整个代码如何做到这一点,你可以在这里找到

相关问题