Kafka消费者投票行为

xggvc2p6  于 2021-06-07  发布在  Kafka
关注(0)|答案(4)|浏览(381)

我面临着一些严重的问题,试图实现一个解决方案,我的需要,关于Kafka消费(>=0.9)。
假设我有一个函数,它只能读取Kafka主题中的n条消息。
例如: getMsgs(5) -->获取主题中的下5条Kafka消息。
所以,我有一个像这样的循环。用实际正确的参数编辑。在这种情况下,消费者的最大 .poll.records param设置为1,因此实际循环只迭代一次。不同的消费者(其中一些人迭代了许多消息)共享了一个抽象的父亲(这一个),这就是为什么它是这样编码的。这个 numMss 这部分是专门为这个消费者准备的。

for (boolean exit= false;!exit;)
{
   Records = consumer.poll(config.pollTime);
   for (Record r:records) 
   {
       processRecord(r); //do my things
       numMss++;
       if (numMss==maximum) //maximum=5
       {   
          exit=true;
          break;
       }
   }
}

考虑到这一点,问题是poll()方法可能会获得5条以上的消息。例如,如果它得到10条消息,我的代码将永远忘记其他5条消息,因为Kafka会认为它们已经被消耗了。
我试着提供补偿,但似乎不起作用:

consumer.commitSync(Collections.singletonMap(partition,
    new OffsetAndMetadata(record.offset() + 1)));

即使使用offset配置,每当我再次启动consumer时,它也不会从第6条消息开始(记住,我只想要5条消息),而是从第11条消息开始(因为第一次轮询消耗了10条消息)。
有什么解决办法吗,或者(最肯定的)我遗漏了什么?
提前谢谢!!

mwg9r5ms

mwg9r5ms1#

已通过将enable.auto.commit设置为false禁用自动提交。如果要手动提交偏移量,则需要禁用该选项。如果没有下一次调用poll(),则会自动提交从上一次poll()收到的消息的最新偏移量。

kuuvgm7e

kuuvgm7e2#

将auto.offset.reset属性设置为“最新”。然后尝试消费,您将从提交的偏移量中获得消费的记录。
或者在轮询之前使用consumer.seek(topicpartition,offset)api。

krugob8w

krugob8w3#

从kafka 0.9开始,auto.offset.reset参数名称已更改;
如果kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除),该怎么办:

earliest: automatically reset the offset to the earliest offset

latest: automatically reset the offset to the latest offset

none: throw exception to the consumer if no previous offset is found for the consumer's group

anything else: throw exception to the consumer.
kx5bkwkv

kx5bkwkv4#

你可以设置 max.poll.records 无论你喜欢多少,每次投票最多只能得到那么多记录。
对于您在这个问题中陈述的用例,您不必自己显式地提交偏移量。你只需要设置 enable.auto.committrue 并设置 auto.offset.resetearliest 这样当没有消费者时,它就会启动 group.id (也就是说,当您第一次开始从分区读取数据时)。一旦您在kafka中存储了group.id和一些使用者偏移量,并且在kafka使用者进程死亡的情况下,它将从上次提交的偏移量开始继续,因为这是默认行为,因为使用者启动时将首先查找是否有任何提交的偏移量,如果有,将从上次提交的偏移量开始继续,并且 auto.offset.reset 我不会插手的。

相关问题