我有一个定制的kafka使用者,我用它向restapi发送一些请求。根据api的响应,我要么提交偏移量,要么跳过消息而不提交。
最小示例:
while (true) {
ConsumerRecords<String, Object> records = consumer.poll(200);
for (ConsumerRecord<String, Object> record : records) {
// Sending a POST request and retrieving the answer
// ...
if (responseCode.startsWith("2")) {
try {
consumer.commitSync();
} catch(CommitFailedException ex) {
ex.printStackTrace();
}
} else {
// Do Nothing
}
}
}
现在,当RESTAPI的响应不是以 2
偏移量未提交,但消息不会重新使用。如何强制使用者重新使用具有未提交偏移量的消息?
2条答案
按热度按时间rqdpfwrv1#
提交偏移量只是存储消费者的当前偏移量(也称为位置)的一种方法。因此,如果它停止,它(或接管的新使用者示例)可以找到它以前的位置并从那里重新开始消费。
因此,即使您没有提交,一旦您收到记录,消费者的位置也会移动。如果你想重新收集一些记录,你必须改变消费者的当前位置。
对于java客户机,可以使用
seek()
.在您的场景中,您可能需要计算相对于当前位置的新位置。如果是这样,您可以使用
position()
.qeeaahzv2#
如果计划使用seek(),请确保数据是幂等的。因为您是有选择地提交偏移量,所以遗漏的记录可能在提交(成功处理)记录之前。如果您执行seek()——将groupid的指针移动到uncommitted offset并开始重播,那么您也将获得成功处理的消息。它也有可能成为一个无限循环。
或者,您可以将未成功记录的元数据保存在内存或数据库中,并从“poll(retention.ms)”开始重播主题,以便重播所有记录,但添加一个过滤器,仅处理那些通过api的元数据与您先前保存的内容匹配的记录。每小时或几个小时进行一次批处理。