使用Kafka检测值的变化

omqzjyyz  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(454)

我有一个流应用程序,它连续接收坐标流以及一些自定义元数据,这些元数据还包括一个位字符串。这个流是使用producerapi生成到kafka主题上的。现在,另一个应用程序需要处理这个流[streams api],并存储位字符串中的特定位,并在该位更改时生成警报
下面是需要处理的连续消息流

{"device_id":"1","status_bit":"0"}
{"device_id":"2","status_bit":"1"}
{"device_id":"1","status_bit":"0"}
{"device_id":"3","status_bit":"1"}
{"device_id":"1","status_bit":"1"} // need to generate alert with change: 0->1
{"device_id":"3","status_bits":"1"}
{"device_id":"2","status_bit":"1"}
{"device_id":"3","status_bits":"0"} // need to generate alert with change 1->0

现在我想把这些警告写到另一个Kafka主题,比如

{"device_id":1,"init":0,"final":1,"timestamp":"somets"}
{"device_id":3,"init":1,"final":0,"timestamp":"somets"}

我可以在状态存储中使用

streamsBuilder
        .stream("my-topic")
        .mapValues((key, value) -> value.getStatusBit())
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
        .reduce((oldAggValue, newMessageValue) -> newMessageValue, Materialized.as("bit-temp-store"));

但我无法理解如何从现有位检测到这种变化。我是否需要查询处理器拓扑中的状态存储?如果是?怎样?如果没有?还有什么办法?
任何我可以尝试的建议/想法(可能与我的想法完全不同)也将不胜感激。我对Kafka很陌生,从事件驱动流的Angular 思考问题让我很困惑。
提前谢谢。

nimxete2

nimxete21#

我不确定这是不是最好的方法,但在类似的任务中,我使用了一个中间实体来捕获状态变化。你的情况是

streamsBuilder.stream("my-topic").groupByKey()
          .aggregate(DeviceState::new, new Aggregator<String, Device, DeviceState>() {
        public DeviceState apply(String key, Device newValue, DeviceState state) {
            if(!newValue.getStatusBit().equals(state.getStatusBit())){
                 state.setChanged(true);    
            }
            state.setStatusBit(newValue.getStatusBit());
            state.setDeviceId(newValue.getDeviceId());
            state.setKey(key);
            return state;
        }
    }, TimeWindows.of(…) …).filter((s, t) -> (t.changed())).toStream();

在生成的主题中,您将看到这些更改。您还可以向devicestate添加一些属性以首先对其进行初始化,具体取决于是否要发送事件、第一条设备记录何时到达等。

相关问题