我已经编写了一个基本的spring引导服务,它通过restapi使用一些数据并将其发布到rabbitmq和kafka。
为了测试处理kafka的服务类,我遵循以下指南:https://www.baeldung.com/spring-boot-kafka-testing
单独来说,测试(kafkamessagingserviceimpltest)在intellij idea和通过命令行上的mvn都能完美地工作。在idea中运行所有项目测试都可以。但是,当我在命令行上通过maven运行所有项目测试时,当尝试对有效负载字符串进行Assert时,这个测试会失败,并出现npe。
我已经将根问题的位置缩小到另一个测试类(apppropertiestest),它只测试我的appproperties组件(这是一个我用来以整洁的方式从application.properties中提取config的组件)。当且仅当该测试类中的测试在项目根目录中使用“mvn clean install”与失败的测试一起运行时,npe才会出现。注解掉这个类中的测试或者用@dirtiescontext注解它可以解决这个问题。显然,这个测试类加载到spring上下文中的某些内容导致了另一个测试中事件/倒计时锁存器的时间/顺序问题。当然,我不想使用@dirtiescontext,因为随着项目复杂性的增加,它会导致生成速度慢得多。它也不能解释问题。。我不能接受:)
apppropertiestest使用构造函数注入来注入appproperties组件。它还扩展了一个抽象类“genericservicetest”,其注解如下:
@SpringBootTest
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
不包含任何其他内容。您可能知道,springboottest注解构建了一个测试spring上下文,并连接到样板中,以允许对spring应用程序的依赖项注入等进行有效测试,而testconstructor注解允许在我的一些测试中进行构造函数注入。fwiw,我尝试过删除testconstructor注解,并在appproperties类中使用普通的旧自动连线,以查看它是否有影响,但实际上没有。
失败的测试类还扩展了genericservicetest,因为它需要spring上下文注入一些依赖项,比如消费者和正在测试的消息传递服务以及其中的appproperties示例等。
所以我知道问题在哪里,但我不知道问题是什么。即使npe测试失败,我也可以在日志中看到,根据baeldung指南,使用者在失败之前已经成功地使用了消息:
TestKafkaConsumer : received payload='ConsumerRecord(topic = test-kafka-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1618997289238, serialized key size = -1, serialized value size = 43, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = This is a test message to be sent to Kafka.)'
然而,当我们回到Assert时,有效负载是空的。我在失败的测试中尝试了thread.sleep()之类的方法来给它更多的时间,我增加了await()超时时间,但是没有乐趣。
我觉得奇怪的是,这些测试在思想上和孤立上都很好。现在它开始让我有点疯狂,我无法调试它,因为这个问题没有出现在我的ide中。
如果有人有任何想法,将不胜感激!
谢谢。
编辑:有人非常合理地建议我添加一些代码,所以这里是:)
失败的测试(在asserttrue(payload.contains(testmessage))失败,因为payload为null)。autowired kafkamessagingservice只需注入appproperties和kakfatemplate的依赖项,并调用kafkatemplate.send():
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class KafkaMessagingServiceImplTest extends GenericServiceTest {
@Autowired
@Qualifier("kafkaMessagingServiceImpl")
private IMessagingService messagingService;
@Autowired
private TestKafkaConsumer kafkaConsumer;
@Value("${app.topicName}")
private String testTopic;
@Test
public void testSendAndConsumeKafkaMessage() throws InterruptedException {
String testMessage = "This is a test message to be sent to Kafka.";
messagingService.sendMessage(testMessage);
kafkaConsumer.getLatch().await(2000, TimeUnit.MILLISECONDS);
String payload = kafkaConsumer.getPayload();
assertTrue(payload.contains(testMessage));
}
testconsumer(用于在上面的测试中使用)
@Component
public class TestKafkaConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(TestKafkaConsumer.class);
private CountDownLatch latch = new CountDownLatch(1);
private String payload = null;
@KafkaListener(topics = "${app.topicName}")
public void receive(ConsumerRecord<?, ?> consumerRecord) {
LOGGER.info("received payload='{}'", consumerRecord.toString());
setPayload(consumerRecord.toString());
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
public String getPayload() {
return payload;
}
public void setPayload(String payload) {
this.payload = payload;
}
项目相关性:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- https://mvnrepository.com/artifact/org.mockito/mockito-all -->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.10.19</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>2.5.6.RELEASE</version>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
AppPropertieTest类(该类的上下文似乎导致了问题)
class AppPropertiesTest extends GenericServiceTest {
private final AppProperties appProperties;
public AppPropertiesTest(AppProperties appProperties) {
this.appProperties = appProperties;
}
@Test
public void testAppPropertiesGetQueueName() {
String expected = "test-queue";
String result = appProperties.getRabbitMQQueueName();
assertEquals(expected, result);
}
@Test
public void testAppPropertiesGetDurableQueue() {
boolean isDurableQueue = appProperties.isDurableQueue();
assertTrue(isDurableQueue);
}
}
apppropertiestest类正在测试的appproperties类:
@Component
@ConfigurationProperties("app")
public class AppProperties {
// a whole bunch of properties by name that are prefixed by app. in the application.properties file. Nothing else
}
两个测试都扩展的通用服务测试类。
@SpringBootTest
@TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
public abstract class GenericServiceTest {
}
故障(您可以在上面的行中看到有效负载已被接收并打印出来)。
2021-04-21 14:15:07.113 INFO 493384 --- [ntainer#0-0-C-1] service.TestKafkaConsumer : received payload='ConsumerRecord(topic = test-kafka-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1619010907076, serialized key size = -1, serialized value size = 43, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = This is a test message to be sent to Kafka.)'
[ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 3.791 s <<< FAILURE! - in
service.KafkaMessagingServiceImplTest
[ERROR] testSendAndConsumeKafkaMessage Time elapsed: 2.044 s <<< ERROR!
java.lang.NullPointerException
at service.KafkaMessagingServiceImplTest.testSendAndConsumeKafkaMessage(KafkaMessagingServiceImplTest.java:42)
1条答案
按热度按时间vyu0f0g11#
问题是
TestListener
是一个@Component
所以它被添加了两次-记录将被添加到另一个示例。我添加了更多的调试来验证getter是否在不同的示例上调用。
如果你不想用
@DirtiesContext
,您至少可以在测试完成后停止侦听器容器: