在kstreams应用程序中使用自定义kafka状态存储

a8jjtwal  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(564)

我们正在使用spring cloud stream hoxton rc7项目中包含的kafka流(因此使用提供的kafka流和kafka客户端版本[2.3.1])

  1. ext {
  2. set('springCloudVersion', 'Hoxton.SR7')
  3. }
  4. ...
  5. dependencies {
  6. // spring cloud stream
  7. implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka-streams'
  8. implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
  9. implementation("org.springframework.cloud:spring-cloud-stream")
  10. // redis
  11. implementation 'io.lettuce:lettuce-core'
  12. implementation 'org.springframework.data:spring-data-redis'
  13. testCompile 'it.ozimov:embedded-redis:0.7.2'
  14. ...

我们已经实现了一个kstreams应用程序

  1. @Bean
  2. public Consumer<KStream<String, IncomingEvent>> process() {
  3. return input -> {

我们在其中进行聚合,比如:

  1. .aggregate(Foo::new, (key, value1, aggregate) ->
  2. (aggregate == null || aggregate.getLastModified() == null || this.mustProcess(key, value1))
  3. ? value1
  4. : aggregate,
  5. materialized
  6. )

现在具体化的应该是一个自定义外部状态存储(redis):

  1. Materialized<String, Foo, KeyValueStore<Bytes, byte[]>> materialized =
  2. Materialized.as("redis-store");

由storebuilder bean提供:

  1. @Bean
  2. public StoreBuilder<KeyValueStore<String, Foo>> builder(RedisKeyValueStoreBytes redisKeyValueStoreBytes){
  3. return Stores.keyValueStoreBuilder(supplier(redisKeyValueStoreBytes),
  4. new Serdes.StringSerde(),
  5. new SomeFooSerde());
  6. }
  7. public static KeyValueBytesStoreSupplier supplier(RedisKeyValueStoreBytes redisKeyValueStoreBytes) {
  8. return new KeyValueBytesStoreSupplier() {
  9. @Override
  10. public String name() {
  11. return "redis-store";
  12. }
  13. @Override
  14. public KeyValueStore<Bytes, byte[]> get() {
  15. return redisKeyValueStoreBytes;
  16. }
  17. @Override
  18. public String metricsScope() {
  19. return "redis-session-state";
  20. }
  21. };
  22. }

我现在用一个嵌入的kafka测试应用程序:

  1. @ActiveProfiles("test")
  2. @RunWith(SpringRunner.class)
  3. @DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
  4. @SpringBootTest(classes = {TestConfigurationTests.class})
  5. @EmbeddedKafka(count = 3, ports = {29901, 29902, 29903}, zookeeperPort = 33991)
  6. public class TestKafkaIntegration {

我尝试访问状态存储并查询添加的项的位置:

  1. ReadOnlyKeyValueStore<String, Foo> queryableStore = interactiveQueryService.getQueryableStore(
  2. "redis-store", QueryableStoreTypes.keyValueStore());
  3. return queryableStore;

但当我运行测试时,我收到一个错误:

  1. Caused by: org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.TopologyException: Invalid topology: StateStore redis-store is already added.

几个问题:
[1]中解释的使用自定义状态存储的示例在处理器中使用它。这是否意味着我不能在聚合中使用自定义状态存储?
当无法在聚合中使用它时,使用自定义状态存储还有什么意义?
当我为kstreams稍微修改上面的代码并定义一个处理器而不是在aggregate方法中使用materialized时,错误发生了变化,然后它在尝试执行getqueryablestore时抱怨缺少状态“redis store”store。但实际上我可以看到,addstatestorebeans注册了“redis store”。怎么会这样?
我想使用自定义状态存储的原因是,我不能(很容易)为应用程序示例提供专用硬盘。为了快速启动应用程序,我希望避免在每次启动应用程序时处理完整的changelog(最好一天启动几次,目前需要一个多小时)。最后一个问题是:
使用自定义外部状态存储时,我是否能够恢复到应用程序重新启动时的最后一个状态?
[1] https://spring.io/blog/2019/12/09/stream-processing-with-spring-cloud-stream-and-apache-kafka-streams-part-6-state-stores-and-interactive-queries

4smxwvx5

4smxwvx51#

您正在使用materialized.as(java.lang.string storename),它将创建(materialize)一个 StateStore 使用给定的名称(“redis store”在这里)。另一方面 builder(RedisKeyValueStoreBytes redisKeyValueStoreBytes) 你在创造另一个 StateStore 如果使用相同的名称,那么springframework可能会自动将其添加到拓扑中,这样就会出现“store is already added”错误。
q1:您可以在聚合中使用自定义状态存储;与materialized.as(keyvaluebytesstoresupplier)一起使用
问题2:也可以使用 StateStore 使用转换器或自定义处理器进行交互式查询;还有一个全球性的 StateStore 您可以访问整个主题,而不是kafkastreams仅示例分配的分区(请参阅addglobalstore和globaltable)
问题3:我猜您没有(手动)向拓扑注册状态存储;请参阅topology.addstatestore(storebuilder<?>storebuilder,java.lang.string。。。processornames)并连接处理器和状态存储
问题4:是的,状态存储是从changelog主题加载的(使用优化时可能是原始主题)

相关问题