我有一个Kafka主题,我期望消息有两种不同的键类型:旧的和新的。即。 "1-new"
, "1-old"
, "2-new"
, "2-old"
. 密钥是唯一的,但有些可能丢失。
现在使用kotlin和kafkastreamsapi,我可以记录那些新的和旧的密钥id相同的消息。
val windows = JoinWindows.of(Duration.of(2, MINUTES).toMillis())
val newStream = stream.filter({ key, _ -> isNew(key) })
.map({key, value -> KeyValue(key.replace(NEW_PREFIX, ""), value) })
val oldStream = stream.filter({ key, _ -> isOld(key) })
.map({key, value -> KeyValue(key.replace(OLD_PREFIX, ""), value) })
val joined = newStream.join(oldStream,
{ value1, value2 -> "$value1&$value2" }, windows)
joined.foreach({ key, value ->
log.info { "JOINED $key : $value" }
})
现在我想知道由于某种原因在时间窗口中丢失的新/旧钥匙。有没有可能用kafkastreams api实现?
在我的情况下,当钥匙 "1-old"
收到并 "1-new"
不是在2分钟内,只有在这种情况下,我要报告id 1
因为可疑。
3条答案
按热度按时间cigdeys31#
dsl可能不会给你想要的。但是,您可以使用处理器api。既然这么说
leftJoin
实际上可以用来做“重物搬运”。因此,在leftJoin
你可以用.transform(...)
使用附加状态进一步“清理”数据。对于每个
old&null
把你收到的唱片放进商店。如果您稍后收到old&new
你可以把它从商店里拿出来。此外,您注册了一个标点符号,每次调用标点符号时,您都会在存储区中扫描“足够旧”的条目,这样您就可以确定以后不会再调用了old&new
将生成联接结果。对于这些条目,您将发出old&null
把它们从商店里拿出来。另一种方法是,您也可以省略join,在一个
transform()
与国家。为此,你需要KStream#merge()
新旧流与呼transform()
在合并的流上。注意:除了注册标点符号,您还可以将“扫描逻辑”放入转换中,并在每次处理记录时执行它。
mtb9vblg2#
看起来像你要找的。kafka流左外连接超时
消除了kafka streams框架中缺少类似sql的左连接语义。只有在连接窗口持续时间间隔内没有发生完全连接事件时,此实现才会生成左连接事件。
egmofgnx3#
如果我正确理解你的问题,你只想报告身份证可疑时,有一个“旧”没有相应的“新”在2分钟的窗口。
如果是这种情况,则需要使用左连接:
hth公司