@kafkalistener不接收记录

xwbd5t1u  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(730)

无法确定,我的Kafka侦听器配置有什么问题。最初,我有一个名为“transactions”的非空kafka主题,其中有几个记录(我可以在kafkatool中看到)。这是application.yml:

  1. spring:
  2. ###
  3. # Kafka Settings
  4. ###
  5. kafka:
  6. consumer:
  7. bootstrap-servers: localhost:9092
  8. key-deserializer: com.panbet.externalbet.history.report.support.ReportsBetKeyJsonDeserializer
  9. value-deserializer: com.panbet.externalbet.history.report.support.ReportsBetJsonDeserializer
  10. group-id: external.history.group

以下是java配置文件:

  1. @EnableKafka
  2. @Configuration
  3. public class KafkaConfig
  4. {
  5. private final KafkaProperties properties;
  6. public KafkaConfig(KafkaProperties properties)
  7. {
  8. this.properties = properties;
  9. }
  10. @Bean
  11. public ConsumerFactory<ReportsBetKeyDto, ReportsBetDto> kafkaConsumerFactory()
  12. {
  13. return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties());
  14. }
  15. @Bean
  16. public ConcurrentKafkaListenerContainerFactory<ReportsBetKeyDto, ReportsBetDto> kafkaListenerContainerFactory()
  17. {
  18. ConcurrentKafkaListenerContainerFactory<ReportsBetKeyDto, ReportsBetDto> factory = new ConcurrentKafkaListenerContainerFactory<>();
  19. factory.setConsumerFactory(kafkaConsumerFactory());
  20. return factory;
  21. }
  22. }

我的Kafka监听器看起来像(前面提到的主题“事务”的监听器):

  1. @Component
  2. public class ReportsConsumer
  3. {
  4. @KafkaListener(topics = { "transactions" })
  5. public void listen(ConsumerRecord<ReportsBetKeyDto, ReportsBetDto> record)
  6. {
  7. System.out.println(record);
  8. }
  9. }

我期望:当应用程序启动时,我会在reportsconsumer.listen方法中捕获debudder。但毫无收获的是,什么也没发生。听不到Kafka的主题。有什么问题吗?谢谢您。

eagi6jfj

eagi6jfj1#

默认情况下,新消费者从主题末尾开始消费。
spring.kafka.consumer.auto-offset-reset=earliest 我也建议 spring.kafka.consumer.enable-auto-commit=false 以便容器管理偏移量。

相关问题