我是kafka新手,在springboot中编写了一个cron作业来验证sql和kafka主题中的一些记录。这项工作需要每天早上进行一次。我已经设置了工作运行后,每15分钟为我的测试和它的预期工作。但是,当我每2小时更新cron以运行作业时,我就从主题中获得了消费者已经读取/处理和复制的记录。我正在使用commitasync手动提交偏移量。例如,我已经发送了3个记录的主题,但消费者得到了超过5k的记录大多是重复的。
以下是消费者代码及其属性。
public Map<String, Object> getKafkaConsumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "cronKafkaConsumer");
return props;
}
public List<CostKafkaModel> consumeCosts() {
KafkaConsumer<String, CostKafkaModel> consumer = new KafkaConsumer<>(
getKafkaConsumerProps(), new StringDeserializer(),
new JsonDeserializer<>(CostKafkaModel.class));
List<CostKafkaModel> kafkaModelList = new ArrayList<>();
try {
consumer.subscribe(Arrays.asList("deltaCosts"));
ConsumerRecords<String, CostKafkaModel> records = consumer
.poll(1000);
for (ConsumerRecord<String, CostKafkaModel> record : records) {
kafkaModelList.add(record.value());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.commitSync();
consumer.close();
}
return kafkaModelList;
}
任何帮助都将受到感谢。
暂无答案!
目前还没有任何答案,快来回答吧!