我用一个Kafka流来计算在过去3分钟内有多少事件发生在一个跳跃的时间窗口中:
public class ViewCountAggregator {
void buildStream(KStreamBuilder builder) {
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
KStream<String, String> views = builder.stream(stringSerde, stringSerde, "streams-view-count-input");
KStream<String, Long> viewCount = views
.groupBy((key, value) -> value)
.count(TimeWindows.of(TimeUnit.MINUTES.toMillis(3)).advanceBy(TimeUnit.MINUTES.toMillis(1)))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), value));
viewCount.to(stringSerde, longSerde, "streams-view-count-output");
}
public static void main(String[] args) throws Exception {
// some not so important initialization code
...
}
}
运行使用者并将某些消息推送到输入主题时,随着时间的推移,它会收到以下更新:
single 1
single 1
single 1
five 1
five 4
five 5
five 4
five 1
这几乎是正确的,但它从未收到以下更新:
single 0
five 0
如果没有它,我更新计数器的消费者将永远不会在较长时间内没有事件时将计数器设置回零。我希望已消费的邮件如下所示:
single 1
single 1
single 1
single 0
five 1
five 4
five 5
five 4
five 1
five 0
我是否缺少一些配置选项/参数来帮助我实现这种行为?
1条答案
按热度按时间7cjasjjr1#
这几乎是正确的,但它从未收到以下更新:
首先,计算的输出是正确的。
第二,为什么是正确的:
如果应用窗口聚合,则仅创建具有实际内容的窗口(我熟悉的所有其他系统都将生成相同的输出)。因此,如果对于某个键,没有超过窗口大小的时间段的数据,则不存在示例化的窗口,因此也根本不存在计数。
如果没有内容就不示例化windows的原因很简单:处理器不能知道所有键。在您的示例中,您有两个键,但稍后可能会出现第三个键。你想得到什么
<thirdKey,0>
从一开始?而且,由于数据流在本质上是无限的,所以密钥可能会消失,并且永远不会再出现。如果你记得所有看到的关键点<key,0>
如果没有消失的钥匙的数据,你会发射吗<key,0>
永远?我不想说你期望的结果/语义没有意义。这只是您的一个非常具体的用例,不适用于一般情况。因此,流处理器不实现它。
第三:你能做什么?
有多种选择:
您的使用者可以跟踪它所看到的键,并使用嵌入的记录时间戳确定某个键是否“丢失”,然后将该键的计数器设置为零(为此,还可以帮助删除
map
步进并保持Windowed<K>
键入键,以便使用者获得记录所属窗口的信息)添加有状态的
#transform()
在流应用程序中执行与(1)中所述相同的操作。为此,注册标点符号回调可能会有所帮助。方法(2)应该更容易跟踪关键点,因为您可以将状态存储附加到转换步骤,因此不需要处理下游使用者中的状态(以及故障/恢复)。
然而,对于这两种方法来说,棘手的部分仍然是确定何时缺少一个键,也就是说,您要等待多长时间才能生成一个键
<key,0>
. 请注意,数据可能会延迟到达(也称为无序),即使您确实发出了<key,0>
迟到的唱片可能会成为制作人<key,1>
代码发出<key,0>
记录。但也许这并不是一个真正的问题,你的情况下,似乎你只使用最新的窗口无论如何。最后但并非最不重要的一点是:似乎您只使用了最新的计数,并且较新的窗口覆盖了下游用户中较旧的窗口。因此,探索“交互式查询”以了解用户的状态可能是值得的
count
运算符直接代替使用者更新主题和其他一些状态。这可能允许您重新设计并大大简化下游应用程序。查看文档和一篇关于交互式查询的非常好的博客文章,了解更多详细信息。