npe测试kafka生产者

ubbxdtey  于 2021-07-09  发布在  Java
关注(0)|答案(1)|浏览(477)

我已经编写了一个基本的spring引导服务,它通过restapi使用一些数据并将其发布到rabbitmq和kafka。
为了测试处理kafka的服务类,我遵循以下指南:https://www.baeldung.com/spring-boot-kafka-testing
单独来说,测试(kafkamessagingserviceimpltest)在intellij idea和通过命令行上的mvn都能完美地工作。在idea中运行所有项目测试都可以。但是,当我在命令行上通过maven运行所有项目测试时,当尝试对有效负载字符串进行Assert时,这个测试会失败,并出现npe。
我已经将根问题的位置缩小到另一个测试类(apppropertiestest),它只测试我的appproperties组件(这是一个我用来以整洁的方式从application.properties中提取config的组件)。当且仅当该测试类中的测试在项目根目录中使用“mvn clean install”与失败的测试一起运行时,npe才会出现。注解掉这个类中的测试或者用@dirtiescontext注解它可以解决这个问题。显然,这个测试类加载到spring上下文中的某些内容导致了另一个测试中事件/倒计时锁存器的时间/顺序问题。当然,我不想使用@dirtiescontext,因为随着项目复杂性的增加,它会导致生成速度慢得多。它也不能解释问题。。我不能接受:)
apppropertiestest使用构造函数注入来注入appproperties组件。它还扩展了一个抽象类“genericservicetest”,其注解如下:

  1. @SpringBootTest
  2. @TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)

不包含任何其他内容。您可能知道,springboottest注解构建了一个测试spring上下文,并连接到样板中,以允许对spring应用程序的依赖项注入等进行有效测试,而testconstructor注解允许在我的一些测试中进行构造函数注入。fwiw,我尝试过删除testconstructor注解,并在appproperties类中使用普通的旧自动连线,以查看它是否有影响,但实际上没有。
失败的测试类还扩展了genericservicetest,因为它需要spring上下文注入一些依赖项,比如消费者和正在测试的消息传递服务以及其中的appproperties示例等。
所以我知道问题在哪里,但我不知道问题是什么。即使npe测试失败,我也可以在日志中看到,根据baeldung指南,使用者在失败之前已经成功地使用了消息:

  1. TestKafkaConsumer : received payload='ConsumerRecord(topic = test-kafka-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1618997289238, serialized key size = -1, serialized value size = 43, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = This is a test message to be sent to Kafka.)'

然而,当我们回到Assert时,有效负载是空的。我在失败的测试中尝试了thread.sleep()之类的方法来给它更多的时间,我增加了await()超时时间,但是没有乐趣。
我觉得奇怪的是,这些测试在思想上和孤立上都很好。现在它开始让我有点疯狂,我无法调试它,因为这个问题没有出现在我的ide中。
如果有人有任何想法,将不胜感激!
谢谢。
编辑:有人非常合理地建议我添加一些代码,所以这里是:)
失败的测试(在asserttrue(payload.contains(testmessage))失败,因为payload为null)。autowired kafkamessagingservice只需注入appproperties和kakfatemplate的依赖项,并调用kafkatemplate.send():

  1. @EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
  2. class KafkaMessagingServiceImplTest extends GenericServiceTest {
  3. @Autowired
  4. @Qualifier("kafkaMessagingServiceImpl")
  5. private IMessagingService messagingService;
  6. @Autowired
  7. private TestKafkaConsumer kafkaConsumer;
  8. @Value("${app.topicName}")
  9. private String testTopic;
  10. @Test
  11. public void testSendAndConsumeKafkaMessage() throws InterruptedException {
  12. String testMessage = "This is a test message to be sent to Kafka.";
  13. messagingService.sendMessage(testMessage);
  14. kafkaConsumer.getLatch().await(2000, TimeUnit.MILLISECONDS);
  15. String payload = kafkaConsumer.getPayload();
  16. assertTrue(payload.contains(testMessage));
  17. }

testconsumer(用于在上面的测试中使用)

  1. @Component
  2. public class TestKafkaConsumer {
  3. private static final Logger LOGGER = LoggerFactory.getLogger(TestKafkaConsumer.class);
  4. private CountDownLatch latch = new CountDownLatch(1);
  5. private String payload = null;
  6. @KafkaListener(topics = "${app.topicName}")
  7. public void receive(ConsumerRecord<?, ?> consumerRecord) {
  8. LOGGER.info("received payload='{}'", consumerRecord.toString());
  9. setPayload(consumerRecord.toString());
  10. latch.countDown();
  11. }
  12. public CountDownLatch getLatch() {
  13. return latch;
  14. }
  15. public String getPayload() {
  16. return payload;
  17. }
  18. public void setPayload(String payload) {
  19. this.payload = payload;
  20. }

项目相关性:

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-amqp</artifactId>
  9. <version>2.2.2.RELEASE</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.springframework.kafka</groupId>
  13. <artifactId>spring-kafka</artifactId>
  14. <version>2.5.8.RELEASE</version>
  15. </dependency>
  16. <dependency>
  17. <groupId>org.springframework.boot</groupId>
  18. <artifactId>spring-boot-starter-test</artifactId>
  19. <scope>test</scope>
  20. </dependency>
  21. <dependency>
  22. <groupId>org.springframework.boot</groupId>
  23. <artifactId>spring-boot-starter-actuator</artifactId>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.springframework.boot</groupId>
  27. <artifactId>spring-boot-configuration-processor</artifactId>
  28. <optional>true</optional>
  29. </dependency>
  30. <!-- https://mvnrepository.com/artifact/org.mockito/mockito-all -->
  31. <dependency>
  32. <groupId>org.mockito</groupId>
  33. <artifactId>mockito-all</artifactId>
  34. <version>1.10.19</version>
  35. <scope>test</scope>
  36. </dependency>
  37. <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka-test -->
  38. <dependency>
  39. <groupId>org.springframework.kafka</groupId>
  40. <artifactId>spring-kafka-test</artifactId>
  41. <version>2.5.6.RELEASE</version>
  42. <scope>test</scope>
  43. </dependency>
  44. </dependencies>
  45. <dependencyManagement>
  46. <dependencies>
  47. <dependency>
  48. <groupId>org.springframework.cloud</groupId>
  49. <artifactId>spring-cloud-dependencies</artifactId>
  50. <version>${spring-cloud.version}</version>
  51. <type>pom</type>
  52. <scope>import</scope>
  53. </dependency>
  54. </dependencies>
  55. </dependencyManagement>
  56. <build>
  57. <plugins>
  58. <plugin>
  59. <groupId>org.springframework.boot</groupId>
  60. <artifactId>spring-boot-maven-plugin</artifactId>
  61. </plugin>
  62. </plugins>
  63. </build>

AppPropertieTest类(该类的上下文似乎导致了问题)

  1. class AppPropertiesTest extends GenericServiceTest {
  2. private final AppProperties appProperties;
  3. public AppPropertiesTest(AppProperties appProperties) {
  4. this.appProperties = appProperties;
  5. }
  6. @Test
  7. public void testAppPropertiesGetQueueName() {
  8. String expected = "test-queue";
  9. String result = appProperties.getRabbitMQQueueName();
  10. assertEquals(expected, result);
  11. }
  12. @Test
  13. public void testAppPropertiesGetDurableQueue() {
  14. boolean isDurableQueue = appProperties.isDurableQueue();
  15. assertTrue(isDurableQueue);
  16. }
  17. }

apppropertiestest类正在测试的appproperties类:

  1. @Component
  2. @ConfigurationProperties("app")
  3. public class AppProperties {
  4. // a whole bunch of properties by name that are prefixed by app. in the application.properties file. Nothing else
  5. }

两个测试都扩展的通用服务测试类。

  1. @SpringBootTest
  2. @TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
  3. public abstract class GenericServiceTest {
  4. }

故障(您可以在上面的行中看到有效负载已被接收并打印出来)。

  1. 2021-04-21 14:15:07.113 INFO 493384 --- [ntainer#0-0-C-1] service.TestKafkaConsumer : received payload='ConsumerRecord(topic = test-kafka-topic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1619010907076, serialized key size = -1, serialized value size = 43, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = This is a test message to be sent to Kafka.)'
  2. [ERROR] Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 3.791 s <<< FAILURE! - in
  3. service.KafkaMessagingServiceImplTest
  4. [ERROR] testSendAndConsumeKafkaMessage Time elapsed: 2.044 s <<< ERROR!
  5. java.lang.NullPointerException
  6. at service.KafkaMessagingServiceImplTest.testSendAndConsumeKafkaMessage(KafkaMessagingServiceImplTest.java:42)
vyu0f0g1

vyu0f0g11#

问题是 TestListener 是一个 @Component 所以它被添加了两次-记录将被添加到另一个示例。
我添加了更多的调试来验证getter是否在不同的示例上调用。

  1. @Component
  2. public class TestKafkaConsumer {
  3. private static final Logger LOGGER = LoggerFactory.getLogger(TestKafkaConsumer.class);
  4. private final CountDownLatch latch = new CountDownLatch(1);
  5. private String payload = null;
  6. @KafkaListener(id = "myListener", topics = "${app.kafkaTopicName}")
  7. public void receive(ConsumerRecord<?, ?> consumerRecord) {
  8. LOGGER.info("received payload='{}'", consumerRecord.toString());
  9. setPayload(consumerRecord.toString());
  10. if (payload != null) {
  11. LOGGER.info(this + ": payload is not null still");
  12. }
  13. latch.countDown();
  14. if (payload != null) {
  15. LOGGER.info(this + ": payload is not null after latch countdown");
  16. }
  17. }
  18. public CountDownLatch getLatch() {
  19. return latch;
  20. }
  21. public String getPayload() {
  22. LOGGER.info(this + ": getting Payload");
  23. return payload;
  24. }
  25. public void setPayload(String payload) {
  26. this.payload = payload;
  27. }
  28. }

如果你不想用 @DirtiesContext ,您至少可以在测试完成后停止侦听器容器:

  1. @SpringBootTest
  2. @TestConstructor(autowireMode = TestConstructor.AutowireMode.ALL)
  3. public abstract class GenericDataServiceTest {
  4. @AfterAll
  5. static void stopContainers(@Autowired KafkaListenerEndpointRegistry registry) {
  6. registry.stop();
  7. }
  8. }
  1. [INFO] ------------------------------------------------------------------------
  2. [INFO] BUILD SUCCESS
  3. [INFO] ------------------------------------------------------------------------
展开查看全部

相关问题