假设我有一个商店应用程序,我想对这个操作进行一些复杂的验证。
事件是我系统中唯一的真理来源。
添加产品由 ProductAdded
信息。
负责验证产品的微服务读取消息,对其进行验证,并生成 ProductValidated
信息。
但是如果我想让微服务从零开始启动会发生什么呢?在启动时,每个消息都会被重新处理,从而导致对每个已使用消息的冗余和重复验证。这可以通过首先读取消息队列中的所有消息来解决,当所有消息都被加载时,启动一个异步验证过程。
但它如何确保所有消息都已加载?也许消息的产生比从事件建立状态的过程要快。一个解决方案可以是查询消息队列中给定时刻的消息总数。然后,阅读并处理它们。然后,再次查询和处理。
这个问题是,在我看来,这并不是解决这一挑战的典型方法。我想知道在这种情况下做什么是一种流行的做法。
1条答案
按热度按时间fv2wmkja1#
您几乎没有选择:
ktable,按购物车聚合(每个购物车不能有两次相同的产品)。为了防止这个增长太大,记录需要被“墓碑”,所以另一件事需要告诉应用程序购物车已经不存在了。
记住,要在kafka中进行任何类型的聚合,都需要本地存储。如果你不想或不能有本地存储,Kafka是错误的工具。
我不完全理解你的观点,但是微服务验证过程有问题。首先它说没有缓存或本地存储,第二点说加载所有内容(这意味着在本地存储中缓存)。
---编辑
您可以检查confluent中对订单进行验证的示例:https://github.com/confluentinc/kafka-streams-examples/tree/5.4.1-post/src/main/java/io/confluent/examples/streams/microservices .
如果我理解正确,您可以拥有一个没有变更日志的本地存储,这样您可以在重新启动时重新填充它。
检查inventoryservice.java类,在那里可以看到如何创建一个separe存储。您要省略的行是
.withLoggingEnabled()
,因为这样会创建一个changelog主题。你还需要做的两件事是:
将流配置为返回最早的记录
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
在构建流之前,有一些代码可以找到存储并擦除它。请查看confluent的这篇博文,这篇博文的一部分是LocalState存储,它解释了如何找到存储本地文件的目录,这样您就可以擦除目录了。