我正在尝试测试我的Kafka流应用程序。我构建了一个简单的拓扑结构,在这里我从一个输入主题中读取数据,并将相同的数据存储在状态存储中。
我尝试使用topologytestdriver为这个拓扑编写单元测试。当我运行测试时,遇到以下错误。
org.apache.kafka.streams.errors.LockException: stream-thread [main] task [0_0] Failed to lock the state directory for task 0_0
at org.apache.kafka.streams.processor.internals.AbstractTask.registerStateStores(AbstractTask.java:197)
at org.apache.kafka.streams.processor.internals.StreamTask.initializeStateStores(StreamTask.java:275)
at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:403)
at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:257)
at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:228)
at streams.checkStreams.checkStreamsTest.setup(checkStreamsTest.java:99)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
....
我可以看到在/tmp/kafka streams中本地创建了状态存储,但streams线程无法在其上获得锁。我搜索并发现这个错误可能是因为两个流线程正在尝试访问它,一个有锁,所以另一个必须等待。但是我没有看到在我的代码中创建两个线程流。我对Kafka流和它的测试是新的,我在这里错过了什么吗?
暂无答案!
目前还没有任何答案,快来回答吧!