我在用Kafka和Spring Boot。我使用rest控制器来调用生产者/消费者api。producer类能够向主题添加消息。我使用命令行实用程序(console consumer.sh)进行了验证。但是,我的consumer类无法在java中接收它们以进行进一步处理@消费者类侦听器方法中使用的kafkalistener应该能够在生产者类向未发生的主题发布消息时接收消息。谢谢你的帮助。
当我已经创建了kafkalistenercontainerfactory,负责在消息发布到主题时调用consumer listener方法时,使用者是否仍然需要订阅和轮询记录?
消费者类别
@Component
public class KafkaListenersExample {
private final List<KafkaPayload> messages = new ArrayList<>();
@KafkaListener(topics = "test_topic", containerFactory = "kafkaListenerContainerFactory")
public void listener(KafkaPayload data) {
synchronized (messages){
messages.add(data);
}
//System.out.println("message from kafka :"+data);
}
public List<KafkaPayload> getMessages(){
return messages;
}
}
消费者配置
@Configuration
class KafkaConsumerConfig {
private String bootstrapServers = "localhost:9092";
@Bean
public ConsumerFactory<String, KafkaPayload> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props) ;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, KafkaPayload> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, KafkaPayload> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerConfigs());
return factory;
}
}
1条答案
按热度按时间mf98qq941#
侦听器容器创建使用者、订阅并负责轮询。
打开调试日志应有助于确定问题所在。
如果记录已经在主题中,则需要设置
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
至earliest
. 否则,消费者将从主题的末尾开始消费(latest
).