在SpringBootKafka流中添加statestore的位置

bd1hkmkf  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(403)

我正在尝试使用拓扑测试驱动程序,它需要在其构造函数中包含拓扑。但是,虽然应用程序本身工作正常,但它在我的kstream单元测试中失败,并出现以下错误:

"StateStore ... is already added"

下面是我要测试的kstream(简称):

@Bean
public KStream<...,...> kstream(StreamsBuilder builder) {
  builder.addStateStore(...);
  KStream<...,...> stream = builder.stream().filter(...).etc()
  stream.process(()->...);
  return stream;
}

我的测试(缩短)

def "..."() {
  given:
    ...
    StreamsBuilder builder = new StreamsBuilder();
    MyStreamService myStreamService = new MyStreamService(...stubbed);
    KStream mykStream = myStreamService.kStream(builder);
    TopologyTestDriver driver = new TopologyTestDriver(builder.build(), ...)
    ...

一旦我运行builder.build()来获取拓扑,它就会抛出上面的错误——但是我不明白为什么,因为我只在那个地方调用了addstatestore一次。我删除了除.addstore()方法之外的所有流逻辑,以查看是否有其他方法(map、filter、process等)将其初始化为无效。
我知道有其他的方法来测试Kafka流,但我特别试图让它工作上面解释的方式。如果这不可能,那没关系。

v64noz0r

v64noz0r1#

在application.yml文件中,尝试添加以下行:spring:kafka:streams:state-dir:“dir path”kstreams将状态存储使用“dir path”目录。

相关问题