我有一个在ktable上聚合的拓扑。这是我创建的一个通用方法,用于在不同的主题上构建此拓扑。
public static <A, B, C> KTable<C, Set<B>> groupTable(KTable<A, B> table, Function<B, C> getKeyFunction,
Serde<C> keySerde, Serde<B> valueSerde, Serde<Set<B>> aggregatedSerde) {
return table
.groupBy((key, value) -> KeyValue.pair(getKeyFunction.apply(value), value),
Serialized.with(keySerde, valueSerde))
.aggregate(() -> new HashSet<>(), (key, newValue, agg) -> {
agg.remove(newValue);
agg.add(newValue);
return agg;
}, (key, oldValue, agg) -> {
agg.remove(oldValue);
return agg;
}, Materialized.with(keySerde, aggregatedSerde));
}
这在使用kafka时非常有效,但在通过“topologytestdriver”进行测试时就不行了。
在这两种情况下,当我得到一个更新时 subtractor
先调用,然后调用 adder
被称为。问题是当使用 TopologyTestDriver
,将发送两条消息进行更新:一条在 subtractor
打电话,然后再打一个 adder
打电话。更不用说在 subrtractor
在那之前 adder
处于不正确的阶段。
其他人能确认这是个窃听器吗?我在Kafka2.0.1版和2.1.0版上都测试过这个。
编辑:
我在github中创建了一个测试用例来说明这个问题:https://github.com/mulho/topology-testcase
1条答案
按热度按时间t9eec4r01#
预期的行为是有两个输出记录(一个“减”记录和一个“加”记录)。理解它的工作原理有点棘手,所以让我来解释一下。
假设您有以下输入表:
在
KTable#groupBy()
将值的第一部分提取为新键(即,10
或者11
)然后对第二部分(即,2
,3
,4
)在聚合中。因为A
以及B
记录都有10
作为新密钥,您将2+3
你还可以算出4
对于新密钥11
. 结果表将是:现在假设更新记录
<B,<11,5>>
将原始输入表更改为:因此,新的结果表应该是
5+4
为了11
以及2
为了10
:如果比较第一个结果表和第二个结果表,您可能会注意到两行都得到了更新。老年人
B|<10,3>
从中减去记录10|5
导致10|2
以及新的B|<11,5>
记录已添加到11|4
导致11|9
.这正是您看到的两个输出记录。第一个输出记录(在执行subtract之后)更新第一行(它减去不再是聚合结果一部分的旧值),而第二个记录将新值添加到聚合结果中。在我们的例子中,减法记录是
<10,<null,<10,3>>>
add记录是<11,<<11,5>,null>>
(这些记录的格式是<key, <plus,minus>>
(请注意,减法记录仅设置minus
当add记录只设置plus
零件)。最后一句话:不可能将正负记录放在一起,因为正负记录的键可能不同(在我们的示例中)
11
以及10
),因此可能会进入不同的分区。这意味着加号和减号操作可能由不同的机器执行,因此不可能只发出一条同时包含加号和减号部分的记录。