我从日志中看到,完全相同的消息被665次使用。为什么会这样?
我在日志里也看到了
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies
that the poll loop is spending too much time message processing. You can address this either by increasing the session
timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
消费者财产
group.id=someGroupId
bootstrap.servers=kafka:9092
enable.auto.commit=false
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
session.timeout.ms=30000
max.poll.records=20
ps:是否可以只消耗特定数量的消息,比如队列中1000条消息中的10条、50条或100条?我正在查看'fetch.max.bytes'配置,但它似乎是针对消息大小而不是消息数量。
谢谢
2条答案
按热度按时间o7jaxewo1#
答案在于对以下概念的理解:
session.timeout.ms
心跳
最大轮询间隔毫秒
在您的情况下,您的消费者通过poll()接收消息,但无法在max.poll.interval.ms时间内完成处理。因此,假定由代理挂起并重新平衡分区,因此该使用者将失去所有分区的所有权。它被标记为死亡,不再是一个消费群体的一部分。
然后,当消费者完成处理并再次调用poll()时,会发生两件事:
提交失败,因为使用者不再拥有分区。
broker标识使用者再次启动,因此触发重新平衡,使用者再次加入使用者组,开始拥有分区,并请求来自broker的消息。由于先前的消息未标记为已提交(请参阅上面的#1,失败的提交)并且正在挂起处理,因此代理再次将相同的消息传递给使用者。
消费者再次需要花费大量时间来处理,并且由于无法在小于max.poll.interval.ms的1秒内完成处理。和2。保持循环重复。
要解决此问题,可以根据用户处理所需的时间,将max.poll.interval.ms增加到足够大的值。这样,您的消费者将不会被标记为死亡,也不会收到重复的消息。但是,真正的解决方法是检查处理逻辑并尝试减少处理时间。
sy5wg1nm2#
您粘贴的消息中描述了修复:
您可以通过增加会话超时或使用max.poll.records减少poll()中返回的最大批大小来解决此问题。
原因是在您的使用者能够处理和提交消息之前达到了超时。当您的kafka消费者“提交”时,它基本上是确认收到上一条消息,推进偏移量,从而转到下一条消息。但是如果超过了超时时间(对您来说就是这样),那么消费者的提交就没有效果,因为它发生得太晚了;下一次消费者要求提供信息时,得到的是相同的信息
您可以选择:
增加
session.timeout.ms=30000
,因此消费者有更多的时间处理消息减少
max.poll.records=20
因此,在超时发生之前,使用者需要处理的消息较少。但这并不真正适用于你,因为你的消费者已经只处理一条消息了或者打开
enable.auto.commit
,这可能也不是您的最佳解决方案,因为它可能会导致丢弃邮件,如下所述:如果我们像前面的示例中那样允许偏移量自动提交,则消息将在使用者发出消息之后被视为已被消耗,并且在将消息读入内存缓冲区但在将其插入数据库之前,我们的进程可能会失败。资料来源:https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/kafkaconsumer.html