滑动窗口中的kafka kstream相关消息事件

xytpbqjk  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(327)

我们有一种情况,我认为Kafka流可以帮助,但我找不到任何文件或例子表明如何。
我发现了一个类似的问题,但它没有任何实现建议:kafka streams wait function with dependent objects
我想做的是:
我想将Kafka主题中的相关记录关联到单个对象中,并将新对象发布到单独的输出主题中。例如,可能有五条消息记录通过唯一键相互关联—我想从这些相关对象构建一个新对象,并将其生成到新主题。
我希望所有相关的事件在一个小时的滑动窗口内被聚合。换句话说,一旦id为“123”的消息到达消费者,应用程序必须等待至少一个小时,以等待id为“123”的剩余记录到达。所有记录到达或一小时后,这些记录将过期。
最后,使用一小时内收集的所有相关消息创建一个新对象,然后将其发送到另一个Kafka主题。
我遇到的问题。
Kafka的滑动窗口似乎只在两条溪流连接在一起时才起作用。我们将只有一个流连接到这个主题-我不知道为什么需要两个流,或者我们将如何实现这一点。我在网上找不到任何这样的例子。我在kafka中看到的所有流函数在收集同一个键的事件时都只是聚合/简化为一个简单的值。例如,某个键出现的次数或某个值的总和
下面是一些伪代码来描述我所说的。如果功能存在,那么函数名/语义将不同。

KStream<Key, Object> kstream = kStreamBuilder.stream(TOPIC);
    kstream.windowedBy(
    // One hour sliding Window
    )
    .collectAllRelatedKeys(
    // Collect all Records related to each key
    // map == HashMap<Key, ArrayList<Value>>
       map.get(key).add(value);
    )
    .transformAndProcess(
        if(ALL_EVENTS_COLLECTED) {
        // Create new Object from all related records
            newObject = 
            createNewObjectFromRelatedRecordsFunction(map.get(key));
            producer.send(newObject);   
        }
    )

问题(感谢您的帮助):
如何使用单流滑动窗口?
如何自定义kstream/ktable函数来收集时间窗口内的所有相关事件,并为另一个主题生成新对象?
确认/偏移管理如何处理滑动窗口流?
这能保证一次交货吗?供参考:https://www.confluent.io/blog/enabling-exactly-kafka-streams/

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题