java—重新使用未提交偏移量的消息

2sbarzqh  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(398)

我有一个定制的kafka使用者,我用它向restapi发送一些请求。根据api的响应,我要么提交偏移量,要么跳过消息而不提交。
最小示例:

while (true) {

    ConsumerRecords<String, Object> records = consumer.poll(200);
    for (ConsumerRecord<String, Object> record : records) {

        // Sending a POST request and retrieving the answer
        // ...

        if (responseCode.startsWith("2")) {
            try { 
               consumer.commitSync();
            } catch(CommitFailedException ex) {
              ex.printStackTrace(); 
            }
        } else {
              // Do Nothing
        }
    }
}

现在,当RESTAPI的响应不是以 2 偏移量未提交,但消息不会重新使用。如何强制使用者重新使用具有未提交偏移量的消息?

rqdpfwrv

rqdpfwrv1#

提交偏移量只是存储消费者的当前偏移量(也称为位置)的一种方法。因此,如果它停止,它(或接管的新使用者示例)可以找到它以前的位置并从那里重新开始消费。
因此,即使您没有提交,一旦您收到记录,消费者的位置也会移动。如果你想重新收集一些记录,你必须改变消费者的当前位置。
对于java客户机,可以使用 seek() .
在您的场景中,您可能需要计算相对于当前位置的新位置。如果是这样,您可以使用 position() .

qeeaahzv

qeeaahzv2#

如果计划使用seek(),请确保数据是幂等的。因为您是有选择地提交偏移量,所以遗漏的记录可能在提交(成功处理)记录之前。如果您执行seek()——将groupid的指针移动到uncommitted offset并开始重播,那么您也将获得成功处理的消息。它也有可能成为一个无限循环。
或者,您可以将未成功记录的元数据保存在内存或数据库中,并从“poll(retention.ms)”开始重播主题,以便重播所有记录,但添加一个过滤器,仅处理那些通过api的元数据与您先前保存的内容匹配的记录。每小时或几个小时进行一次批处理。

相关问题