与kafkastreams的窗口外连接的结束

rsaldnfx  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(358)

我有一个Kafka主题,我期望消息有两种不同的键类型:旧的和新的。即。 "1-new" , "1-old" , "2-new" , "2-old" . 密钥是唯一的,但有些可能丢失。
现在使用kotlin和kafkastreamsapi,我可以记录那些新的和旧的密钥id相同的消息。

val windows = JoinWindows.of(Duration.of(2, MINUTES).toMillis())

    val newStream = stream.filter({ key, _ -> isNew(key) })
            .map({key, value ->  KeyValue(key.replace(NEW_PREFIX, ""), value) })

    val oldStream = stream.filter({ key, _ -> isOld(key) })
            .map({key, value ->  KeyValue(key.replace(OLD_PREFIX, ""), value) })

    val joined = newStream.join(oldStream,
            { value1, value2 -> "$value1&$value2" }, windows)

    joined.foreach({ key, value ->
        log.info { "JOINED $key : $value" }
    })

现在我想知道由于某种原因在时间窗口中丢失的新/旧钥匙。有没有可能用kafkastreams api实现?
在我的情况下,当钥匙 "1-old" 收到并 "1-new" 不是在2分钟内,只有在这种情况下,我要报告id 1 因为可疑。

cigdeys3

cigdeys31#

dsl可能不会给你想要的。但是,您可以使用处理器api。既然这么说 leftJoin 实际上可以用来做“重物搬运”。因此,在 leftJoin 你可以用 .transform(...) 使用附加状态进一步“清理”数据。
对于每个 old&null 把你收到的唱片放进商店。如果您稍后收到 old&new 你可以把它从商店里拿出来。此外,您注册了一个标点符号,每次调用标点符号时,您都会在存储区中扫描“足够旧”的条目,这样您就可以确定以后不会再调用了 old&new 将生成联接结果。对于这些条目,您将发出 old&null 把它们从商店里拿出来。
另一种方法是,您也可以省略join,在一个 transform() 与国家。为此,你需要 KStream#merge() 新旧流与呼 transform() 在合并的流上。
注意:除了注册标点符号,您还可以将“扫描逻辑”放入转换中,并在每次处理记录时执行它。

mtb9vblg

mtb9vblg2#

看起来像你要找的。kafka流左外连接超时
消除了kafka streams框架中缺少类似sql的左连接语义。只有在连接窗口持续时间间隔内没有发生完全连接事件时,此实现才会生成左连接事件。

egmofgnx

egmofgnx3#

如果我正确理解你的问题,你只想报告身份证可疑时,有一个“旧”没有相应的“新”在2分钟的窗口。
如果是这种情况,则需要使用左连接:

val leftJoined = oldStream.leftJoin(newStream,...).filter(condition where value expected from "new" stream is null);

hth公司

相关问题