我使用embeddedkafka实现了一系列集成测试,以测试使用springkafka框架运行的kafka流应用程序。
流应用程序从一个kafka主题中读取一条消息,它将其存储到一个内部状态存储中,进行一些转换,并将其发送到另一个微服务中,成为一个请求的主题。当响应返回到响应的主题中时,它从状态存储中检索原始消息,并根据某些业务逻辑将其转发到我们的一个下游系统,每个系统都有自己的主题。
集成测试只是执行业务条件的各种排列。
最初,测试被分成几个类。运行构建时,来自一个类的测试与另一个类中的测试发生冲突,出现一些冲突异常。我没有花太多时间在这个问题上,只是把所有的测试移到了同一个类中。这解决了我的问题,所有测试要么通过gradle build要么通过intelij edi。
以下是测试:
package au.nab.tlm.streams.integration;
import au.nab.tlm.streams.serde.EntitlementsCheckSerDes;
import au.nab.tlm.streams.test.support.MockEntitlementsCheckSerDes;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
@SpringBootTest
@ContextConfiguration(classes = {MyTopologiesIntegrationTest.TestKafkaConfig.class})
@DirtiesContext(classMode = DirtiesContext.ClassMode.BEFORE_EACH_TEST_METHOD)
@EmbeddedKafka(
ports = 9092,
partitions = 1,
topics = {
"topic-1.v1",
"topic-2.v1",
"topic-3.v1",
"topic-4.v1",
"topic-5.v1",
"topic-6.v1",
},
brokerProperties = {"transaction.state.log.replication.factor=1", "transaction.state.log.min.isr=1", "log.dir=/tmp/embedded-kafka"}
)
public class MyTopologiesIntegrationTest {
@Autowired
EmbeddedKafkaBroker kafkaBroker;
@Autowired
EntitlementsCheckSerDes appSerDes;
@Test
public void test_1() {
}
@Test
public void test_2() {
}
@Test
public void test_3() {
}
@Test
public void test_4() {
}
@Test
public void test_5() {
}
@TestConfiguration
public static class TestKafkaConfig {
@Bean
EntitlementsCheckSerDes appSerDes() {
return new MockEntitlementsCheckSerDes();
}
}
}
对结果很满意,我推动了我的更改,只是注意到构建在我们的ci服务器上失败了。再次运行它,这次失败的次数与第一次不同。我让一位同事看了看,他也有类似于ci服务器的故障经历。我在我的机器上运行了至少20次构建,它总是通过。对我的同事一个接一个地进行测试也总是通过的。
我们得到的最常见的例外是主题xyz已经存在,但是偶尔会有一些其他的例外,表明集群不可能是喜欢的或类似的。所有这些异常都向我们表明,尽管使用了 DirtiesContext
运行的第一个测试总是通过的。
我们俩都花了一整天的时间脱掉头发,没法让它工作。我们尝试了谷歌带我们去的任何地方,但一点运气都没有。最后,我们在测试类中只保留了一个测试场景(交互次数最多的场景),并禁用了其余的场景。
显然,这不是一个可以被接受的永久解决办法,我真的很想了解我们做错了什么,以及如何修复它。
提前感谢您的投入。
1条答案
按热度按时间goucqfw61#
不要使用固定端口
ports = 9092,
-默认情况下,嵌入式Kafka将侦听操作系统选择的随机端口。你应该把它用于你的测试用例。
您可以通过调用
this.kafkaBroker.getBrokersAsString()
.