我正在使用springboot并尝试编写一个kafkaproducer来在kafka队列中推送消息。我在@configuration class中创建了这些方法。
@Bean
public KafkaTemplate<String, String> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); //bootstrapAddress holds address of kafka server
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
我已经在kafkamessageproducer类中自动连接了这个kafkatemplate bean,它负责处理kafkatemplate的send函数。
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
但是当我试图编译我的代码时,我遇到了这个错误
Field kafkaTemplate in <pathoffile>.KafkaMessageProducer required a bean of type 'org.springframework.kafka.core.KafkaTemplate' that could not be found.
- Bean method 'kafkaTemplate' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.KafkaTemplate; SearchStrategy: all) found bean 'avroKafkaTemplate'
Action:Consider revisiting the conditions above or defining a bean of type 'org.springframework.kafka.core.KafkaTemplate' in your configuration.
另外,如果我试图在我的spring项目中排除kafkaautoconfiguration,我会得到类似“bean不能被加载,因为kafkaautoconfiguration被禁用”这样的错误。你知道我为什么会出现这个bean错误吗?解决方法是什么?
edit:- i 在我的项目使用的jar文件中发现了以下bean
@Bean
@Conditional({EnableQueueCondition.class})
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate(this.producerFactory());
kafkaTemplate.setProducerListener(new ProducerListenerImpl());
return kafkaTemplate;
}
所以,这就是错误的来源,但是我不知道如何告诉spring不要查看这个bean,而使用我定义的bean。我尝试过在bean上使用主注解和限定符注解,但仍然会出现相同的错误。有没有可能我定义的bean没有被创建或者找不到,然后kafkaautoconfiguration正在寻找由avrokafkatemplate bean覆盖的默认bean?这个问题的解决办法是什么?
2条答案
按热度按时间oyjwcjzk1#
从stacktrace,还有一个
KafkaTemplate
豆子-avroKafkaTemplate
. 所以我猜还有另一种配置,复制KafkaTemplate
定义。kmbjn2e32#
默认情况下,如果在pom中添加kafka依赖项,spring引导将提供kafkatemplatebean。
您只需要在application.yml文件中定义属性示例:
启用自动配置
自动连接Kafka模板:
如果你的情况下,自动配置工厂正在寻找
ProducerFactory<String, Strng>
与您的配置不匹配。因此,将producerfactory()重命名为kafkaproducerfactory(),它将解决您的问题。