使用相同的键报告kafka流式处理应用程序中groupby()和groupbykey()的不同记录计数

4jb9z9bj  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(261)

当我们使用自定义的groupbykey()值时,我们在流处理中遇到了相当于“丢失的数据包”的情况。我们有一个单处理器节点,有一个源主题,我们从中读取数据包,对该组进行分组和聚合,并基于需要访问statestore的计算进行输出。
下面,让我详细介绍一下这个问题,以及到目前为止我们是如何试图理解它的:
概述我们正在设置一个kafka流应用程序,在其中我们必须执行窗口操作。我们正在根据特定密钥对设备进行分组。下面是我们用于groupby的示例列:

+---------+---------+------+
| Field Name | Field Value |
+---------+---------+------+
|       A    |    12       | 
|       B    |    abc      |  
|       C    |    x13      |
+---------+---------+------+

基于上述数据的示例键:12x13,其中键=字段(a)+字段(b)+字段(c)
使用groupby()使用groupbykey()对“input kafka topic”分区键上的数据进行分组时,在两种情况下针对同一个键获取不同记录计数的问题。
描述我们首先使用kafka streams的groupby()函数,使用上面的键对设备进行分组。在这种情况下,streams应用程序丢弃了多条记录,生成的记录数少于预期。但是,当我们没有使用groupby()函数指定自己的自定义分组,而是使用groupbykey()对原始传入的kafka分区键上的数据进行键控时,我们得到了预期的确切记录数。
为了检查是否使用了与自定义groupby()函数的输入主题完全相同的键,我们比较了代码中的两个键。输入主题键和自定义键完全相同。
所以现在我们得出结论,groupby函数有一些内部功能我们无法理解,因为groupby函数和groupbykey函数都报告了同一个键的不同计数。我们已经搜索了多个论坛,但无法理解这种现象的原因。
代码段:
使用groupbykey()

KStream<String, Output> myStream = this.stream
.groupByKey()       
.windowedBy(TimeWindows.of(Duration.ofMinutes(windowTime)).advanceBy(Duration.ofMinutes(advanceByTime)).grace(Duration.ofSeconds(gracePeriod)))
.reduce((value1, value2) -> value2)      
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.transform(new myTransformer(this.store.name(), this.store.name());

使用groupby():

KStream<String, Output> myStream = this.stream
.groupBy((key, value) -> value.A + value.B + value.C,
         Grouped.with(Serdes.String(), SerdesCollection.getInputSerdes()))
.windowedBy(TimeWindows.of(Duration.ofMinutes(windowTime)).advanceBy(Duration.ofMinutes(advanceByTime)).grace(Duration.ofSeconds(gracePeriod)))
.reduce((value1, value2) -> value2)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
.transform(new myTransformer(this.store.name()), this.store.name());

kafka群集设置

----------------------------
| No. of Nodes |    3      |
----------------------------
|  CPU Cores   |    2      | 
----------------------------
|     RAM      |    8 GB   |  
----------------------------

流媒体应用程序

-----------------------------------------
| Kafka Streams Version |    2.3.0      |
-----------------------------------------
|  Java Version         |     11        | 
-----------------------------------------

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题