kafka与java:如何重读数据

i7uq4tfw  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(395)

我对kafkaapi有以下问题。我通过以下方式设置我的消费者:

props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, configuration.batchSize);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

那么

while(true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
    try {
         //do some update in DB in a transaction
         consumer.commitSync();
    } catch (Exception e) {
    }

我想从Kafka那里读取数据,用这些数据更新数据库。但如果更新失败,我想重试,直到它工作。所以我想将db事务应用到kafka,i.a.如果我的db事务正常,那么移动kafka指针,但是如果失败,那么从相同的位置重试。
在我的密码里,

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(

它的意思是“如果Kafka崩溃了,那就从承诺的位置重新开始”。但是当我的db事务失败时,即使我没有commitasync(),指针也会向前移动。
我的问题是:有没有一种简单的方法可以把Kafka指针的位置反转到最后一次投票的位置。
我已经注意到,在api中

public void seek(TopicPartition partition,
             long offset);

但是这需要手动维护一个带有偏移量的分区列表,我想还有更简单更优雅的吗?

ecfdbz9o

ecfdbz9o1#

1) 因为consumer.poll在一个循环中,所以不管您是否提交了偏移量,您都将继续向前移动偏移量。只有在重新启动组件时,提交才有用。即了解消费者应该从哪里开始消费的位置。
2) 如果在db事务失败时需要移动到先前提交的偏移量,那么使用kafka consumer中的seek方法。public void seek(主题分区,长偏移)
3) 为了提交各个分区的偏移量,您需要像前面提到的那样维护每个分区的偏移量。我想没有别的办法了。
每次db事务失败时,您可能不需要寻找以前提交的偏移量。您可能希望暂停消费者并重试几次,以指数方式增加等待时间。
但是要回答您关于如何在每次轮询时移动到上一个偏移量的问题,请跟踪每个分区中第一条消息的偏移量,如果失败,请在循环结束时,查找您跟踪的偏移量。

相关问题