在同一kafka streams应用程序中重用kstream对象以跨多个时间窗口查找计数

mwngjboj  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(271)

我可以在一个拓扑中多次重用同一kstreams对象并创建多个时间窗口计数吗?
我有一个Kafka流应用程序,消费一个Kafka主题。我想为多个时间窗口创建计数(例如,按分钟、按小时)。
我想知道做以下这些是否有什么问题。我看到它工作得很好——只是想知道流是否应该像那样被重用。

public void accept(KStream<MyKey, MyKey> myKeyStream,
                   GlobalKTable<String, String> globalTable) {

    myKeyStream.groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ofMinutes(1)))
            .count(Materialized.<MyKey, Long, WindowStore<Bytes, byte[]>>as(
                    "minute-store")
                    .withRetention(Duration.ofMinutes(1)))
            .toStream()
            .map((k, v) -> new KeyValue<>(k.key(), v))
            .join(globalTable,
                    (MyKey k, Long v) -> k.getValue().toString(),
                    new ValueJoiner())
            .print(Printed.toSysOut());

    myKeyStream.groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofHour(1)).grace(Duration.ofHour(1)))
            .count(Materialized.<MyKey, Long, WindowStore<Bytes, byte[]>>as(
                    "hour-store")
                    .withRetention(Duration.ofHour(1)))
            .toStream()
            .map((k, v) -> new KeyValue<>(k.key(), v))
            .join(globalTable,
                    (MyKey k, Long v) -> k.getValue().toString(),
                    new ValueJoiner())
            .print(Printed.toSysOut());
}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题