当我们使用自定义的groupbykey()值时,我们在流处理中遇到了相当于“丢失的数据包”的情况。我们有一个单处理器节点,有一个源主题,我们从中读取数据包,对该组进行分组和聚合,并基于需要访问statestore的计算进行输出。
下面,让我详细介绍一下这个问题,以及到目前为止我们是如何试图理解它的:
概述我们正在设置一个kafka流应用程序,在其中我们必须执行窗口操作。我们正在根据特定密钥对设备进行分组。下面是我们用于groupby的示例列:
+---------+---------+------+
| Field Name | Field Value |
+---------+---------+------+
| A | 12 |
| B | abc |
| C | x13 |
+---------+---------+------+
基于上述数据的示例键:12x13,其中键=字段(a)+字段(b)+字段(c)
使用groupby()使用groupbykey()对“input kafka topic”分区键上的数据进行分组时,在两种情况下针对同一个键获取不同记录计数的问题。
描述我们首先使用kafka streams的groupby()函数,使用上面的键对设备进行分组。在这种情况下,streams应用程序丢弃了多条记录,生成的记录数少于预期。但是,当我们没有使用groupby()函数指定自己的自定义分组,而是使用groupbykey()对原始传入的kafka分区键上的数据进行键控时,我们得到了预期的确切记录数。
为了检查是否使用了与自定义groupby()函数的输入主题完全相同的键,我们比较了代码中的两个键。输入主题键和自定义键完全相同。
所以现在我们得出结论,groupby函数有一些内部功能我们无法理解,因为groupby函数和groupbykey函数都报告了同一个键的不同计数。我们已经搜索了多个论坛,但无法理解这种现象的原因。
代码段:
使用groupbykey()
KStream<String, Output> myStream = this.stream
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(windowTime)).advanceBy(Duration.ofMinutes(advanceByTime)).grace(Duration.ofSeconds(gracePeriod)))
.reduce((value1, value2) -> value2)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.transform(new myTransformer(this.store.name(), this.store.name());
使用groupby():
KStream<String, Output> myStream = this.stream
.groupBy((key, value) -> value.A + value.B + value.C,
Grouped.with(Serdes.String(), SerdesCollection.getInputSerdes()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(windowTime)).advanceBy(Duration.ofMinutes(advanceByTime)).grace(Duration.ofSeconds(gracePeriod)))
.reduce((value1, value2) -> value2)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.transform(new myTransformer(this.store.name()), this.store.name());
kafka群集设置
----------------------------
| No. of Nodes | 3 |
----------------------------
| CPU Cores | 2 |
----------------------------
| RAM | 8 GB |
----------------------------
流媒体应用程序
-----------------------------------------
| Kafka Streams Version | 2.3.0 |
-----------------------------------------
| Java Version | 11 |
-----------------------------------------
暂无答案!
目前还没有任何答案,快来回答吧!