I have a Java application with the following consumer properties:
kafkaProperties = new Properties();
kafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokersList);
kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupName);
kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumerSessionTimeoutMs);
kafkaProperties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
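For context, each worker presumably builds its own consumer straight from these properties, along the lines of the sketch below (assumed code, not the actual application). Note that auto.offset.reset is not set, so it stays at its default of latest, and with enable.auto.commit set to false the application alone is responsible for committing offsets.

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Sketch only (assumed): one KafkaConsumer per worker thread, built from the shared
// properties. KafkaConsumer instances are not thread-safe and must not be shared.
KafkaConsumer<String, String> buildConsumer(Properties kafkaProperties) {
    return new KafkaConsumer<>(kafkaProperties);
}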
I have created 15 consumer threads, each running the Runnable below, and there are no other consumers using this consumer group name. (A sketch of how such a pool could be wired up follows the run() method below.)
@Override
public void run() {
    try {
        logger.info("Starting ConsumerWorker, consumerId={}", consumerId);
        consumer.subscribe(Arrays.asList(kafkaTopic), offsetLoggingCallback);
        while (true) {
            boolean isPollFirstRecord = true;
            logger.debug("consumerId={}; about to call consumer.poll() ...", consumerId);
            ConsumerRecords<String, String> records = consumer.poll(pollIntervalMs);
            Map<Integer, Long> partitionOffsetMap = new HashMap<>();
            for (ConsumerRecord<String, String> record : records) {
                if (isPollFirstRecord) {
                    isPollFirstRecord = false;
                    logger.info("Start offset for partition {} in this poll : {}", record.partition(), record.offset());
                }
                messageProcessor.processMessage(record.value(), record.offset());
                partitionOffsetMap.put(record.partition(), record.offset());
            }
            if (!records.isEmpty()) {
                logger.info("Invoking commit for partition/offset : {}", partitionOffsetMap);
                consumer.commitAsync(offsetLoggingCallback);
            }
        }
    } catch (WakeupException e) {
        logger.warn("ConsumerWorker [consumerId={}] got WakeupException - exiting ... Exception: {}",
                consumerId, e.getMessage());
    } catch (Exception e) {
        logger.error("ConsumerWorker [consumerId={}] got Exception - exiting ... Exception: {}",
                consumerId, e.getMessage());
    } finally {
        logger.warn("ConsumerWorker [consumerId={}] is shutting down ...", consumerId);
        consumer.close();
    }
}
I also have an OffsetCommitCallbackImpl, shown below. It maintains a Map of partitions and their committed offsets, and logs whenever offsets are committed.
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
    if (exception == null) {
        offsets.forEach((topicPartition, offsetAndMetadata) -> {
            partitionOffsetMap.put(topicPartition, offsetAndMetadata);
            logger.info("Offset position during the commit for consumerId : {}, partition : {}, offset : {}",
                    Thread.currentThread().getName(), topicPartition.partition(), offsetAndMetadata.offset());
        });
    } else {
        offsets.forEach((topicPartition, offsetAndMetadata) ->
                logger.error("Offset commit error, and partition offset info : {}, partition : {}, offset : {}",
                        exception.getMessage(), topicPartition.partition(), offsetAndMetadata.offset()));
    }
}
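Since offsetLoggingCallback is passed both to consumer.subscribe(...) (which takes a ConsumerRebalanceListener) and to consumer.commitAsync(...) (which takes an OffsetCommitCallback), the class presumably implements both interfaces. A skeleton of that assumed shape (not the actual code):

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Skeleton only (assumed): the same object serves as the rebalance listener in
// subscribe() and as the commit callback in commitAsync().
public class OffsetCommitCallbackImpl implements OffsetCommitCallback, ConsumerRebalanceListener {

    private static final Logger logger = LoggerFactory.getLogger(OffsetCommitCallbackImpl.class);

    // Last successfully committed offset per partition, as logged in onComplete().
    private final Map<TopicPartition, OffsetAndMetadata> partitionOffsetMap = new ConcurrentHashMap<>();

    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        // ... the onComplete() shown above ...
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        logger.info("Partitions revoked: {}", partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        logger.info("Partitions assigned: {}", partitions);
    }
}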
Problem/question: I noticed that whenever I shut the application down and bring it back up, I miss events/messages. So I looked more closely at the logging. By comparing the offsets committed before shutdown (using the OffsetCommitCallback logging) with the offsets fetched and processed after the restart, I found that for some partitions we did not pick up from the offset we left off at before the shutdown. Sometimes the starting offset for certain partitions is 1000 higher than the committed one.
Note: this happens for 8 out of 40 partitions.
If you look closely at the logging in the run() method, there is a log statement where I print the offsets just before invoking the async commit. For example, if the last such log before shutdown shows 10 for partition 1, then after the restart the first offset we process for partition 1 is 100. I confirmed that we lost 90 messages.
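One way I could cross-check this (a sketch only; the helper below is assumed, not part of the application) is to ask the broker directly for the committed offset of every partition right after a restart, before any poll, and compare those values with what the commit callback logged before shutdown:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

// Sketch only (assumed helper): dump the broker-side committed offset for every
// partition of the topic, for the consumer group configured in kafkaProperties.
void logCommittedOffsets(Properties kafkaProperties, String kafkaTopic) {
    try (KafkaConsumer<String, String> probe = new KafkaConsumer<>(kafkaProperties)) {
        for (PartitionInfo partitionInfo : probe.partitionsFor(kafkaTopic)) {
            TopicPartition tp = new TopicPartition(kafkaTopic, partitionInfo.partition());
            OffsetAndMetadata committed = probe.committed(tp); // null if nothing committed
            System.out.println("partition=" + tp.partition() + ", committed="
                    + (committed == null ? "none" : committed.offset()));
        }
    }
}

If the broker-side value matches the last callback log, the commit itself reached Kafka; if it does not, the async commits issued just before shutdown probably never completed.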
Can anyone think of why this is happening?