我有一个应用程序使用Kafka,并利用两个独立的消费者组监听一个主题,其中一个消费者组(C1)总是监听消息,另一个消费者组(C2)在线并开始监听消息,然后再次离线一段时间。
更具体地说,始终侦听使用者组C1的代码通过创建一个虚拟机来响应消息,该虚拟机开始侦听C2并使用昂贵的硬件执行一些工作。
我遇到的问题是,在虚拟机启动并开始监听消费者组C2之后,它 * 有时 * 什么也收不到,尽管它应该接收到C1接收到的导致C2首先被监听的相同消息。
我将使用以下主题:生产者和使用者配置:
主题配置:
partitions: 6
compression.type: producer
leader.replication.throttled.replicas: --
message.downconversion.enable: true
min.insync.replicas: 2
segment.jitter.ms: 0
cleanup.policy: delete
flush.ms: 9223372036854775807
follower.replication.throttled.replicas: --
segment.bytes: 104857600
retention.ms: 604800000
flush.messages: 9223372036854775807
message.format.version: 3.0-IV1
max.compaction.lag.ms: 9223372036854775807
file.delete.delay.ms: 60000
max.message.bytes: 8388608
min.compaction.lag.ms: 0
message.timestamp.type: CreateTime
preallocate: false
min.cleanable.dirty.ratio: 0.5
index.interval.bytes: 4096
unclean.leader.election.enable: false
retention.bytes: -1
delete.retention.ms: 86400000
segment.ms: 604800000
message.timestamp.difference.max.ms: 9223372036854775807
segment.index.bytes: 10485760
生产商配置:
("message.max.bytes", "20971520")
("queue.buffering.max.ms", "0")
消费者配置:
("enable.partition.eof", "false")
("session.timeout.ms", "6000")
("enable.auto.commit", "true")
("auto.commit.interval.ms", "5000")
("enable.auto.of.store", "true")
这个bug是间歇性的,有时候会出现,有时候不会,并且在消费者启动并监听C2后重新发送完全相同的消息总是成功,所以这不是消息大小对于主题来说太大之类的问题。
我怀疑这与偏移量提交/存储不当有关。我的主题配置使用默认值“latest”作为“auto.offset.reset”,所以我怀疑偏移量被丢弃或者不知何故没有被正确地提交,因此触发C2监听的新消息被错过,因为它不是Kafka的“最新”消息。侦听使用者组C2的代码所做的工作运行时间相当长,使用者经常报告超时,所以这可能是原因之一?
编辑:我得到的超时错误正是:
WARN - librdkafka - librdkafka: MAXPOLL [thrd:main]: Application maximum poll interval (300000ms) exceeded by 424ms (adjust max.poll.interval.ms for long-running message processing): l
eaving group
我将Rust rdkafka库用于生产者和消费者,并使用confluent cloud的托管kafka。
1条答案
按热度按时间hgqdbh6s1#
对于“auto.offset.reset”,我使用默认值“latest”,所以我怀疑偏移量被丢弃了,或者没有正确提交
这与提交的值无关,只是在您开始阅读唯一的组ID时。
你已经启用了自动提交,但是你得到了错误,因此偏移量得到了提交,但是你没有成功地处理数据,这就是为什么会有跳过。
你的错误,
第一个月
在没有看到消费者代码的情况下,你需要在你的poll函数中做“稍微”少一些的事情,例如,删除一行日志可以很容易地减少半秒,假设日志语句需要1ms,并且你每次池化500条记录。
否则,增大
max.poll.interval.ms
(允许使用者心跳等待更长时间)或减小max.poll.records
(每个心跳处理更少的数据,但轮询更频繁)是对此错误的正确响应。