Apache Kafka:嵌入式 Kafka 的 JUnit 测试——运行单元测试时实际的应用程序被启动

kkih6yb8  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(607)

我正在使用kafka在spring boot中开发一个异步邮件服务器。
我用嵌入式 Kafka 编写了测试,它会在随机端口上启动自己的 Kafka 及主题,并用它来进行测试。
当我运行这个测试时,应用程序上下文会被加载,而它期望我的本地环境中存在一个 Kafka 集群。我需要阻止应用程序上下文(context)的加载。我复制了 https://github.com/code-not-found/spring-kafka/blob/master/spring-kafka-unit-test-classrule/src/test/java/com/codenotfound/kafka/producer/SpringKafkaSenderTest.java 中的代码,它运行得非常好。但当我在自己的项目中采用同样的写法时,却看到实际的应用程序被启动了。
java语言

  1. package com.mailer.embeddedkafkatests;
  2. import static org.junit.Assert.assertTrue;
  3. import java.util.Map;
  4. import java.util.concurrent.BlockingQueue;
  5. import java.util.concurrent.LinkedBlockingQueue;
  6. import java.util.concurrent.TimeUnit;
  7. import org.apache.kafka.clients.consumer.ConsumerConfig;
  8. import org.apache.kafka.clients.consumer.ConsumerRecord;
  9. import org.apache.kafka.common.serialization.StringDeserializer;
  10. import org.junit.After;
  11. import org.junit.Before;
  12. import org.junit.ClassRule;
  13. import org.junit.Test;
  14. import org.junit.runner.RunWith;
  15. import org.slf4j.Logger;
  16. import org.slf4j.LoggerFactory;
  17. import org.springframework.beans.factory.annotation.Autowired;
  18. import org.springframework.boot.test.context.SpringBootTest;
  19. import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
  20. import org.springframework.kafka.listener.ContainerProperties;
  21. import org.springframework.kafka.listener.KafkaMessageListenerContainer;
  22. import org.springframework.kafka.listener.MessageListener;
  23. import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
  24. import org.springframework.kafka.test.utils.ContainerTestUtils;
  25. import org.springframework.kafka.test.utils.KafkaTestUtils;
  26. import org.springframework.test.annotation.DirtiesContext;
  27. import org.springframework.test.context.junit4.SpringRunner;
  28. import com.mailer.model.Mail;
  29. import com.mailer.producer.KafkaMessageProducer;
  30. import com.mailer.serializer.MailSerializer;
  31. @RunWith(SpringRunner.class)
  32. @SpringBootTest
  33. @DirtiesContext
  34. public class SpringKafkaSenderTest {
  35. private static final Logger LOGGER =
  36. LoggerFactory.getLogger(SpringKafkaSenderTest.class);
  37. private static String SENDER_TOPIC = "sender.t";
  38. @Autowired
  39. private KafkaMessageProducer sender;
  40. private KafkaMessageListenerContainer<String, Mail> container;
  41. private BlockingQueue<ConsumerRecord<String, Mail>> records;
  42. @ClassRule
  43. public static EmbeddedKafkaRule embeddedKafka =
  44. new EmbeddedKafkaRule(1, true, SENDER_TOPIC);
  45. @Before
  46. public void setUp() throws Exception {
  47. // set up the Kafka consumer properties
  48. Map<String, Object> consumerProperties =
  49. KafkaTestUtils.consumerProps("sender", "false",
  50. embeddedKafka.getEmbeddedKafka());
  51. consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  52. consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MailSerializer.class);
  53. // create a Kafka consumer factory
  54. DefaultKafkaConsumerFactory<String, Mail> consumerFactory =
  55. new DefaultKafkaConsumerFactory<String, Mail>(
  56. consumerProperties);//, new StringDeserializer(), new JsonDeserializer<>(Mail.class));
  57. // set the topic that needs to be consumed
  58. ContainerProperties containerProperties =
  59. new ContainerProperties(SENDER_TOPIC);
  60. // create a Kafka MessageListenerContainer
  61. container = new KafkaMessageListenerContainer<>(consumerFactory,
  62. containerProperties);
  63. // create a thread safe queue to store the received message
  64. records = new LinkedBlockingQueue<>();
  65. // setup a Kafka message listener
  66. container
  67. .setupMessageListener(new MessageListener<String, Mail>() {
  68. @Override
  69. public void onMessage(
  70. ConsumerRecord<String, Mail> record) {
  71. LOGGER.debug("test-listener received message='{}'",
  72. record.toString());
  73. records.add(record);
  74. }
  75. });
  76. // start the container and underlying message listener
  77. container.start();
  78. // wait until the container has the required number of assigned partitions
  79. ContainerTestUtils.waitForAssignment(container,
  80. embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
  81. }
  82. @After
  83. public void tearDown() {
  84. // stop the container
  85. container.stop();
  86. }
  87. @Test
  88. public void testSend() throws InterruptedException {
  89. // send the message
  90. Mail mail = new Mail();
  91. mail.setFrom("vinoth@local.com");
  92. sender.sendMessage(mail);
  93. Thread.sleep(4000);
  94. // check that the message was received
  95. ConsumerRecord<String, Mail> received =
  96. records.poll(10, TimeUnit.SECONDS);
  97. // Hamcrest Matchers to check the value
  98. assertTrue(received.value().getFrom().equals(mail.getFrom()));
  99. System.out.println(received.value().getFrom());
  100. // assertThat(received, hasValue(mail));
  101. // AssertJ Condition to check the key
  102. // assertThat(received).has(key(null));
  103. }
  104. }
x8diyxa7

x8diyxa71#

为什么要阻止 Spring 上下文的加载?这个 JUnit 测试的目的不正是测试 Spring 应用程序吗?
无论如何,只要去掉 @SpringBootTest 注解,Spring 上下文就不会被加载。

相关问题