如何从事件启动微服务

fiei3ece  于 2021-06-05  发布在  Kafka
关注(0)|答案(1)|浏览(379)

假设我有一个商店应用程序,我想对这个操作进行一些复杂的验证。
事件是我系统中唯一的真理来源。
添加产品由 ProductAdded 信息。
负责验证产品的微服务读取消息,对其进行验证,并生成 ProductValidated 信息。
但是如果我想让微服务从零开始启动会发生什么呢?在启动时,每个消息都会被重新处理,从而导致对每个已使用消息的冗余和重复验证。这可以通过首先读取消息队列中的所有消息来解决,当所有消息都被加载时,启动一个异步验证过程。
但它如何确保所有消息都已加载?也许消息的产生比从事件建立状态的过程要快。一个解决方案可以是查询消息队列中给定时刻的消息总数。然后,阅读并处理它们。然后,再次查询和处理。
这个问题是,在我看来,这并不是解决这一挑战的典型方法。我想知道在这种情况下做什么是一种流行的做法。

fv2wmkja

fv2wmkja1#

您几乎没有选择:
ktable,按购物车聚合(每个购物车不能有两次相同的产品)。为了防止这个增长太大,记录需要被“墓碑”,所以另一件事需要告诉应用程序购物车已经不存在了。
记住,要在kafka中进行任何类型的聚合,都需要本地存储。如果你不想或不能有本地存储,Kafka是错误的工具。
我不完全理解你的观点,但是微服务验证过程有问题。首先它说没有缓存或本地存储,第二点说加载所有内容(这意味着在本地存储中缓存)。
---编辑
您可以检查confluent中对订单进行验证的示例:https://github.com/confluentinc/kafka-streams-examples/tree/5.4.1-post/src/main/java/io/confluent/examples/streams/microservices .
如果我理解正确,您可以拥有一个没有变更日志的本地存储,这样您可以在重新启动时重新填充它。
检查inventoryservice.java类,在那里可以看到如何创建一个separe存储。您要省略的行是 .withLoggingEnabled() ,因为这样会创建一个changelog主题。

final StoreBuilder reservedStock = Stores
  .keyValueStoreBuilder(Stores.persistentKeyValueStore(RESERVED_STOCK_STORE_NAME),
    Topics.WAREHOUSE_INVENTORY.keySerde(), Serdes.Long())
builder.addStateStore(reservedStock);

你还需要做的两件事是:
将流配置为返回最早的记录 config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); 在构建流之前,有一些代码可以找到存储并擦除它。请查看confluent的这篇博文,这篇博文的一部分是LocalState存储,它解释了如何找到存储本地文件的目录,这样您就可以擦除目录了。

相关问题