springkafka与嵌入式kafka的集成测试

pftdvrlh  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(516)

我有一个spring引导应用程序,它的使用者从一个集群中的主题消费,并在不同集群中生成另一个主题。
现在我正试图使用嵌入了spring的kafka编写集成测试用例,但是有一个问题 KafkaTemplate could not be registered. A bean with that name has already been defined in class path resource 消费者类别

@Service
public class KafkaConsumerService {

@Autowired
private KafkaProducerService kafkaProducerService;

@KafkaListener(topics = "${kafka.producer.topic}")
public void professor(List<Professor> pro) {
    pro.forEach(kafkaProducerService::produce);

   }

}

生产者阶级

@Service
public class KafkaProducerService {

@Value("${kafka.producer.topic}")
private String topic;

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

public void produce(Professor pro) {
    kafkaTemplate.send(topic,"professor",pro);
  }

 }

在我的测试用例中,我想重写 KafkaTemplate 所以当我打电话的时候 kafkaConsumerService.professor 中的方法 Test 它应该产生数据到嵌入式Kafka,我应该验证它。
测试配置

@TestConfiguration
@EmbeddedKafka(partitions = 1, controlledShutdown = false,
brokerProperties = {"listeners=PLAINTEXT://localhost:3333", "port=3333"})
public class KafkaProducerConfigTest {

@Autowired
 KafkaEmbedded kafkaEmbeded;

@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Before
public void setUp() throws Exception {
  for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
    ContainerTestUtils.waitForAssignment(messageListenerContainer, 
    kafkaEmbeded.getPartitionsPerTopic());
  }
}

@Bean
public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbeded));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    return kafkaTemplate;
   }

 }

测试等级

@EnableKafka
@SpringBootTest(classes = {KafkaProducerConfigTest.class})
@RunWith(SpringRunner.class)
public class KafkaProducerServiceTest {

@Autowired
private KafkaConsumerService kafkaConsumerService;

@Test
public void testReceive() throws Exception {
     kafkaConsumerService.professor(Arrays.asList(new Professor()));

     //How to check messages is sent to kafka?
}

 }

错误

The bean 'kafkaTemplate', defined in com.kafka.configuration.KafkaProducerConfigTest, could not be registered. 
 A bean with that name has already been defined in class path resource [com/kafka/configuration/KafkaProducerConfig.class] and overriding is disabled.
 Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true

还有人能帮我验证发送到嵌入式Kafka服务器的消息吗?
注意,我有一些不赞成的警告
Kafkamebedded类型已弃用
不推荐使用kafkaembedded类型的getpartitionspertopic()方法
不推荐使用kafkatestutils类型的方法producerprops(kafkamebedded)

ha5z0ras

ha5z0ras1#

boot2.1默认情况下禁用bean重写。
bean重写在默认情况下被禁用,以防止bean被意外重写。如果您依赖于重写,则需要设置 spring.main.allow-bean-definition-overridingtrue .
关于反对意见;有关详细信息,请参阅javadocs @EmbeddedKafka . 它被替换为 EmbeddedKafkaBroker .

相关问题