spring 如何在Sping Boot 项目中配置Kafka主题

r1wp621o  于 2024-01-05  发布在  Spring
关注(0)|答案(1)|浏览(161)

在可流动和Kafka启用的情况下,应用程序运行,但当我配置Kafka bean ConcurrentKafkaListenerContainerFactory时。它生成一个错误:找不到<'org.springframework.Kafka.core.KafkaOperations'。请考虑在配置中定义类型为'org.springframework.Kafka.core.KafkaOperations'的bean。>>
同样的配置,我在另一个没有可流动的项目中做了它。在那里,它运行没有任何问题。

  1. @Bean
  2. public ConcurrentKafkaListenerContainerFactory<String, CreateRecurrenceCommand>
  3. createRecurrenceCommandListenerContainerFactory(KafkaCreateRecurrenceErrorHandler errorHandler) {
  4. ConcurrentKafkaListenerContainerFactory<String, CreateRecurrenceCommand> factory =
  5. new ConcurrentKafkaListenerContainerFactory<>();
  6. factory.setCommonErrorHandler(errorHandler);
  7. factory.setConsumerFactory(createRecurrenceCommandConsumerFactory());
  8. factory.setReplyTemplate(kafkaTemplate());
  9. factory.setRecordFilterStrategy(consumerRecord -> {
  10. String messageType = new String(consumerRecord.headers().lastHeader(KafkaCustomHeaders.MESSAGE_TYPE).value());
  11. return !ConsumedMessageType.CREATE_RECURRENCE.toString().equals(messageType);
  12. });
  13. return factory;
  14. }
  15. public ConsumerFactory<String, CreateRecurrenceCommand> createRecurrenceCommandConsumerFactory() {
  16. return generateConsumerFactory(CreateRecurrenceCommand.class, AppConstants.CREATE_RECURRENCE);
  17. }
  18. @Bean
  19. public KafkaTemplate<String, Object> kafkaTemplate() {
  20. return new KafkaTemplate<>(producerFactory());
  21. }

字符串
private ConsumerFactory<String,T> generateConsumerFactory(Class clazz,String groupId){ Map<String,Object> props = new HashMap<>(defaultConsumerFactory. getExcitationProperties());

  1. props.put(
  2. ConsumerConfig.GROUP_ID_CONFIG,
  3. groupId);
  4. ErrorHandlingDeserializer<T> errorHandlingDeserializer = new ErrorHandlingDeserializer<>((topic, data) -> {
  5. try {
  6. if (data == null) {
  7. return null;
  8. }
  9. return objectMapper.readValue(new String(data), clazz);
  10. } catch (JsonProcessingException e) {
  11. throw new DeserializationException("Failed to deserialise", data, false, e);
  12. }
  13. });
  14. return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), errorHandlingDeserializer);
  15. }


在这段代码中,我们期望过滤特定主题的消息:CREATE_RECURRENCE

rwqw0loc

rwqw0loc1#

我不确定什么是“可流动”,但看起来你试图自己配置一切,而不是依赖于Sping Boot 自动配置。为此,请考虑将ConcurrentKafkaListenerContainerFactory的名称更改为kafkaListenerContainerFactory

相关问题