我正在尝试使用拓扑测试驱动程序,它需要在其构造函数中包含拓扑。但是,虽然应用程序本身工作正常,但它在我的kstream单元测试中失败,并出现以下错误:
"StateStore ... is already added"
下面是我要测试的kstream(简称):
@Bean
public KStream<...,...> kstream(StreamsBuilder builder) {
builder.addStateStore(...);
KStream<...,...> stream = builder.stream().filter(...).etc()
stream.process(()->...);
return stream;
}
我的测试(缩短)
def "..."() {
given:
...
StreamsBuilder builder = new StreamsBuilder();
MyStreamService myStreamService = new MyStreamService(...stubbed);
KStream mykStream = myStreamService.kStream(builder);
TopologyTestDriver driver = new TopologyTestDriver(builder.build(), ...)
...
一旦我运行builder.build()来获取拓扑,它就会抛出上面的错误——但是我不明白为什么,因为我只在那个地方调用了addstatestore一次。我删除了除.addstore()方法之外的所有流逻辑,以查看是否有其他方法(map、filter、process等)将其初始化为无效。
我知道有其他的方法来测试Kafka流,但我特别试图让它工作上面解释的方式。如果这不可能,那没关系。
1条答案
按热度按时间v64noz0r1#
在application.yml文件中,尝试添加以下行:spring:kafka:streams:state-dir:“dir path”kstreams将状态存储使用“dir path”目录。