我有一个流应用程序,它连续接收坐标流以及一些自定义元数据,这些元数据还包括一个位字符串。这个流是使用producerapi生成到kafka主题上的。现在,另一个应用程序需要处理这个流[streams api],并存储位字符串中的特定位,并在该位更改时生成警报
下面是需要处理的连续消息流
{"device_id":"1","status_bit":"0"}
{"device_id":"2","status_bit":"1"}
{"device_id":"1","status_bit":"0"}
{"device_id":"3","status_bit":"1"}
{"device_id":"1","status_bit":"1"} // need to generate alert with change: 0->1
{"device_id":"3","status_bits":"1"}
{"device_id":"2","status_bit":"1"}
{"device_id":"3","status_bits":"0"} // need to generate alert with change 1->0
现在我想把这些警告写到另一个Kafka主题,比如
{"device_id":1,"init":0,"final":1,"timestamp":"somets"}
{"device_id":3,"init":1,"final":0,"timestamp":"somets"}
我可以在状态存储中使用
streamsBuilder
.stream("my-topic")
.mapValues((key, value) -> value.getStatusBit())
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
.reduce((oldAggValue, newMessageValue) -> newMessageValue, Materialized.as("bit-temp-store"));
但我无法理解如何从现有位检测到这种变化。我是否需要查询处理器拓扑中的状态存储?如果是?怎样?如果没有?还有什么办法?
任何我可以尝试的建议/想法(可能与我的想法完全不同)也将不胜感激。我对Kafka很陌生,从事件驱动流的Angular 思考问题让我很困惑。
提前谢谢。
1条答案
按热度按时间nimxete21#
我不确定这是不是最好的方法,但在类似的任务中,我使用了一个中间实体来捕获状态变化。你的情况是
在生成的主题中,您将看到这些更改。您还可以向devicestate添加一些属性以首先对其进行初始化,具体取决于是否要发送事件、第一条设备记录何时到达等。