下面是 Spring Kafka的例子:https://github.com/spring-cloud/spring-cloud-stream-samples/tree/master/kafka-streams-samples/kafka-streams-aggregate
我已经构建了该示例的一些变体,以构建更复杂的数据结构,而不仅仅是一个字符串附加,所有这些都可以很好地工作。
但是,我对使用侦听器感兴趣,不管它是云流、kafa流还是本地kafka来响应新的聚合值。我很难找到任何具体的例子,甚至有点接近,所以我希望得到一个正确的方向推动。
我总是可以从aggregate方法手动将聚合写入另一个流
kStream.groupBy(...).aggregate(
...
(s, domainEvent, agg) -> {
...
// Update aggregate
...
secondaryStreamChannel().send(MessageBuilder.withPayload(agg).build());
},
...
);
但这似乎并不理想,因为这会使实际的聚合/快照流显得有些多余。它现在也打破了聚合方法的隔离,并将其绑定到另一个流中。
我可以用正在聚合的密钥通知其他处理程序,但是在下游处理程序的执行和聚合的完成之间存在潜在的竞争条件。
从本质上说,我希望有一个事件流,然后被聚合,然后有一个方法被调用,并至少提供聚合密钥或密钥/聚合对。理想情况下,在聚合更改可用之前不会调用该方法。这是否可以使用现有的云流、kafka流或kafka api?还有别的办法吗。。。我是不是试图把这个用例和kafka/streams联系得太紧密了,在那里它也许不需要?
暂无答案!
目前还没有任何答案,快来回答吧!