多线程kafka使用者或perpartition perconsumer

wixjitnu  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(393)

在实现Kafka消费模式时,最好的方法是什么。
目标是从Kafka读取并写回数据库。数百万行
方法1:每个分区-每个使用者-等待消息消费(即写回db),然后继续轮询循环中的下一个。
方法2:每个分区-每个使用者-将记录发送到工作线程或线程池以写回db,稍后提交偏移量并继续轮询。需要注意抵销管理。在这种情况下,不要等待消息写回数据库。继续轮询,将消息传递给工作线程。
对他们俩都有什么见解吗?
谢谢

8iwquhpp

8iwquhpp1#

方法1:仅当您可以估计消息处理时间时,该方法才适用,否则不建议这样做。
问题:在这种方法中,主要的问题是保持使用者处于活动状态,如果您要等待消息在再次调用poll()之前得到完全处理,则必须确保使用者在调用poll()之前处于活动状态,因为kafka维护一个名为“session.timeout.ms”的属性。kafka代理/集群对该属性的值执行操作,如果使用者在“session.timeout.ms”时间段内无法再次调用poll(),代理将标记使用者已死亡,并将其踢出。现在,当使用者将完成消息处理并再次调用poll()时,它将被视为一个新的joiner,并将再次像以前一样给出从偏移开始的记录集。记住这个场景,消费者将被困在一个无限循环中,在这个循环中,它将永远不会进行补偿。
可能的解决方案1:要使用此方法,您需要以下属性“session.timeout.ms”的良好值,并具有以下副作用:
1:值太低:消费者将如上所述被标记为死亡,并且永远不会继续其偏移,但是消息将被处理,但每次完成消息时,它将再次获得以前的消息+新消息。
2:价值太高:经纪人在发现消费者真正失败时会很晚,这将导致记录重复,并影响整体吞吐量。
可能的解决方案2:(仅对版本0.10.1.x有效)由kafka在发行版(0.10.1.0)中正式修复。在这种方法中,引入了两个值得注意的实体:一个新属性“max.poll.interval.ms”,用于设置客户端调用poll()之间的最大延迟,另一个后台线程负责保持使用者的活动状态。因此,在一个场景中,当使用者调用一个poll()方法,然后忙于处理消息时,内部后台线程将使心跳保持活跃,因此使用者将保持活跃。但是,在属性“max.poll.interval.ms”的超时值保持有效之前,此内部后台线程本身将保持活动状态。因此,此线程将等待使用者在时间段值“max.poll.interval.ms”内调用poll(),否则,它将发送一个休假请求,并将自行终止
同样,这个解决方案中棘手的部分是找到一个合适的属性值:“max.poll.interval.ms”(非常重要,这个时间是后台线程保持心跳活动而不需要显式调用poll()的时间)。
方法2:使用工作线程是一个好主意,但是您必须为接收到的消息维护一个内部队列或验证,这可能很复杂,而且您还需要对自动提交使用手动提交。有关提交的更多信息,请参阅此搜索标题“提交和偏移”。
问题:在这种方法中,主要问题是跟踪接收到的消息和成功处理的消息。因此,您的使用者将收到消息,它将消息传递给相应的工作线程,并提交偏移量并向前移动以接收更多消息。在此过程中,您必须注意以下问题:
如果消息被接收并提交了偏移量,但是之后由于任何原因工作线程未能处理该消息,那么现在如何再次获取该消息呢?
如果消费者接收到消息,但没有空闲的工作线程来处理该怎么办?
解决方案:可以有不同的方法来解决上述问题,其中一种方法是使用内部队列来保存消息和手动提交,这些消息和手动提交仅在工作线程报告消息处理成功时才会发送。但是,需要非常仔细的实现,因为它可能导致复杂的代码,还可能导致内存管理或线程问题。
建议:根据您的需求,您可以使用一种或另一种方法来解决上述可能出现的问题。不过,我建议一个更健壮的解决方案是使用分区暂停/恢复。以非常抽象的方式,您的消费者应该执行以下步骤:
1:轮询()消息。
2:暂停所有相应的主题/分区。
3:将消息分配给工作线程并等待其处理。
4:继续调用poll(),但当分区暂停时,将不会收到额外的消息,而使用者将保持活动状态(确保在此期间没有注册新主题)
5:如果所有工作线程都应报告消息处理成功/失败,则相应地提交偏移量。
6:恢复所有分区。
注意:根据您的场景和需求,可能有更好的方法或其他解决方案。这只是一个想法或一个可能的解决方案。

相关问题