热释光;博士;我试图理解分配了多个分区的单个使用者如何处理reach分区的消费记录。
例如:
在移动到下一个分区之前完全处理单个分区。
每次处理来自每个分区的可用记录块。
从第一个可用分区处理一批n条记录
以循环循环的方式处理分区中的一批n条记录
我找到了 partition.assignment.strategy
的配置 Ranged
或者 RoundRobin
但是这只决定了消费者是如何分配分区的,而不是如何从分配给它的分区中消费。
我开始挖掘kafkaconsumer源代码,#poll()引导我找到#pollforfetches()#pollforfetches(),然后引导我找到fetcher#fetchedrecords()和fetcher#sendfetches()
这只会让我试着跟随整个fetcher类一起,也许只是太晚了,也许我只是没有深入研究,但我很难弄清楚一个消费者将如何处理多个分配的分区。
背景
在Kafka流支持的数据管道上工作。
在这个管道中的几个阶段,当记录由不同的kafka流应用程序处理时,流被连接到外部数据源提供的压缩主题提要中,这些外部数据源提供了所需的数据,这些数据将在继续处理的下一个阶段之前在记录中增加。
一路上有几个死信主题,其中的记录无法与外部数据源匹配,而外部数据源可能会增加记录。这可能是因为数据还不可用(事件或活动还没有直播),或者数据不好,永远不会匹配。
我们的目标是在发布新的增强数据时重新发布死信主题中的记录,这样我们就可以匹配死信主题中以前不匹配的记录,以便更新它们并将它们发送到下游进行附加处理。
记录在多次尝试中可能无法匹配,并且死信主题中可能有多个副本,因此我们只希望重新处理现有记录(在应用程序启动时的最新偏移量之前)以及自应用程序上次运行以来(在上次保存的偏移量之后)发送到死信主题的记录消费群体补偿)。
它工作得很好,因为我的使用者过滤掉了应用程序启动后到达的所有记录,并且我的生产者通过将偏移提交为发布事务的一部分来管理我的使用者组偏移。
但我想确保最终会使用所有分区,因为我遇到了一个奇怪的边缘情况,即重新处理未连接的记录,并与以前一样在死信主题中降落到同一分区中,结果被使用者过滤掉。尽管它没有得到新的记录批来处理,但也有一些分区还没有被重新处理。
任何有助于理解单个使用者如何处理多个分配的分区的帮助都将不胜感激。
1条答案
按热度按时间dba5bblo1#
你在正确的轨道上看
Fetcher
因为大部分逻辑都在那里。首先正如消费者javadoc提到的:
如果一个使用者被分配了多个分区从中获取数据,它将尝试同时从所有分区中获取数据,从而有效地赋予这些分区相同的优先级。
你可以想象,在实践中,有一些事情需要考虑。
每次使用者尝试获取新记录时,它都会排除已经有记录等待的分区(从以前的获取中)。已经有正在进行的获取请求的分区也被排除在外。
获取记录时,使用者指定
fetch.max.bytes
以及max.partition.fetch.bytes
在获取请求中。代理程序使用这些参数来分别确定每个分区和总共返回多少数据。这同样适用于所有分区。默认情况下,使用这两种方法,使用者尝试公平地使用所有分区。如果不是这样,改变
fetch.max.bytes
或者max.partition.fetch.bytes
通常有帮助。如果您想将某些分区的优先级设置为高于其他分区,则需要使用
pause()
以及resume()
手动控制消耗流程。