我面临着一些严重的问题,试图实现一个解决方案,我的需要,关于Kafka消费(>=0.9)。
假设我有一个函数,它只能读取Kafka主题中的n条消息。
例如: getMsgs(5)
-->获取主题中的下5条Kafka消息。
所以,我有一个像这样的循环。用实际正确的参数编辑。在这种情况下,消费者的最大 .poll.records
param设置为1,因此实际循环只迭代一次。不同的消费者(其中一些人迭代了许多消息)共享了一个抽象的父亲(这一个),这就是为什么它是这样编码的。这个 numMss
这部分是专门为这个消费者准备的。
for (boolean exit= false;!exit;)
{
Records = consumer.poll(config.pollTime);
for (Record r:records)
{
processRecord(r); //do my things
numMss++;
if (numMss==maximum) //maximum=5
{
exit=true;
break;
}
}
}
考虑到这一点,问题是poll()方法可能会获得5条以上的消息。例如,如果它得到10条消息,我的代码将永远忘记其他5条消息,因为Kafka会认为它们已经被消耗了。
我试着提供补偿,但似乎不起作用:
consumer.commitSync(Collections.singletonMap(partition,
new OffsetAndMetadata(record.offset() + 1)));
即使使用offset配置,每当我再次启动consumer时,它也不会从第6条消息开始(记住,我只想要5条消息),而是从第11条消息开始(因为第一次轮询消耗了10条消息)。
有什么解决办法吗,或者(最肯定的)我遗漏了什么?
提前谢谢!!
4条答案
按热度按时间mwg9r5ms1#
已通过将enable.auto.commit设置为false禁用自动提交。如果要手动提交偏移量,则需要禁用该选项。如果没有下一次调用poll(),则会自动提交从上一次poll()收到的消息的最新偏移量。
kuuvgm7e2#
将auto.offset.reset属性设置为“最新”。然后尝试消费,您将从提交的偏移量中获得消费的记录。
或者在轮询之前使用consumer.seek(topicpartition,offset)api。
krugob8w3#
从kafka 0.9开始,auto.offset.reset参数名称已更改;
如果kafka中没有初始偏移量,或者服务器上不再存在当前偏移量(例如,因为该数据已被删除),该怎么办:
kx5bkwkv4#
你可以设置
max.poll.records
无论你喜欢多少,每次投票最多只能得到那么多记录。对于您在这个问题中陈述的用例,您不必自己显式地提交偏移量。你只需要设置
enable.auto.commit
至true
并设置auto.offset.reset
至earliest
这样当没有消费者时,它就会启动group.id
(也就是说,当您第一次开始从分区读取数据时)。一旦您在kafka中存储了group.id和一些使用者偏移量,并且在kafka使用者进程死亡的情况下,它将从上次提交的偏移量开始继续,因为这是默认行为,因为使用者启动时将首先查找是否有任何提交的偏移量,如果有,将从上次提交的偏移量开始继续,并且auto.offset.reset
我不会插手的。