如何进行kafka流转换(map/flatmap),同时考虑键/值存储中的值?

mw3dktmi  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(400)

我的任务如下:
我正在监视来自第三方测量设备的时间同步事件。这次同步有点不稳定,所以我想检测同步何时停止并发出警报。
为此,我将为一个kafka主题生成同步事件。我有三个不同的活动:
同步请求
同步成功
同步失败,因为其他设备没有响应
所以,我想做的是:
当收到请求时,经过一定的时间后什么也没有收到,我想发出一个“超时”警报
当收到请求时,在超时时间内,一个成功事件到达,如果在超时时间之后没有请求到达,我想发出一个“超时”
当故障事件到达时,我想发出“其他设备没有响应”警报
我目前正在设置kafka streams应用程序,需要存储状态以防此应用程序崩溃(不应该,但我想确定),因此我设置了以下内容:

val builder = new StreamsBuilder
val storeBuilder = Stores.
  keyValueStoreBuilder(Stores.persistentKeyValueStore("timesync-alarms"),
                       Serdes.String(),
                       logEntrySerde)
builder.addStateStore(storeBuilder)
val eventStream = builder.stream(sourceTopic, Consumed.`with`(Serdes.String(), logEntrySerde))

现在,我被困住了。我基本上认为我需要做的是 flatMap 函数,每当事件到达时:
查询存储中最后处理的事件
决定是否发出警报
使用当前接收的事件更新存储
产生警报(如果有)
那么,我如何在这里实现步骤1和3呢?还是我在观念上错了,不得不换一种方式去做?

sxissh06

sxissh061#

我认为你不需要直接使用state store。您可以创建两个流—一个包含同步请求事件,另一个包含同步响应(成功、失败)并将其加入:

requestStream.outerJoin(responseStream, (leftVal, rightVal) -> ...,
    JoinWindows.of(timeout), ...);

在超时的情况下 rightVal 为空。
如果您想将警报发送到一个单独的主题,您可以简单地过滤加入的流并将所有失败(错误响应和超时)写入该主题。否则你可以用 peek() 方法并在内部触发一些操作。下面是一个简单的例子:https://github.com/djarza/football-events/blob/master/football-ui/src/main/java/org/djar/football/ui/projection/statisticspublisher.java

相关问题