spring kafka test-不接收带有embeddedkafka的@kafkalistener中的数据

0kjbasz6  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(486)

我们正在使用cucumber对out应用程序进行一些集成测试,在测试时遇到了一些问题 @KafkaListener . 我们设法使用了一个嵌入式Kafka,并在其中生成数据。
但消费者从未收到任何数据,我们也不知道发生了什么。
这是我们的代码:
生产者配置

  1. @Configuration
  2. @Profile("test")
  3. public class KafkaTestProducerConfig {
  4. private static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
  5. @Autowired
  6. protected EmbeddedKafkaBroker embeddedKafka;
  7. @Bean
  8. public Map<String, Object> producerConfig() {
  9. Map<String, Object> props = new HashMap<>();
  10. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
  11. embeddedKafka.getBrokersAsString());
  12. props.put(SCHEMA_REGISTRY_URL, "URL");
  13. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  14. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
  15. return props;
  16. }
  17. @Bean
  18. public ProducerFactory<String, GenericRecord> producerFactory() {
  19. return new DefaultKafkaProducerFactory<>(producerConfig());
  20. }
  21. @Bean
  22. public KafkaTemplate<String, GenericRecord> kafkaTemplate() {
  23. return new KafkaTemplate<>(producerFactory());
  24. }
  25. }

消费者配置

  1. @Configuration
  2. @Profile("test")
  3. @EnableKafka
  4. public class KafkaTestConsumerConfig {
  5. @Autowired
  6. protected EmbeddedKafkaBroker embeddedKafka;
  7. private static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
  8. @Bean
  9. public Map<String, Object> consumerProperties() {
  10. Map<String, Object> props = new HashMap<>();
  11. props.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId");
  12. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
  13. props.put(SCHEMA_REGISTRY_URL, "URL");
  14. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
  15. props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
  16. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
  17. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName());
  18. props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
  19. props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 10000);
  20. return props;
  21. }
  22. @Bean
  23. public DefaultKafkaConsumerFactory<String, Object> consumerFactory() {
  24. KafkaAvroDeserializer avroDeserializer = new KafkaAvroDeserializer();
  25. avroDeserializer.configure(consumerProperties(), false);
  26. return new DefaultKafkaConsumerFactory<>(consumerProperties(), new StringDeserializer(), avroDeserializer);
  27. }
  28. @Bean
  29. public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
  30. ConcurrentKafkaListenerContainerFactory<String, Object> factory =
  31. new ConcurrentKafkaListenerContainerFactory<>();
  32. factory.setConsumerFactory(consumerFactory());
  33. factory.setBatchListener(true);
  34. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  35. return factory;
  36. }
  37. }

集成测试

  1. @SpringBootTest(
  2. webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
  3. classes = Application.class)
  4. @ActiveProfiles("test")
  5. @EmbeddedKafka(topics = {"TOPIC1", "TOPIC2", "TOPIC3"})
  6. public class CommonStepDefinitions implements En {
  7. protected static final Logger LOGGER = LoggerFactory.getLogger(CommonStepDefinitions.class);
  8. @Autowired
  9. protected KafkaTemplate<String, GenericRecord> kafkaTemplate;
  10. }

步骤定义

  1. public class KafkaStepDefinitions extends CommonStepDefinitions {
  2. private static final String TEMPLATE_TOPIC = "TOPIC1";
  3. public KafkaStepDefinitions(){
  4. Given("given statement", () -> {
  5. OperationEntity operationEntity = new OperationEntity();
  6. operationEntity.setFoo("foo");
  7. kafkaTemplate.send(TEMPLATE_TOPIC, AvroPojoTransformer.pojoToRecord(operationEntity));
  8. });
  9. }
  10. }

消费者同样的代码在生产引导服务器上运行良好,但是嵌入的kafka却无法实现

  1. @KafkaListener(topics = "${kafka.topic1}", groupId = "groupId")
  2. public void consume(List<GenericRecord> records, Acknowledgment ack) throws DDCException {
  3. LOGGER.info("Batch of {} records received", records.size());
  4. //do something with the data
  5. ack.acknowledge();
  6. }

日志里的一切看起来都很好,但我们不知道遗漏了什么。
提前谢谢。

wkftcu5l

wkftcu5l1#

问题是消费者没有连接到嵌入式Kafka。您可以通过使用 test 配置文件并将以下内容添加到 application-test.yml .

  1. spring:
  2. kafka:
  3. bootstrap-servers: ${spring.embedded.kafka.brokers}

那你也不需要这个习惯 consumerProperties , consumerFactory 以及 kafkaListenerContainerFactory 豆。Spring Boot会为你自动接线。如果你真的想用这些豆子(不知道为什么),你应该仔细检查 KafkaAutoConfiguration 以确保覆盖正确的名称和类型。

tnkciper

tnkciper2#

你的考试在开始前就结束了;参见包含0-0-c-1的线程名称;耗电元件启动后不到一秒钟即停止。
我刚刚检查了,没有,我的测试正在执行,因为您可以在日志的第1174行中看到producerconfig值的日志。这个日志就出现在kafkatemplate.send(主题,实体)之后。我不使用 @Test 因为在 cucumber 里你有不同的定义。你可以在我的帖子里看到代码。
好 啊;但是您需要在测试中使用某种闩锁来等待使用者实际分配主题/分区并接收数据。按照现在构建测试的方式,测试在使用者完全启动之前就被关闭了。请参阅我对这个问题的回答,了解一种 Package 侦听器的方法,这样您就可以等到收到记录(这使用普通的junit测试)。
另一种技术是以某种方式将服务注入到侦听器bean中,以倒计时闩锁。
作为快速测试添加 Thread.sleep(10_000) 到你的“台阶”。
但是,据推测,您可能希望以某种方式Assert消费者实际上获得了数据。您需要在测试退出之前进行Assert,因为它是异步的,所以您需要某种机制来等待它发生(或超时)。

相关问题