topologytestdriver在ktable聚合上发送错误消息

zf2sa74q  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(350)

我有一个在ktable上聚合的拓扑。这是我创建的一个通用方法,用于在不同的主题上构建此拓扑。

  1. public static <A, B, C> KTable<C, Set<B>> groupTable(KTable<A, B> table, Function<B, C> getKeyFunction,
  2. Serde<C> keySerde, Serde<B> valueSerde, Serde<Set<B>> aggregatedSerde) {
  3. return table
  4. .groupBy((key, value) -> KeyValue.pair(getKeyFunction.apply(value), value),
  5. Serialized.with(keySerde, valueSerde))
  6. .aggregate(() -> new HashSet<>(), (key, newValue, agg) -> {
  7. agg.remove(newValue);
  8. agg.add(newValue);
  9. return agg;
  10. }, (key, oldValue, agg) -> {
  11. agg.remove(oldValue);
  12. return agg;
  13. }, Materialized.with(keySerde, aggregatedSerde));
  14. }

这在使用kafka时非常有效,但在通过“topologytestdriver”进行测试时就不行了。
在这两种情况下,当我得到一个更新时 subtractor 先调用,然后调用 adder 被称为。问题是当使用 TopologyTestDriver ,将发送两条消息进行更新:一条在 subtractor 打电话,然后再打一个 adder 打电话。更不用说在 subrtractor 在那之前 adder 处于不正确的阶段。
其他人能确认这是个窃听器吗?我在Kafka2.0.1版和2.1.0版上都测试过这个。
编辑:
我在github中创建了一个测试用例来说明这个问题:https://github.com/mulho/topology-testcase

t9eec4r0

t9eec4r01#

预期的行为是有两个输出记录(一个“减”记录和一个“加”记录)。理解它的工作原理有点棘手,所以让我来解释一下。
假设您有以下输入表:

  1. key | value
  2. -----+---------
  3. A | <10,2>
  4. B | <10,3>
  5. C | <11,4>

KTable#groupBy() 将值的第一部分提取为新键(即, 10 或者 11 )然后对第二部分(即, 2 , 3 , 4 )在聚合中。因为 A 以及 B 记录都有 10 作为新密钥,您将 2+3 你还可以算出 4 对于新密钥 11 . 结果表将是:

  1. key | value
  2. -----+---------
  3. 10 | 5
  4. 11 | 4

现在假设更新记录 <B,<11,5>> 将原始输入表更改为:

  1. key | value
  2. -----+---------
  3. A | <10,2>
  4. B | <11,5>
  5. C | <11,4>

因此,新的结果表应该是 5+4 为了 11 以及 2 为了 10 :

  1. key | value
  2. -----+---------
  3. 10 | 2
  4. 11 | 9

如果比较第一个结果表和第二个结果表,您可能会注意到两行都得到了更新。老年人 B|<10,3> 从中减去记录 10|5 导致 10|2 以及新的 B|<11,5> 记录已添加到 11|4 导致 11|9 .
这正是您看到的两个输出记录。第一个输出记录(在执行subtract之后)更新第一行(它减去不再是聚合结果一部分的旧值),而第二个记录将新值添加到聚合结果中。在我们的例子中,减法记录是 <10,<null,<10,3>>> add记录是 <11,<<11,5>,null>> (这些记录的格式是 <key, <plus,minus>> (请注意,减法记录仅设置 minus 当add记录只设置 plus 零件)。
最后一句话:不可能将正负记录放在一起,因为正负记录的键可能不同(在我们的示例中) 11 以及 10 ),因此可能会进入不同的分区。这意味着加号和减号操作可能由不同的机器执行,因此不可能只发出一条同时包含加号和减号部分的记录。

展开查看全部

相关问题