我正在编写一个消费者应用程序,从kafka流中选择记录并使用springkafka进行处理。我的处理步骤如下:
Getting records from stream --> dump it into a table --> Fetch records and call API --> API will update records into a table --> calling Async Commit()
似乎在某些情况下,api处理需要更多的时间,因为要获取更多的记录,我们得到以下错误?
会员consumer-prov-em-1-399ede46-9e12-4388-b5b8-f198a4e6a5bc向协调员apslt2555.uhc发送休假组请求。com:9095 (id:2147483577机架:空)由于消费者轮询超时已过期。这意味着对poll()的后续调用之间的时间间隔长于配置的max.poll.interval.ms,这通常意味着poll循环在处理消息方面花费了太多时间。您可以通过增加max.poll.interval.ms或使用max.poll.records减少poll()中返回的批的最大大小来解决此问题。
org.apache.kafka.clients.consumer.commitfailedexception:无法完成提交,因为组已重新平衡分区并将其分配给其他成员。这意味着对poll()的后续调用之间的时间间隔长于配置的max.poll.interval.ms,这通常意味着poll循环在消息处理上花费了太多时间。您可以通过增加max.poll.interval.ms或使用max.poll.records减少poll()中返回的批的最大大小来解决此问题。
我知道这可以通过减少max.poll.records或增加max.poll.interval.ms来处理。如果我将max.poll.records设置为10,那么什么是poll()行为呢?是否要从流中取出10条记录等待提交这些记录,然后再取出下10条记录?下一次轮询何时发生?它是否也会影响性能,因为我们正在将max.poll.records从默认值500减少到10。
是否还必须增加max.poll.interval.ms。大概10分钟吧。在更改这些值时,是否有任何向下影响需要注意?除了这些参数,还有其他方法来处理这些错误吗?
1条答案
按热度按时间db2dz4w81#
max.poll.records
允许在将记录刷新到另一个系统之前在内存中收集记录的批处理消耗模型。这样做的目的是通过将来自kafka的轮询一起获得所有记录,然后在poll循环中在内存中处理这些记录。如果你减少这个数字,那么消费者将更频繁地从Kafka投票。这意味着它需要更频繁地进行网络通话。这可能会降低kafka流处理的性能。
max.poll.interval.ms
控制在使用者主动离开组之前轮询调用之间的最长时间。如果这个数字增加,那么Kafka需要更长的时间来检测消费者的故障。另一方面,如果这个值太低,Kafka可能会错误地发现许多活着的消费者失败,从而更经常地重新平衡。