我们有一个生产者-消费者环境,我们的项目使用springboot。Kafka的配置是通过使用类来完成的
@Configuration
@EnableKafka
public class DefaultKafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.bootstrap-servers-group}")
private String bootstrapServersGroup;
@Bean
public ConsumerFactory<String,String> consumerDefaultFactory(){
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, bootstrapServersGroup);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerDefaultContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerDefaultFactory());
return factory;
}
}
场景:我们正在写一些关于Kafka主题的价值观。假设我们有一个主题,我们正在放置实时数据。他们的地位像live:0“对于已完成的事件和”live:1“现场直播。现在,当事件将被激活时,它将得到更新并在主题上写入,并且根据这个主题我们正在处理事件。
问题:当事件启动时,我从主题中读取数据live:1“并已处理。但当事件更新和新数据更新时。在这里,当新的数据更新的主题,我能够阅读这些数据。但是有了关于这个主题的新数据,我也收到了旧数据。因为在我的事件受到影响的同时,我得到了新旧数据。有时它会上线,有时会完成。
有人对此有什么建议吗?为什么要获取提交的数据和新更新的数据?我在配置上有什么遗漏吗?
2条答案
按热度按时间6pp0gazn1#
您可能需要检查以下几点:-1。分区数2。消费者数量
这是否也意味着您正在以新的状态重新将消费消息写入主题?
ctrmrzij2#
这就是我们所拥有的。