未触发使用者类侦听器方法以接收来自主题的消息Kafka与 Spring 启动应用程序

falq053o  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(344)

我在用Kafka和Spring Boot。我使用rest控制器来调用生产者/消费者api。producer类能够向主题添加消息。我使用命令行实用程序(console consumer.sh)进行了验证。但是,我的consumer类无法在java中接收它们以进行进一步处理@消费者类侦听器方法中使用的kafkalistener应该能够在生产者类向未发生的主题发布消息时接收消息。谢谢你的帮助。
当我已经创建了kafkalistenercontainerfactory,负责在消息发布到主题时调用consumer listener方法时,使用者是否仍然需要订阅和轮询记录?
消费者类别

@Component
public class KafkaListenersExample {

    private final List<KafkaPayload> messages = new ArrayList<>();

    @KafkaListener(topics = "test_topic", containerFactory = "kafkaListenerContainerFactory")
    public void listener(KafkaPayload data) {
        synchronized (messages){
            messages.add(data);
        }
        //System.out.println("message from kafka :"+data);
    }

    public List<KafkaPayload> getMessages(){
        return messages;
    }
}

消费者配置

@Configuration
class KafkaConsumerConfig {

    private String bootstrapServers = "localhost:9092";

    @Bean
    public ConsumerFactory<String, KafkaPayload> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props) ;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaPayload> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, KafkaPayload> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerConfigs());
        return factory;
    }
}
mf98qq94

mf98qq941#

侦听器容器创建使用者、订阅并负责轮询。
打开调试日志应有助于确定问题所在。
如果记录已经在主题中,则需要设置 ConsumerConfig.AUTO_OFFSET_RESET_CONFIGearliest . 否则,消费者将从主题的末尾开始消费( latest ).

相关问题